Adding real-time communication between clients with SignalR – Step 1/2

Step 1/2: Server-side development

If you’ve followed the previous articles in the series you have a good foundation for a client application with draw tools and tactical symbols. We will now expand this to support a scenario where several clients can collaborate by sending feature updates in real-time over a WebSocket connection using the popular library SignalR.

This is a two-part article where we begin by writing the streaming server and finish in the second part by connecting the WPF client to the server.

This archive contains the solution outlined in this and the next article.

Requirements

  • Completed previous tutorials.

Overview

To have multiple clients share information we need to add a new component that will be responsible for receiving and sending information between them. To achieve this we will create a small server application that will mediate between the clients. Each client that will participate in the information sharing will have to connect to the server and synchronize to receive the latest information and then subscribe to the relevant streams to start receiving information from the other clients. The scenario could be visualized with the image below:

On the client side, we will implement a background service that continuously listens to the different streams (insert, update and delete) and also sends local updates to the server. Any processing, such as serialization or deserialization, will happen in the service without blocking the UI.

The server, on the other hand, will act as the source of truth, managing the shared state of the connected clients. It will consist of a thread-safe lookup mechanism with observables that will signal the insert, update and delete events that will be exposed to the clients.

Creating the streaming server

Start by adding a new project to the existing solution by right-clicking the solution in Visual Studio and clicking Add > New Project… Then find the template called ASP.NET Core Empty, select that and click Next. Name it StreamingService and click Next again. In the next dialog, un-check the Enable Docker option for now and then click Create. This will set us up with a very bare-bones web application that we can extend.

Next, add the following NuGet packages to the project:

  • System.Reactive
  • Carmenta.Engine
  • Microsoft.AspNetCore.SignalR.Protocols.MessagePack

Creating the streaming repository

We will create a simple class for storing our features, backed by a thread-safe dictionary for fast lookup. It will also expose IObservable properties which will be used to push updates to consumers. Add the following code to a class called StreamingRepository in the project root:

StreamingRepository.cs

using System.Collections.Concurrent;
using System.Reactive.Subjects;

namespace StreamingService;

public class StreamingRepository<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, TValue> _store = new();

    private readonly Subject<(TKey key, TValue value)> _addedSubject = new();
    private readonly Subject<(TKey key, TValue value)> _updatedSubject = new();
    private readonly Subject<TKey> _removedSubject = new();

    public IObservable<(TKey key, TValue value)> AddedStream => _addedSubject;
    public IObservable<(TKey key, TValue value)> UpdateStream => _updatedSubject;
    public IObservable<TKey> RemovedStream => _removedSubject;

    public void Add(TKey key, TValue value)
    {
        _store[key] = value;
        _addedSubject.OnNext((key, value));
    }

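    public void Update(TKey key, TValue value)
    {
        // Replace the value only while the key still exists, so that a
        // concurrent Remove cannot be undone by the indexer re-adding the key.
        while (_store.TryGetValue(key, out var current))
        {
            if (!_store.TryUpdate(key, value, current))
                continue; // another writer changed the value first; retry

            _updatedSubject.OnNext((key, value));
            return;
        }
    }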

    public void Remove(TKey key)
    {
        if (_store.TryRemove(key, out _))
            _removedSubject.OnNext(key);
    }

    public IEnumerable<(TKey key, TValue value)> Objects => _store.Select(x => (x.Key, x.Value));
}

The generic StreamingRepository can hold any combination of key and value types. When an entry is added, updated or removed, the change is published on the corresponding observable stream by calling OnNext on the matching Subject field. The _addedSubject and _updatedSubject fields publish (TKey, TValue) tuples, since each client connected to the server needs the new value to update its local state. Removal is simpler: only the key is published on the RemovedStream, which reduces the data sent over the wire because clients only need the key to delete their local copy.
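To make the behavior described above concrete, here is a minimal usage sketch. It assumes the StreamingRepository class shown above and the System.Reactive package are available in the project; the RepositoryDemo class and its Run method are hypothetical names used only for illustration and are not part of the tutorial solution.

```csharp
using System;
using System.Collections.Generic;
using StreamingService;

// Hypothetical helper for illustration; not part of the tutorial solution.
public static class RepositoryDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var repository = new StreamingRepository<Guid, byte[]>();

        // Subscribe to the three streams before making any changes.
        // The lambda-based Subscribe overload comes from System.Reactive.
        using IDisposable added = repository.AddedStream
            .Subscribe(x => log.Add($"added {x.value.Length}"));
        using IDisposable updated = repository.UpdateStream
            .Subscribe(x => log.Add($"updated {x.value.Length}"));
        using IDisposable removed = repository.RemovedStream
            .Subscribe(_ => log.Add("removed"));

        Guid id = Guid.NewGuid();

        repository.Update(id, new byte[] { 9 });       // unknown key: ignored
        repository.Add(id, new byte[] { 1, 2, 3 });    // publishes on AddedStream
        repository.Update(id, new byte[] { 1, 2 });    // publishes on UpdateStream
        repository.Remove(id);                         // publishes on RemovedStream
        repository.Remove(id);                         // already gone: ignored

        return log;
    }
}
```

Calling RepositoryDemo.Run() should yield three log entries, one per stream, because the update for an unknown key and the second removal publish nothing.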

Creating the SignalR hub

With the StreamingRepository in place, let’s create the client-facing part of the service by adding a SignalR Hub implementation to the root of the StreamingService project:

CommunicationHub.cs

using Microsoft.AspNetCore.SignalR;
using StreamingService.Utils;
using System.Threading.Channels;

using SerializedFeature = byte[];

namespace StreamingService;

public class CommunicationHub(StreamingRepository<Guid, SerializedFeature> repository) : Hub
{
    public void UpdateFeature(Guid id, SerializedFeature value)
        => repository.Update(id, value);

    public void AddFeature(Guid id, SerializedFeature value)
        => repository.Add(id, value);

    public void RemoveFeature(Guid id) =>
        repository.Remove(id);

    public ChannelReader<(Guid Id, SerializedFeature Buffer)> AddedStream()
        => repository.AddedStream.AsChannelReader();

    public ChannelReader<(Guid, SerializedFeature)> UpdateStream()
        => repository.UpdateStream.AsChannelReader();

    public ChannelReader<Guid> RemoveStream()
        => repository.RemovedStream.AsChannelReader();

    public List<(Guid id, SerializedFeature Buffer)> AllFeatures() =>
        repository.Objects.ToList();
}

By inheriting from the Microsoft.AspNetCore.SignalR.Hub base class, we expose the public methods to remote clients. Clients can call them over one of several transports (WebSockets is used when available) and can subscribe to the event streams returned by the methods with the generic ChannelReader return type. The constructor takes a parameter of type StreamingRepository<Guid, byte[]>, which is the repository where our features will be stored. The Hub essentially wraps the repository methods and makes them available over the network.

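Note how we’ve included the type alias using SerializedFeature = byte[]; to clarify the intended value of the key-value pairs in the repository. Aliasing an array type like this requires C# 12 or later, as does the primary constructor used by the hub.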
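Since the streaming hub methods return a ChannelReader, it may help to see how a channel behaves on its own. The following is a minimal sketch that uses only System.Threading.Channels from the base class library; the ChannelDemo class and RunAsync method are hypothetical names chosen for this illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the tutorial solution.
public static class ChannelDemo
{
    public static async Task<List<int>> RunAsync()
    {
        // An unbounded channel never rejects a write because of capacity.
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // Producer side: write three items, then signal that no more will follow.
        for (int i = 1; i <= 3; i++)
            channel.Writer.TryWrite(i);
        channel.Writer.TryComplete();

        // Consumer side: ReadAllAsync yields items in order and ends
        // once the channel is completed and drained.
        var received = new List<int>();
        await foreach (int item in channel.Reader.ReadAllAsync())
            received.Add(item);

        return received;
    }
}
```

In the hub, the writer side stays open for as long as the repository keeps publishing, so a subscribed client keeps receiving items until the stream is completed or the client stops listening.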

Connecting the Hub to the Repository

To convert the repository’s IObservable properties to ChannelReaders for the Hub to expose, add a new static class:

Utils\Extensions.cs

using System.Threading.Channels;

namespace StreamingService.Utils;

public static class Extensions
{
    public static ChannelReader<T> AsChannelReader<T>(this IObservable<T> observable)
    {
        var channel = Channel.CreateUnbounded<T>();

        IDisposable disposable = observable.Subscribe(
            value => channel.Writer.TryWrite(value),
            error => channel.Writer.TryComplete(error),
            () => channel.Writer.TryComplete());

        channel.Reader.Completion.ContinueWith(_ => disposable.Dispose());
        return channel.Reader;
    }
}

The AsChannelReader extension method converts an IObservable stream into a ChannelReader of the same type. It creates an unbounded (no capacity limit) Channel, subscribes to the observable, and forwards every value to the channel’s writer by calling channel.Writer.TryWrite; errors and completion are forwarded with TryComplete. Only the reader half is returned to the caller, and the subscription is disposed once the channel has completed (see footnote 1). Exposing a ChannelReader from a hub method is one way of achieving long-lived streams from a server application to one or several clients.
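One thing to be aware of is that the subscription above is only released when the channel completes, which the repository streams never do on their own. SignalR can pass a CancellationToken to a streaming hub method, and that token is triggered when the client unsubscribes from the stream. The following is a possible variant that uses such a token to release the subscription; it is a sketch under that assumption rather than part of the tutorial solution, and the class name CancellableChannelExtensions is hypothetical. It relies on the lambda-based Subscribe overload from System.Reactive, like the original.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace StreamingService.Utils;

// Hypothetical variant for illustration; not part of the tutorial solution.
public static class CancellableChannelExtensions
{
    public static ChannelReader<T> AsChannelReader<T>(
        this IObservable<T> observable, CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<T>();

        // Forward values, errors and completion to the channel's writer.
        IDisposable subscription = observable.Subscribe(
            value => channel.Writer.TryWrite(value),
            error => channel.Writer.TryComplete(error),
            () => channel.Writer.TryComplete());

        // When the token fires, stop listening to the observable
        // and complete the channel so that readers finish.
        CancellationTokenRegistration registration = cancellationToken.Register(() =>
        {
            subscription.Dispose();
            channel.Writer.TryComplete();
        });

        // Clean up as well when the stream ends on its own.
        channel.Reader.Completion.ContinueWith(_ =>
        {
            subscription.Dispose();
            registration.Dispose();
        }, TaskScheduler.Default);

        return channel.Reader;
    }
}
```

To use it, a streaming hub method would declare a CancellationToken parameter and pass it along, for example AddedStream(CancellationToken cancellationToken) returning repository.AddedStream.AsChannelReader(cancellationToken).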

Adding application dependencies

To make everything come together we need to add the necessary dependencies and set up our CommunicationHub by mapping it to a public endpoint. Let’s start by adding our services to Program.cs. The file should contain the following:

Program.cs

using StreamingService;

WebApplicationBuilder builder = WebApplication.CreateBuilder(args);

builder.Services.AddSignalR().AddMessagePackProtocol();
builder.Services.AddSingleton<StreamingRepository<Guid, byte[]>>();

WebApplication app = builder.Build();

app.UseHttpsRedirection();

app.MapHub<CommunicationHub>("/communicationHub");
app.Run();

This registers the necessary dependencies for SignalR and configures it to support binary serialization using MessagePack. We also register the streaming repository as a singleton that maps Guid to byte[], where the Guid is a globally unique identifier for each feature and the byte array is the serialized feature itself. Finally, before the call to app.Run(), we map the CommunicationHub to the endpoint /communicationHub. And with that, the server component is finished.

Conclusion

In this first part of the final article in our WPF plus MVVM tutorial series, we have coded a server for streaming and synchronizing arbitrary pairs of GUIDs and byte arrays between multiple clients. The server leverages SignalR to manage connections and push changes between clients.

The solution to this tutorial also includes the launch settings needed to run the streaming server as a Docker container directly from Visual Studio, so have a look if you’re interested in learning more about that.

Continue with the next article in the series to see how we will implement the client-side support and connect to the server.

Footnotes

  1. Channels, observables and SignalR are quite extensive topics on their own, so we highly recommend reading the official documentation for further details:
    • Channels
    • SignalR
    • Reactive Extensions, Observable