Joshua Steward
Concurrent Flows

Background Message Processing in Dotnet 6

May 31, 2021


Background

Message processing (handling, publishing and receiving) is probably one of the most important topics in software architecture. With today's distributed and decoupled systems, robust message handling is a must. Whether you're communicating over Azure Service Bus, Kafka, SignalR, Azure Pub/Sub or another broker, you'll need a way to get information out of your application and on down to subscribers. In this post we'll use an ASP.NET API running on .NET 6 Preview 4 as a backdrop, and I'll work through a pattern for getting messages from anywhere in your application out to consumers.

Challenge

We have a multitude of message producers in any given application. Messages could come from Controllers, Hubs, Services, Repositories, Handlers, etc. All of these abstractions can produce messages that may influence or direct consumers further downstream. We don't want our message handling to obscure what these abstractions were originally intended to do. That is to say, we want our message handling to be as lean as possible when it comes to getting a message out. We don't want our message producers to worry about how the message gets routed, published, retried or logged.

Solution

We'll develop a simple solution to pass messages from our producers to all our consumers using an intra-process handler that communicates with a HostedService, which in turn publishes to all our consumers via a set of Publishers. The solution presented here was inspired by a colleague of mine whose implementation was based on Rx.NET. However, we're going to take things a little further, support multiple publishers and base our implementation on Channels instead of Observables.

The Message

We'll use just a simple toy message to demonstrate this pattern: one simple record type. If you haven't seen the new record types available in C# 9, check out this Record Types Intro.

public record EventMessage(string payload);
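As a quick illustration of what a record gives us, here is a minimal sketch using a stand-in record. `SampleMessage` and `RecordDemo` are hypothetical names that are not part of this post's code; they only show value equality and non-destructive copying with `with`.

```csharp
using System;

// Hypothetical stand-in for a message record, declared so the sketch compiles on its own.
public record SampleMessage(string Payload);

public static class RecordDemo
{
    public static bool ValueEquality()
    {
        var first = new SampleMessage("order-created");
        var second = new SampleMessage("order-created");
        // Records compare by value: two distinct instances with the same data are equal.
        return first == second && !ReferenceEquals(first, second);
    }

    public static SampleMessage Copy(SampleMessage original, string payload)
        // 'with' produces a copy with the listed properties changed; the original is untouched.
        => original with { Payload = payload };
}
```

Value semantics and immutability are a good fit for messages that get handed from one part of the process to another.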

The Messenger

First we introduce a lightweight abstraction that encapsulates sending an intra-process message.

public interface IMessengerWriter<TMessage> where TMessage : class
{
    ValueTask WriteAsync(TMessage message);
}
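To make the producer side concrete, here is a minimal sketch of a service that depends only on the writer. `OrderPlaced`, `OrderService` and `RecordingWriter` are hypothetical names invented for illustration, and the writer interface is repeated so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Repeated from above so the sketch is self-contained.
public interface IMessengerWriter<TMessage> where TMessage : class
{
    ValueTask WriteAsync(TMessage message);
}

// Hypothetical message and producer, named for illustration only.
public record OrderPlaced(string OrderId);

public class OrderService
{
    private readonly IMessengerWriter<OrderPlaced> messenger;

    public OrderService(IMessengerWriter<OrderPlaced> messenger)
        => this.messenger = messenger ?? throw new ArgumentNullException(nameof(messenger));

    public async Task PlaceOrderAsync(string orderId)
    {
        // ... the service's real work would happen here ...

        // The producer only hands the message off; routing, retries and logging live elsewhere.
        await messenger.WriteAsync(new OrderPlaced(orderId));
    }
}

// Hypothetical test double that records what was written, so a producer can be tested without a channel.
public class RecordingWriter<TMessage> : IMessengerWriter<TMessage> where TMessage : class
{
    public List<TMessage> Written { get; } = new();

    public ValueTask WriteAsync(TMessage message)
    {
        Written.Add(message);
        return ValueTask.CompletedTask;
    }
}
```

Because the producer sees nothing but `WriteAsync`, swapping in a test double like this is all it takes to unit test it.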

Next we define the reader half of our messenger.

public interface IMessengerReader<TMessage>
    where TMessage : class
{
    Task Shutdown();
    ValueTask<bool> WaitToReadAsync(CancellationToken token);
    IAsyncEnumerable<TMessage> ReadAllAsync(CancellationToken token);
}

The Writer interface is all that will be injected into our message producers: the Controllers, Services, Handlers, etc. Notice that we provide only the bare minimum that concerns them. All a producer should care about is that the message gets written somewhere, after which it can continue with its intended function. We use ValueTask rather than Task because a write to the channel can complete synchronously, which avoids allocating a Task; producers simply need something to await that indicates the message was written successfully. The Reader interface is what will be injected into our consuming process, and it exposes only the methods of interest to the reader.

But once a message is written, where does it go? First it goes through the intra-process message handler. The implementation of this handler is based on System.Threading.Channels, which lets parts of a process communicate through a channel's reader/writer pair. If you haven't used these beautiful abstractions before, head over to the Channels Intro.

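using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class Messenger<TMessage>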
    : IMessengerReader<TMessage>, IMessengerWriter<TMessage>
    where TMessage : class
{
    private readonly ChannelWriter<TMessage> writer;
    private readonly ChannelReader<TMessage> reader;

    public Messenger()
    {
        var channel = Channel.CreateUnbounded<TMessage>();
        writer = channel.Writer;
        reader = channel.Reader;
    }

    public ValueTask<bool> WaitToReadAsync(CancellationToken token)
        => reader.WaitToReadAsync(token);

    public IAsyncEnumerable<TMessage> ReadAllAsync(CancellationToken token)
        => reader.ReadAllAsync(token);

    public ValueTask WriteAsync(TMessage message)
        => writer.WriteAsync(message);

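    public Task Shutdown()
    {
        // TryComplete is safe to call more than once; Complete would throw on a second call.
        writer.TryComplete();
        // Completion finishes only once every buffered message has been read.
        return reader.Completion;
    }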
}

As you can see, this simple Messenger just wraps a reader/writer pair and exposes only the methods we're interested in. We create an unbounded channel to accept all incoming messages and expect them to be consumed promptly after writing. This lightweight abstraction allows us to inject only the functionality our message producers are concerned with. One advantage is that producers see only IMessengerWriter, which has no way to complete the channel, so no producer can suddenly close the writing stream.
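If channels are new to you, the following minimal sketch shows the behaviour the Messenger relies on, using only System.Threading.Channels. `ChannelRoundTrip` and `RunAsync` are illustrative names, not part of this post's code.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelRoundTrip
{
    // Writes the given items to an unbounded channel, completes the writer,
    // then drains the reader and returns what was read, in order.
    public static async Task<List<string>> RunAsync(params string[] items)
    {
        var channel = Channel.CreateUnbounded<string>();

        foreach (var item in items)
        {
            // On an unbounded channel a write never has to wait for capacity.
            await channel.Writer.WriteAsync(item);
        }

        // Completing the writer lets ReadAllAsync finish once the buffer is empty.
        channel.Writer.Complete();

        var received = new List<string>();
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            received.Add(item);
        }

        // Completion finishes only after the writer is completed and every item is consumed.
        await channel.Reader.Completion;
        return received;
    }
}
```

Note the last step: awaiting `Completion` only returns once the channel has been fully drained, which is worth keeping in mind for the `Shutdown` method above.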

The Publishers

In this post we show two publishers: one for Azure Service Bus and one for SignalR. There could be any number of places we want to publish our message; all we have to do is implement a simple interface and plug it into the rest of our infrastructure. The interface for a publisher is defined as:

public interface IPublisher<TMessage> where TMessage : class
{
    Task PublishAsync(TMessage message);
}

We can use this interface to publish to Azure Service Bus. We won't go into the details of Azure Service Bus, but you can find more information in this getting started guide.

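using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

public class ServicebusPublisher<TMessage> : IPublisher<TMessage> where TMessage : class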
{
    private readonly ISenderClient senderClient;

    public ServicebusPublisher(ISenderClient senderClient)
        => this.senderClient = senderClient ?? throw new ArgumentNullException(nameof(senderClient));

    public Task PublishAsync(TMessage message)
        => senderClient.SendAsync(ToMessage(message));

    private static Message ToMessage(TMessage message)
        => new Message
        {
            MessageId = Guid.NewGuid().ToString(),
            Body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message))
        };
}

We can implement the same interface to publish a message to the clients of a SignalR Hub. We won't go into the details of SignalR, but you can see this quickstart guide for more information.

public class SampleHubPublisher : IPublisher<EventMessage>
{
    private readonly IHubContext<SampleHub, ISampleHubClient> hubContext;

    public SampleHubPublisher(IHubContext<SampleHub, ISampleHubClient> hubContext)
        => this.hubContext = hubContext ?? throw new ArgumentNullException(nameof(hubContext));

    public Task PublishAsync(EventMessage message)
        => hubContext.Clients.All.ClientEvent(message);
}
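Any other sink follows the same shape. As a minimal sketch, here is a publisher that simply keeps messages in memory, which can be handy in unit tests. `InMemoryPublisher` is a hypothetical name that is not part of this post's code, and the interface is repeated so the sketch compiles on its own.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Repeated from above so the sketch is self-contained.
public interface IPublisher<TMessage> where TMessage : class
{
    Task PublishAsync(TMessage message);
}

// Hypothetical publisher that stores messages instead of sending them anywhere.
public class InMemoryPublisher<TMessage> : IPublisher<TMessage> where TMessage : class
{
    // ConcurrentQueue keeps this safe if several callers publish at the same time.
    private readonly ConcurrentQueue<TMessage> published = new();

    // Snapshot of everything published so far, in publish order.
    public TMessage[] Published => published.ToArray();

    public Task PublishAsync(TMessage message)
    {
        published.Enqueue(message);
        return Task.CompletedTask;
    }
}
```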

The Background Messenger

Next we bring together our IMessengerReader and our IPublishers into a background service that choreographs the publishing of our messages. Our BackgroundMessenger takes our intra-process IMessengerReader and all associated IPublishers and responds to any incoming message by sending it to each registered publisher.

public class BackgroundMessenger<TMessage>
    : BackgroundService
    where TMessage : class
{
    private readonly ILogger<BackgroundMessenger<TMessage>> logger;
    private readonly IMessengerReader<TMessage> messenger;
    private readonly IEnumerable<IPublisher<TMessage>> publishers;

    public BackgroundMessenger(
        ILogger<BackgroundMessenger<TMessage>> logger,
        IMessengerReader<TMessage> messenger,
        IEnumerable<IPublisher<TMessage>> publishers)
    {
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.messenger = messenger ?? throw new ArgumentNullException(nameof(messenger));
        this.publishers = publishers ?? throw new ArgumentNullException(nameof(publishers));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await ReadAndPublish(stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is shutting down; nothing to do.
        }
        finally
        {
            await messenger.Shutdown();
        }
    }

    private async Task ReadAndPublish(CancellationToken stoppingToken)
    {
        // WaitToReadAsync returns false once the channel is completed, so the loop ends
        // instead of spinning; cancellation surfaces as an OperationCanceledException.
        while (await messenger.WaitToReadAsync(stoppingToken))
        {
            await foreach (var message in messenger.ReadAllAsync(stoppingToken))
            {
                // Stop promptly on shutdown even while buffered messages remain.
                stoppingToken.ThrowIfCancellationRequested();
                // Fan out to every publisher; TryPublish logs failures instead of throwing.
                await Task.WhenAll(publishers.Select(publisher => TryPublish(publisher, message)));
            }
        }
    }

    private async Task TryPublish(IPublisher<TMessage> publisher, TMessage message)
    {
        try
        {
            await publisher.PublishAsync(message);
            // Message templates keep the values available as structured log properties.
            logger.LogInformation("Published to {Publisher} with message {Message}",
                publisher.GetType().Name, JsonSerializer.Serialize(message));
        }
        catch (Exception ex)
        {
            // Log and continue so one failing publisher does not block the others.
            logger.LogError(ex, "Error occurred while publishing to {Publisher} with message {Message}",
                publisher.GetType().Name, JsonSerializer.Serialize(message));
        }
    }
}

Our BackgroundMessenger is a HostedService that listens for incoming messages on the Messenger channel and publishes each message to all registered IPublishers. For this example we simply log any errors and continue, but we could easily add retries and other features.
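One way retries could be added without touching the BackgroundMessenger is a decorator around any publisher. This is only a sketch under stated assumptions: `RetryingPublisher`, `FlakyPublisher`, the attempt count and the backoff values are all hypothetical and not part of this post's code, and the interface is repeated so the sketch compiles on its own.

```csharp
using System;
using System.Threading.Tasks;

// Repeated from above so the sketch is self-contained.
public interface IPublisher<TMessage> where TMessage : class
{
    Task PublishAsync(TMessage message);
}

// Hypothetical decorator: wraps any publisher and retries failed publishes.
public class RetryingPublisher<TMessage> : IPublisher<TMessage> where TMessage : class
{
    private readonly IPublisher<TMessage> inner;
    private readonly int maxAttempts;
    private readonly TimeSpan baseDelay;

    public RetryingPublisher(IPublisher<TMessage> inner, int maxAttempts = 3, TimeSpan? baseDelay = null)
    {
        this.inner = inner ?? throw new ArgumentNullException(nameof(inner));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        this.maxAttempts = maxAttempts;
        this.baseDelay = baseDelay ?? TimeSpan.FromMilliseconds(200);
    }

    public async Task PublishAsync(TMessage message)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await inner.PublishAsync(message);
                return;
            }
            // The filter lets the final failure propagate to the caller unchanged.
            catch (Exception) when (attempt < maxAttempts)
            {
                // Exponential backoff: baseDelay, then 2x, 4x, ... before the next attempt.
                await Task.Delay(baseDelay * Math.Pow(2, attempt - 1));
            }
        }
    }
}

// Hypothetical test double that fails a fixed number of times before succeeding.
public class FlakyPublisher<TMessage> : IPublisher<TMessage> where TMessage : class
{
    private int failuresLeft;
    public int Calls { get; private set; }

    public FlakyPublisher(int failures) => failuresLeft = failures;

    public Task PublishAsync(TMessage message)
    {
        Calls++;
        if (failuresLeft-- > 0) throw new InvalidOperationException("simulated failure");
        return Task.CompletedTask;
    }
}
```

Since the decorator is itself an IPublisher<TMessage>, the final failure still reaches TryPublish and is logged there as before.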

Bringing It All Together

Finally, we wire everything up in our ConfigureServices method using a custom registration extension.

public static void AddMessenger<TMessage>(
    this IServiceCollection services,
    IEnumerable<Type> publishers = null,
    IEnumerable<IPublisher<TMessage>> instances = null,
    IEnumerable<Func<IServiceProvider, IPublisher<TMessage>>> factories = null)
    where TMessage : class
{
    publishers ??= Enumerable.Empty<Type>();
    instances ??= Enumerable.Empty<IPublisher<TMessage>>();
    factories ??= Enumerable.Empty<Func<IServiceProvider, IPublisher<TMessage>>>();

    // Every supplied type must be usable as an IPublisher<TMessage>.
    if (publishers.Any(p => !typeof(IPublisher<TMessage>).IsAssignableFrom(p)))
        throw new ArgumentException($"All publisher types must implement IPublisher<{typeof(TMessage).Name}>", nameof(publishers));
    // At least one publisher must come from one of the three sources.
    if (!publishers.Any() && !instances.Any() && !factories.Any())
        throw new ArgumentException($"Must register at least one publisher for {typeof(TMessage).Name}");

    foreach (var publisher in publishers)
        services.AddSingleton(typeof(IPublisher<TMessage>), publisher);
    foreach (var publisher in instances)
        services.AddSingleton(publisher);
    foreach (var factory in factories)
        // Naming the service type makes it explicit that the delegate is a factory, not an instance.
        services.AddSingleton<IPublisher<TMessage>>(factory);
    services.AddSingleton<Messenger<TMessage>>();
    services.AddSingleton<IMessengerWriter<TMessage>>(sp => sp.GetRequiredService<Messenger<TMessage>>());
    services.AddSingleton<IMessengerReader<TMessage>>(sp => sp.GetRequiredService<Messenger<TMessage>>());
    services.AddHostedService<BackgroundMessenger<TMessage>>();
}

This registration method registers all Publishers, the Messenger and the HostedService for a specific message type. We can provide implementation types, fully formed Publisher instances or Publisher factories.

Conclusion

In this post we have built some basic infrastructure to easily publish messages out to any number of consumers across any number of messaging infrastructure types. We've shown just two possible Publishers, but any implementation of the IPublisher<TMessage> interface is possible. Our BackgroundMessenger can now handle messages from any of our application's producers, and we've kept all the messaging dirty work out of the producers, leaving them lean. You can find all the code for this post on my GitHub Project ConcurrentFlows.HashNode.

 