Joshua Steward
Concurrent Flows

Concurrent Flows

A Simple Hosted Actor System

A Simple Hosted Actor System

Turning a Web of Handlers into a Mini Actor System

Joshua Steward's photo
Joshua Steward
·Jun 20, 2021·

9 min read

Overview

Take an AspNet Core Api, for example, we have chosen for this; a pattern of Queries/Commands and Handlers. We're going to explore this pattern and see what we can add to the body of knowledge. When implementing the Command/Query/Handler pattern we often end up with many small classes and specialized handlers. This is great for testing and can lead to excellent code organization and separation of concerns. But when integrating with Controllers we find the many handlers need to be injected and we tend towards the mediator pattern. One great implementation of this is MediatR, or you can go a similar route taken by one of the great maintainers of SimpleInjector, who proposes a simple query dispatcher in the great post Meanwhile... on the query side of my architecture. While that post is dated all the way back to 2011, it's still a fantastic reference for what I refer to as Application Scale CQRS.

CQRS At Different Levels

The Command Query Responsibility Segregation principles can be applied at various levels of your architecture. At the Systems Level we see Event Sourcing and a separation of read/write data models. But at the Application Scale we see a breakdown of classes. From monolithic Services and Repositories to single purpose Handlers. This provides a great separation of concerns and extensibility while keeping code cohesive within a single domain.

The Dispatch Issue

One common problem we find in the Mediator Pattern is the need to dynamically invoke un-instantiated handlers at runtime. Even with the super fast SimpleInjector this can take time especially if we're invoking a complex web of handlers to accomplish a given task. So the question becomes how can we retain the benefits of Application Scale CQRS while avoiding the runtime costs of instantiation of handlers.

The Actor Pattern

The Actor Pattern represents a shift in thinking. Instead of request scoped handlers I propose a pool of interconnected communicating handlers. While Matt Ferderer defines the properties of an actor to include:

  • Store Data
  • Receive messages from other actors
  • Pass messages to other actors
  • Create additional child actors

I'm going to take a different angle.

Our Goals: A simple hosted actor system

I'm going to redefine an Actor a little a differently for our api specific use case, an Actor should:

  • Live outside the request stream
  • Be able to receive messages asynchronously
  • Return a result asynchronously
  • Communicate with other actors in the system without instantiating them
  • Finally, be decoupled from the calling code

In order to achieve these goals we're going to combine elements of the Mediator Pattern and the Actor Pattern. And just for examples sake we're going to focus on the Query side of CQRS, the Command side is much simpler and maybe the topic of a future post.

Our Solution

The Basic Infrastrucuture

First we start with the basic infrastructure that will make up our system. You surely have seen many iterations of IQuery, IQueryHandler , etc. However, we're going to change them slightly to fit with our Actor approach. For starters we define an ActorQuery as such:

namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
    public record ActorQuery<TPayload, TAnswer>(TPayload Payload);
}

Note in this record we define the payload that we will submit to the actor system and we define the type of answer we expect back. We intentionally use the term Answer to distinguish it form a typical Task Result. This aligns more so with the concept of; Every query has an answer. Next we define our base hosted QueryActor this actor will be resoponsible for reciveing the input payload and emitting the answer to the query

namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
    public abstract class QueryActor<TQuery>
        : BackgroundService
    {
        protected readonly ChannelReader<KeyValuePair<Guid, TQuery>> queryReader;
        protected readonly ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter;

        public QueryActor(
            ChannelReader<KeyValuePair<Guid, TQuery>> queryReader, 
            ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter)
        {
            this.queryReader = queryReader ?? throw new ArgumentNullException(nameof(queryReader));
            this.answerWriter = answerWriter ?? throw new ArgumentNullException(nameof(answerWriter));
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested && !queryReader.Completion.IsCompleted)
            {
                var messageReady = await queryReader.WaitToReadAsync(stoppingToken);
                if (messageReady)
                {
                    await foreach (var query in queryReader.ReadAllAsync(stoppingToken))
                    {
                        await HandleAsync(query, stoppingToken);
                    }
                }
            }
        }

        public abstract Task HandleAsync(KeyValuePair<Guid, TQuery> query, CancellationToken stoppingToken);
    }
}

Our base QueryActor takes a ChannelReader that accepts the input query keyed with a unique identifier. This unique identifier will be used to connect Answers with Queries. We also make use of the dynamic type and an unconstrained query type parameter to allow this base Actor to handle any query and any answer. This approach is shown now in the what connects our Actor System together: the AnswerStream.

namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
    public interface IAnswerStream : IHostedService
    {
        ValueTask<dynamic> SubmitQuery<TQuery>(TQuery query);
    }
}

First we've defined a simple interface to interact with our Actor System. This interface allows us to submit a query and await a result from Actor System. Notice also that we e're extending the IHostedService interface to allow this stream to live outside the request scope and serve it's primary purpose of matching Answers to Queries

namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
    public class AnswerStream : BackgroundService, IAnswerStream
    {
        private readonly IServiceProvider serviceProvider;
        private readonly ChannelReader<KeyValuePair<Guid, dynamic>> answerReader;

        public AnswerStream(
            IServiceProvider serviceProvider, 
            ChannelReader<KeyValuePair<Guid, dynamic>> answerReader)
        {
            this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            this.answerReader = answerReader ?? throw new ArgumentNullException(nameof(answerReader));
            QueryResults = new ConcurrentDictionary<Guid, TaskCompletionSource<dynamic>>();
        }

        private ConcurrentDictionary<Guid, TaskCompletionSource<dynamic>> QueryResults { get; }

        public async ValueTask<dynamic> SubmitQuery<TQuery>(TQuery query)
        {
            var writer = serviceProvider.GetRequiredService<ChannelWriter<KeyValuePair<Guid, TQuery>>>();
            var queryId = Guid.NewGuid();
            var resultSource = new TaskCompletionSource<dynamic>();
            QueryResults.TryAdd(queryId, resultSource);
            await writer.WriteAsync(new KeyValuePair<Guid, TQuery>(queryId, query));
            return await resultSource.Task;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested && !answerReader.Completion.IsCompleted)
            {
                var resultsReady = await answerReader.WaitToReadAsync(stoppingToken);
                if (resultsReady)
                    await foreach (var result in answerReader.ReadAllAsync(stoppingToken))
                        if (QueryResults.TryRemove(result.Key, out var resultSource))
                            resultSource.TrySetResult(result.Value);
            }
        }
    }
}

We see in this implementation the only use of runtime resolution is the instantiation of a singleton ChannelWriter to input the supplied query into our Actor System. This is a low cost operation since the ChannelWriter will be held and made available as a singleton in our ServiceProvider. We see also, that when a query is submitted it is asigned an unquie identifer that will later be used to match it up with its results, it is also associated with a TaskCompletionSource<TAnswer> and this associated Task is returned to the caller to await the results from our AnswerStream. The ExecuteAsync method continuously waits for results to match up with assigned queries and manages the ConcurrentDictionary that stores the associated TaskCompletionSources. And again, we use dynamic for the answer type to allow this AnswerStream to handle any query and any Answer. More could be done here to better manage the ConcurrentDictionary but this is just basic working sample.

The Last piece of infrastructure we need is way to register our actor system with the ServiceProvider. To accomplish that we create an extension method to add all the necessary bit and pieces to our DI.

namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
    public static class RegistrationExtensions
    {
        public static IServiceCollection AddActor<TActor, TQuery>(this IServiceCollection services)
            where TActor : QueryActor<TQuery>
        {            
            services.AddHostedService<TActor>();
            var inputChannel = Channel.CreateUnbounded<KeyValuePair<Guid, TQuery>>();
            services.AddSingleton(inputChannel.Writer);
            services.AddSingleton(inputChannel.Reader);            
            return services;
        }

        public static IServiceCollection AddAnswerStream(this IServiceCollection services)
        {
            services.AddSingleton<IAnswerStream, AnswerStream>();
            services.AddHostedService(sp => sp.GetRequiredService<IAnswerStream>());
            var answerChannel = Channel.CreateUnbounded<KeyValuePair<Guid, dynamic>>();
            services.AddSingleton(answerChannel.Writer);
            services.AddSingleton(answerChannel.Reader);
            return services;
        }
    }
}

This extension registers:

  • Our AnswerStream
  • Our input Channels
  • Our answer Channels
  • And of course our Actor implementation.

Sample Demo

The Input Side

For demonstration of this system we are going to query for the factorial of a given number. And as an example of utilizing multiple types we'll return an input message. We start with the api queries that is received through our RESTish interface.

namespace ConcurrentFlows.HostedActorSystem.Queries.Api
{
    public record GetFactorialApiQuery([FromQuery] int Message);
    public record GetMessageApiQuery([FromQuery]string Message);
}

Notice how we're not only making use of the new Record types here but also Record model binding in our controller.

namespace ConcurrentFlows.HostedActorSystem.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class SampleController : ControllerBase
    {
        private readonly IAnswerStream answerStream;

        public SampleController(IAnswerStream answerStream)
            => this.answerStream = answerStream ?? throw new ArgumentNullException(nameof(answerStream));

        [HttpGet]
        public async Task<IActionResult> GetFactorial([FromQuery] GetFactorialApiQuery query)
        {
            var actorQuery = new GetFactorialActorQuery(query.Message);
            var result = await answerStream.SubmitQuery(actorQuery);
            return Ok(result);
        }

        [HttpGet("message")]
        public async Task<IActionResult> ReturnMessage([FromQuery] GetMessageApiQuery query)
        {
            var actorQuery = new GetMessageActorQuery(query.Message);
            var result = await answerStream.SubmitQuery(actorQuery);
            return Ok(result);
        }
    }
}

Our controller takes the input api query, translates it to an ActorQuery and submits it to our system. It also gets injected with an AnswerStream that will provide access to the Actor System and the result from the query.

Interconnected Actors

The factorial query in particular is anActorQuery that is a system of two interconnected actors that are invoked with these two queries.

public record GetFactorialActorQuery(int Payload) : ActorQuery<int, int>(Payload);
public record GetReverseRangeActorQuery(int Payload) : ActorQuery<int, IAsyncEnumerable<int>>(Payload);

These queries invoke two interconnected actors which cooperate to produce a result. The deepest Actor is the ReverseRangeQueryActor. This Actor has the responsibility of producing a range of int's in reverse order.

namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Actors
{
    public class GetReverseRangeQueryActor : QueryActor<GetReverseRangeActorQuery>
    {
        public GetReverseRangeQueryActor(
            ChannelReader<KeyValuePair<Guid, GetReverseRangeActorQuery>> queryReader,
            ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter)
            : base(queryReader, answerWriter)
        {

        }

        public override async Task HandleAsync(KeyValuePair<Guid, GetReverseRangeActorQuery> query, CancellationToken stoppingToken)
        {
            var range = Enumerable.Range(1, query.Value.Payload).Reverse().ToAsyncEnumerable();
            var answer = new KeyValuePair<Guid, dynamic>(query.Key, range);
            await answerWriter.WriteAsync(answer);
        }
    }
}

Notice how this Actor keys the results with the same key provided in the input.

The next Actor up is the FactorialQueryActor. This Actor takes the input value from the query, the range provided by the ReverseRangeQueryActor and computes the factorial.

namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Actors
{
    public class GetFactorialQueryActor : QueryActor<GetFactorialActorQuery>
    {
        private readonly IAnswerStream actorStream;

        public GetFactorialQueryActor(
            ChannelReader<KeyValuePair<Guid, GetFactorialActorQuery>> queryReader,
            ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter,
            IAnswerStream actorStream)
            : base(queryReader, answerWriter)
        {
            this.actorStream = actorStream ?? throw new ArgumentNullException(nameof(actorStream));
        }

        public override async Task HandleAsync(KeyValuePair<Guid, GetFactorialActorQuery> query, CancellationToken stoppingToken)
        {
            var rangeQuery = new GetReverseRangeActorQuery(query.Value.Payload - 1);
            IAsyncEnumerable<int> result = await actorStream.SubmitQuery(rangeQuery);
            var factorial = await result.AggregateAsync(query.Value.Payload, (x, y) => x * y);
            var answer = new KeyValuePair<Guid, dynamic>(query.Key, factorial);
            await answerWriter.WriteAsync(answer);
        }
    }
}

Notice how these Actors are connected using the very same AnswerStream setup that's used to communicate from the controller to the Actor System. The AnswerStream integrates the Actor System so that all Actors can communicate asynchronously without any knowledge of each other except for the defined Query types, just like our standard Application Level CQRS. This leaves us completely decoupled by the queries.

Our Other Actor

The GetMessageActorQuery simply returns the input message.

namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Actors
{
    public class GetMessageQueryActor : QueryActor<GetMessageActorQuery>
    {
        public GetMessageQueryActor(
            ChannelReader<KeyValuePair<Guid, GetMessageActorQuery>> queryReader,
            ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter)
            : base(queryReader, answerWriter)
        {

        }

        public override async Task HandleAsync(KeyValuePair<Guid, GetMessageActorQuery> query, CancellationToken stoppingToken)
        {
            await answerWriter.WriteAsync(new KeyValuePair<Guid, dynamic>(query.Key, query.Value.Payload));
        }
    }
}

ConfigureServices: The last piece of the puzzle

The last final piece to show is the simple registration of all our actors.

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new OpenApiInfo { Title = "ConcurrentFlows.HostedActorSystem", Version = "v1" });
    });
    services.AddAnswerStream();
    services.AddActor<GetFactorialQueryActor, GetFactorialActorQuery>();
    services.AddActor<GetReverseRangeQueryActor, GetReverseRangeActorQuery>();
    services.AddActor<GetMessageQueryActor, GetMessageActorQuery>();
}

And now with everything in place we can test our simple Hosted Actor System and get a result!

ActorSystemOutput.PNG

Summary

How cool was that! I don't know about you but that simple answer is pretty satisfying! We've created a simple Hosted Actor System to replace our request scoped Handlers. Our Actors communicate completely asynchronously and without knowledge of one another through our AnswerStream. These Actors live outside the request scope along with our AnswerStream. Always ready and able to process incoming requests in an asynchronous decoupled manner. Kinda beautiful in a way! Now I'm not saying go out and replace your architecture with this style but only to consider this simple Hosted Actor pattern when the problem space warrants it.

You can find all code GitHub ConcurrentFlows.HashNode

 
Share this