Joshua Steward
Concurrent Flows

Message Routing in Dotnet 6

Jun 1, 2021

Background

This post builds on an earlier post, Background Message Processing in Dotnet 6. We extend the Messenger<T> and BackgroundMessenger<T> concepts with a new MessageRouter that takes a single InternalMessage, transforms it, and routes the result to the corresponding BackgroundMessenger, which publishes the message to its intended audience.

Challenge

Let's say we have a basic CRUD API consisting of Commands, Queries and Handlers. Whenever we Create, Update or Delete an entity in our domain, we want a corresponding message to be sent, but the Handlers of these Commands shouldn't be concerned with the messaging aspects of the event. Our producers, the Handlers, should only be concerned with their intended operation: Create, Update or Delete. With the contextual knowledge each operation already has, a Handler should be able to emit a generic event with a payload and a type. Another component can worry about how each type of message gets formed and where it is routed, which keeps our producers free of all but the simplest declarative logic: creating an InternalMessage with the correct type.

Solution

We'll build out a pattern for solving this challenge based on several interrelated components.

The Internal Message

The first component is the InternalMessage. This message captures the data and context the event handler already has and submits them to our internal messaging infrastructure. We define an abstract InternalMessage record that takes a type enum and a payload.

    public abstract record InternalMessage<TEnum, TPayload>(TEnum Type, TPayload Payload)
        where TEnum : Enum
        where TPayload : class;

Our InternalMessage is composed of an enum defining our message types and a payload representing the entity that has been acted upon. For our example we define a SampleEntity and its related InternalMessage and message types.

public record SampleEntity(int Id, string Name);

public record SampleHubInternalMessage(SampleHubMessageType Type, SampleEntity Payload)
    : InternalMessage<SampleHubMessageType, SampleEntity>(Type, Payload);

public enum SampleHubMessageType
{
    Created = 1,
    Updated = 2,
    Deleted = 3
}
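
To make the producer side concrete, here is a minimal sketch of a handler emitting an InternalMessage. The IMessengerWriter<T> interface below is a stand-in whose shape is assumed (the real one comes from the previous post), and InMemoryWriter and UpdateSampleEntityHandler are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory writer so the sketch runs on its own.
var writer = new InMemoryWriter<SampleHubInternalMessage>();
var handler = new UpdateSampleEntityHandler(writer);

await handler.HandleAsync(new SampleEntity(42, "Renamed"));

foreach (var message in writer.Written)
    Console.WriteLine($"{message.Type}: {message.Payload}");

public abstract record InternalMessage<TEnum, TPayload>(TEnum Type, TPayload Payload)
    where TEnum : Enum
    where TPayload : class;

public record SampleEntity(int Id, string Name);

public enum SampleHubMessageType { Created = 1, Updated = 2, Deleted = 3 }

public record SampleHubInternalMessage(SampleHubMessageType Type, SampleEntity Payload)
    : InternalMessage<SampleHubMessageType, SampleEntity>(Type, Payload);

// Assumed shape; the real interface is defined in the previous post.
public interface IMessengerWriter<T>
{
    ValueTask WriteAsync(T message);
}

// Hypothetical writer that simply records what it was given.
public class InMemoryWriter<T> : IMessengerWriter<T>
{
    public List<T> Written { get; } = new();

    public ValueTask WriteAsync(T message)
    {
        Written.Add(message);
        return ValueTask.CompletedTask;
    }
}

// Hypothetical handler: it knows the operation (Updated) and the entity, nothing else.
public class UpdateSampleEntityHandler
{
    private readonly IMessengerWriter<SampleHubInternalMessage> writer;

    public UpdateSampleEntityHandler(IMessengerWriter<SampleHubInternalMessage> writer)
        => this.writer = writer;

    public async Task HandleAsync(SampleEntity updated)
    {
        // ... persist the update here ...
        await writer.WriteAsync(new SampleHubInternalMessage(SampleHubMessageType.Updated, updated));
    }
}
```

The handler's only messaging responsibility is choosing the message type and handing over the payload; it has no knowledge of SignalR or of the external message shapes.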

The Messages We'll Produce

For our CRUD example we'll use an Azure SignalR Hub as our messaging platform but any other medium would work. We define a HubClient with strongly typed events for each of our operations.

public interface ISampleHubClient
{
    Task EntityCreated(EntityCreatedMessage message);
    Task EntityUpdated(EntityUpdatedMessage message);
    Task EntityDeleted(EntityDeletedMessage message);
}

For each of our HubClient methods we define a strongly typed message representing the event that took place.

public record EntityCreatedMessage(SampleEntity Entity, string Metadata);

public record EntityUpdatedMessage(SampleEntity Entity, string Metadata);

public record EntityDeletedMessage(int Id);

Note how we only return the Id for the EntityDeleted event: in this example the whole deleted entity is not necessary, nor is any Metadata that might be associated with the entity.

The Message Factory

Next we have the MessageFactory. The MessageFactory is what produces our resultant messages for a given type. We define the MessageFactory with a simple interface.

public interface IMessageFactory<TEnum, TPayload, TInternalMessage>    
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    ImmutableDictionary<TEnum, Func<TInternalMessage, IAsyncEnumerable<object>>> MessageFactoryMap { get; }
}

With this interface we expose a map between our message type enum and a function that takes our InternalMessage and produces an asynchronous stream of messages. But why define the result as an asynchronous stream? In short, IAsyncEnumerable<object> gives us an asynchronous, covariant collection of our resultant messages. This provides us with:

  • Async: Allows us to produce our messages asynchronously in the case we need some external resource to fully form the resultant messages.
  • Collection: Allows us to produce multiple output messages for a single InternalMessage.
  • Covariant: Allows us to return a stream of a more derived message type, such as IAsyncEnumerable<EntityCreatedMessage>, where an IAsyncEnumerable<object> is expected.
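
The three properties above can be seen in a small standalone sketch. CovarianceDemo and its GetDeletedMessages method are hypothetical names for illustration; only the EntityDeletedMessage record mirrors the post.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// The delegate is typed against object, like the values in MessageFactoryMap.
// Covariance lets a method returning IAsyncEnumerable<EntityDeletedMessage> satisfy it.
Func<int[], IAsyncEnumerable<object>> factory = ids => CovarianceDemo.GetDeletedMessages(ids);

await foreach (var message in factory(new[] { 7, 8 }))
    Console.WriteLine($"{message.GetType().Name}: {message}");

public record EntityDeletedMessage(int Id);

public static class CovarianceDemo
{
    // Async: the iterator may await. Collection: it may yield more than one message.
    public static async IAsyncEnumerable<EntityDeletedMessage> GetDeletedMessages(int[] ids)
    {
        foreach (var id in ids)
        {
            await Task.Yield(); // stands in for real asynchronous work
            yield return new EntityDeletedMessage(id);
        }
    }
}
```

Note that this variance only applies to reference types, which is one reason the external messages here are records rather than structs.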

So, our example implementation of a MessageFactory for our SampleHub looks like this.

public class SampleHubMessageFactory : IMessageFactory<SampleHubMessageType, SampleEntity, SampleHubInternalMessage>
{
    private readonly IMetadataRepository metadataRepository;

    public SampleHubMessageFactory(IMetadataRepository metadataRepository)
    {
        this.metadataRepository = metadataRepository ?? throw new ArgumentNullException(nameof(metadataRepository));

        MessageFactoryMap = new Dictionary<SampleHubMessageType, Func<SampleHubInternalMessage, IAsyncEnumerable<object>>>()
        {
            { SampleHubMessageType.Created, msg => GetCreatedMessage(msg) },
            { SampleHubMessageType.Updated, msg => GetUpdatedMessage(msg) },
            { SampleHubMessageType.Deleted, msg => GetDeletedMessage(msg) }
        }.ToImmutableDictionary();
    }

    public ImmutableDictionary<SampleHubMessageType, Func<SampleHubInternalMessage, IAsyncEnumerable<object>>> MessageFactoryMap { get; }

    public async IAsyncEnumerable<EntityCreatedMessage> GetCreatedMessage(SampleHubInternalMessage internalMessage)
    {
        var metadata = await metadataRepository.GetMetadadataAsync(internalMessage.Payload.Id);
        yield return new EntityCreatedMessage(internalMessage.Payload, metadata);
    }

    public async IAsyncEnumerable<EntityUpdatedMessage> GetUpdatedMessage(SampleHubInternalMessage internalMessage)
    {
        var metadata = await metadataRepository.GetMetadadataAsync(internalMessage.Payload.Id);
        yield return new EntityUpdatedMessage(internalMessage.Payload, metadata);
    }

    // No awaiting needed here; ToAsyncEnumerable() comes from the System.Linq.Async package.
    public IAsyncEnumerable<EntityDeletedMessage> GetDeletedMessage(SampleHubInternalMessage internalMessage)
        => new[] { new EntityDeletedMessage(internalMessage.Payload.Id) }.ToAsyncEnumerable();
}

In this example we showcase the ability to inject dependencies directly into our MessageFactory that would otherwise have been injected into our message producers. Remember that the goal of all this is to keep our message producers (the Handlers, Services, Controllers, etc.) free of anything but the most fundamental knowledge of messaging. They simply package up the data they have, tag it with a message Type, and send it on its way. Here we use our MessageFactory to gather some extra metadata about the entity before packaging it up and returning it to our MessageRouter.

The Message Router

The MessageRouter reads from an IMessengerReader<TInternalMessage> and invokes the corresponding MessageFactory method before writing each result to an IMessengerWriter<TMessage>.

public class MessageRouter<TEnum, TPayload, TInternalMessage>
    : BackgroundService
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    private readonly ILogger<MessageRouter<TEnum, TPayload, TInternalMessage>> logger;
    private readonly IMessengerReader<TInternalMessage> messenger;
    private readonly ImmutableDictionary<TEnum, Func<TInternalMessage, IAsyncEnumerable<object>>> messageFactoryMap;
    private readonly IServiceProvider serviceProvider;

    private ConcurrentDictionary<Type, dynamic> writerCache = new ConcurrentDictionary<Type, dynamic>();

    public MessageRouter(
        ILogger<MessageRouter<TEnum, TPayload, TInternalMessage>> logger,
        IMessengerReader<TInternalMessage> messenger,
        IMessageFactory<TEnum, TPayload, TInternalMessage> messageFactory,
        IServiceProvider serviceProvider)
    {
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.messenger = messenger ?? throw new ArgumentNullException(nameof(messenger));
        messageFactoryMap = messageFactory?.MessageFactoryMap ?? throw new ArgumentNullException(nameof(messageFactory));
        this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await MultiplexInternalMessagesAsync(stoppingToken);
        }
        catch (OperationCanceledException)
        {
        }
        finally
        {
            await messenger.Shutdown();
        }
    }

    private async ValueTask MultiplexInternalMessagesAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            stoppingToken.ThrowIfCancellationRequested();
            if (await messenger.WaitToReadAsync(stoppingToken))
            {
                stoppingToken.ThrowIfCancellationRequested();
                await foreach (var internalMessage in messenger.ReadAllAsync(stoppingToken))
                {
                    stoppingToken.ThrowIfCancellationRequested();
                    await GenerateMessagesAndWriteAsync(internalMessage);
                }
            }
        }
    }

    private async ValueTask GenerateMessagesAndWriteAsync(TInternalMessage internalMessage)
    {
        if (messageFactoryMap.ContainsKey(internalMessage.Type))
        {
            var externalMessages = messageFactoryMap[internalMessage.Type](internalMessage);
            await foreach (var externalMessage in externalMessages)
            {
                var writerType = typeof(IMessengerWriter<>).MakeGenericType(externalMessage.GetType());
                if (!writerCache.TryGetValue(writerType, out dynamic writer))
                {
                    writer = serviceProvider.GetRequiredService(writerType);
                    writerCache.TryAdd(writerType, writer);
                }
                // Await the write so a failed write surfaces here instead of being silently dropped.
                await writer.WriteAsync((dynamic)externalMessage);
                logger.LogInformation($"Sent {JsonSerializer.Serialize(externalMessage)} to {typeof(IMessengerWriter<>).Name}<{writerType.GenericTypeArguments[0].Name}>");
            }
        }
    }
}

The MessageRouter listens for any InternalMessages, as defined by its type parameters, through an IMessengerReader<TInternalMessage> interface. Then, using the injected MessageFactory, it transforms each InternalMessage into one or more messages meant for consumers. Finally, a registered IMessengerWriter<TMessage> is procured, either from the cache of instances already resolved or from the ServiceProvider, and the message is dispatched to all registered IPublisher<TMessage> instances through a BackgroundMessenger, as discussed in the previous post.
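
The writer lookup is the least obvious step, so here is a standalone sketch of it under simplifying assumptions. IMessengerWriter<T> is again an assumed shape, and ListWriter, DictionaryProvider and RuntimeDispatcher are hypothetical stand-ins for the real writers, the DI container and the router.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var createdWriter = new ListWriter<EntityCreatedMessage>();
var deletedWriter = new ListWriter<EntityDeletedMessage>();
IServiceProvider provider = new DictionaryProvider(new Dictionary<Type, object>
{
    [typeof(IMessengerWriter<EntityCreatedMessage>)] = createdWriter,
    [typeof(IMessengerWriter<EntityDeletedMessage>)] = deletedWriter
});
var dispatcher = new RuntimeDispatcher(provider);

// The messages are only known as object, as they are when they leave the factory.
object[] messages = { new EntityCreatedMessage("a"), new EntityDeletedMessage(1), new EntityDeletedMessage(2) };
foreach (var message in messages)
    await dispatcher.DispatchAsync(message);

Console.WriteLine($"created: {createdWriter.Items.Count}, deleted: {deletedWriter.Items.Count}");

public record EntityCreatedMessage(string Name);
public record EntityDeletedMessage(int Id);

// Assumed shape; the real interface is defined in the previous post.
public interface IMessengerWriter<T> { ValueTask WriteAsync(T message); }

public class ListWriter<T> : IMessengerWriter<T>
{
    public List<T> Items { get; } = new();
    public ValueTask WriteAsync(T message) { Items.Add(message); return ValueTask.CompletedTask; }
}

// Hypothetical stand-in for the container: maps a service type to an instance.
public class DictionaryProvider : IServiceProvider
{
    private readonly IReadOnlyDictionary<Type, object> services;
    public DictionaryProvider(IReadOnlyDictionary<Type, object> services) => this.services = services;
    public object? GetService(Type serviceType)
        => services.TryGetValue(serviceType, out var service) ? service : null;
}

public class RuntimeDispatcher
{
    private readonly IServiceProvider provider;
    private readonly ConcurrentDictionary<Type, object> writerCache = new();

    public RuntimeDispatcher(IServiceProvider provider) => this.provider = provider;

    public async ValueTask DispatchAsync(object message)
    {
        // Close IMessengerWriter<> over the runtime type of the message.
        var writerType = typeof(IMessengerWriter<>).MakeGenericType(message.GetType());
        if (!writerCache.TryGetValue(writerType, out var cached))
        {
            cached = provider.GetService(writerType)
                ?? throw new InvalidOperationException($"No writer registered for {writerType}.");
            writerCache.TryAdd(writerType, cached);
        }
        // dynamic defers binding of WriteAsync to run time, using the message's actual type.
        dynamic writer = cached;
        await writer.WriteAsync((dynamic)message);
    }
}
```

The trade-off of this approach is that a missing registration or a mismatched WriteAsync signature is only discovered at run time, so it is worth covering each message type with a test.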

Wiring it all up

To wire this all up, we first register a Messenger for each of our external message types, using the extension method provided in the previous post:

services.AddMessenger<EntityCreatedMessage>(new[] { typeof(SampleHubPublisher) });
services.AddMessenger<EntityUpdatedMessage>(new[] { typeof(SampleHubPublisher) });
services.AddMessenger<EntityDeletedMessage>(new[] { typeof(SampleHubPublisher) });

Then we add our MessageRouter and provide our MessageFactory:

services.AddMessageRouter<SampleHubMessageType, SampleEntity, SampleHubInternalMessage>(typeof(SampleHubMessageFactory));

AddMessageRouter is another extension method, defined to wire up all the necessary components of our infrastructure.

public static void AddMessageRouter<TEnum, TPayload, TInternalMessage>(this IServiceCollection services,
    Type messageFactory = null,
    Func<IServiceProvider, IMessageFactory<TEnum, TPayload, TInternalMessage>> factoryFactory = null)
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    if (messageFactory is null && factoryFactory is null)
        throw new ArgumentException($"Must provide a {nameof(messageFactory)}.");
    if (messageFactory is not null && factoryFactory is not null)
        throw new ArgumentException($"Must only provide one {nameof(messageFactory)}.");
    if ((messageFactory is not null) &&
        !messageFactory.GetInterfaces().Contains(typeof(IMessageFactory<TEnum, TPayload, TInternalMessage>)))
        throw new ArgumentException($"{nameof(messageFactory)} must be of type {typeof(IMessageFactory<,,>).Name}<{typeof(TEnum).Name},{typeof(TPayload).Name},{typeof(TInternalMessage).Name}>");

    if (messageFactory is not null)
        services.AddSingleton(typeof(IMessageFactory<TEnum, TPayload, TInternalMessage>), messageFactory);
    else
        services.AddSingleton(factoryFactory);

    services.AddSingleton<Messenger<TInternalMessage>>();
    services.AddSingleton<IMessengerWriter<TInternalMessage>>(sp => sp.GetRequiredService<Messenger<TInternalMessage>>());
    services.AddSingleton<IMessengerReader<TInternalMessage>>(sp => sp.GetRequiredService<Messenger<TInternalMessage>>());
    services.AddHostedService<MessageRouter<TEnum, TPayload, TInternalMessage>>();
}

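This extension method allows us to provide either a type for our MessageFactory or a factory function that creates it, but not both. It then registers all the infrastructure we need for our MessageRouter: the MessageFactory, a Messenger for the InternalMessage exposed as both reader and writer, and the MessageRouter itself as a hosted service.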

Conclusion

Now we can see the full life cycle of an event: from a user interaction that, say, updated an entity, to the InternalMessage that captured the data and context associated with that event, to the Factory and Router that published a fully formed message intended for consumers. Instead of tangling messaging concerns with our Command Handlers, we have moved those responsibilities to dedicated components and kept our Handlers free to execute only the tasks for which they are responsible. You can find all the code for this post in my GitHub project, ConcurrentFlows.HashNode.

 