Joshua Steward
Concurrent Flows

Decoupled Process Management

Toward a Message Driven Architecture

Jul 6, 2021

Background

Imagine that, in the course of designing a new API, you are required to implement a complex process. The approach presented here produces a completely message-driven architecture that can be implemented on, and integrated with, any messaging infrastructure you choose. This post covers one way to break a complex process into a sequence of phases that are decoupled from the calling code by a messaging system. The system is similar to the one in a previous post about a simple actor system. We'll extend the concept presented in that post and combine it with ideas from the post on message routing.

Challenge

The challenge here is to derive a generic Process Management System. This system should be capable of driving a user-defined process and doing so in a completely decoupled, message-driven way.

Solution

Infrastructure

First we'll cover the infrastructure of our solution. These components lay the groundwork for our sample.

The Messages

This implementation is going to be completely message driven and independent of the request context. The Process Handlers will live as BackgroundServices and will always be alive and ready to process messages. For this system to work, we first need to define some base Message records.

public record ProcessStartMessage<TInput>(TInput Input);

public record ProcessStartedMessage(Guid ProcessId);

public record ProcessEndedMessage(Guid ProcessId, ProcessPhase Phase);

public record ProcessActivity;

public record ProcessMessage<TInput>(Guid ProcessId, ProcessPhase Phase, TInput Input);

public record ProcessMessage<TInput, TActivity>(Guid ProcessId, ProcessPhase Phase, TInput Input, TActivity Activity)
    : ProcessMessage<TInput>(ProcessId, Phase, Input)
    where TActivity : ProcessActivity
    where TInput : ProcessInput;

public record ProcessInput;

public record ProcessPhase(string Phase)
{
    public static readonly ProcessPhase Completed = new ProcessPhase(nameof(Completed));
    public static readonly ProcessPhase Failed = new ProcessPhase(nameof(Failed));
}

Let's go through each message and base type.

  • ProcessStartMessage: This message signals the ProcessStartHandler to begin the defined process. It is used as a type argument constraint and should be the base record of a derived, process-specific message, as we'll see in a moment.
  • ProcessStartedMessage: This message is published when the process begins and provides the id assigned to the process, so that calling code can use it to query the state of the process. In this post we'll leave most of the calling code out and focus on the Process Management System.
  • ProcessEndedMessage: This message signals to the calling code that the process has ended. It provides the process id and the phase the process was in when it stopped, so the client can tell whether the process failed or completed successfully.
  • ProcessMessage<TInput>: This is the base of all process messages. We'll use it to constrain our PhaseTransitionHandler so that it has access to the properties it needs.
  • ProcessMessage<TInput, TActivity>: This message, along with an Activity type, will be used throughout the process lifetime. The Activity parameter associates the ProcessMessage with the particular activity being carried out. The message also carries the full input object, the current phase of the process and the id of the process to which it belongs.
  • ProcessActivity and ProcessInput: These are just constraint records to be derived from for an Activity or Input respectively.
  • ProcessPhase: The ProcessPhase defines the high-level, general state of the process. For example, a derived set of phases could be Validation, StageRollback and StageInsert. We define our process in terms of phases, and each phase may generate N messages with N activities. In the base record we define two general phases, Completed and Failed, which are common to all derived phase sets. I've chosen this style of defining phases instead of something like an enumeration because, personally, I don't like enums, and a record gives us a better constraint since we cannot derive from a base enum.
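One property of the record-based ProcessPhase is worth making concrete: records compare and hash by value, which is what allows phases to serve as dictionary keys later in the post. The sketch below is a minimal illustration under stated assumptions: ProcessPhase is repeated so the snippet stands alone, while OrderPhases, DerivedPhase and PhaseDemo are hypothetical names that do not appear in the original system.

```csharp
using System.Collections.Generic;

public record ProcessPhase(string Phase)
{
    public static readonly ProcessPhase Completed = new(nameof(Completed));
    public static readonly ProcessPhase Failed = new(nameof(Failed));
}

// Hypothetical phase set, for illustration only.
public static class OrderPhases
{
    public static readonly ProcessPhase Validation = new(nameof(Validation));
}

// Hypothetical derived record, used to show the effect of the runtime type on equality.
public record DerivedPhase(string Phase) : ProcessPhase(Phase);

public static class PhaseDemo
{
    // Two ProcessPhase instances with the same string are equal.
    public static bool SameValueIsEqual()
        => new ProcessPhase("Validation") == OrderPhases.Validation;

    // Value-based hashing lets an equal (not identical) key find the entry.
    public static string LookupByEqualKey()
    {
        var table = new Dictionary<ProcessPhase, string>
        {
            [OrderPhases.Validation] = "validate",
            [ProcessPhase.Completed] = "done"
        };
        return table[new ProcessPhase("Validation")];
    }

    // Record equality also compares the runtime type, so an instance of a
    // derived record is not equal to a base instance with the same string.
    public static bool DerivedInstanceIsEqual()
        => new DerivedPhase("Validation") == OrderPhases.Validation;
}
```

Here SameValueIsEqual returns true and DerivedInstanceIsEqual returns false. The second result suggests that phase values are best created as plain ProcessPhase instances, even when they are grouped under a derived record, so that lookups keyed by ProcessPhase keep matching.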

A process is a set of messages that are initiated by phase transitions. A high-level view of a process can be seen in this figure:

[Figure: Process Handling]

As you can see, each phase emits N messages to be collected by the Process Phase Transition Handler. This handler then determines whether a phase transition is necessary and, if so, generates the next phase's messages. If the handler determines that the process is complete, an ended message carrying the Completed phase is emitted and the process is over.

The Message System

Let's now look at the communication stream. This stream abstracts our underlying messaging system into a simple interface with just the methods we need. We could implement this interface with Channels, Observables, Kafka, Azure Service Bus, SignalR, Azure Pub/Sub or any other messaging system we choose, so long as the interface is met.

public interface IMessageSystemReader<TMessage>
{
    public ValueTask<bool> MessageReady(CancellationToken token = default);
    public IAsyncEnumerable<TMessage> ReadAllAsync(CancellationToken token = default);
    public Task Completion { get; }
}

public interface IMessageSystemWriter<TMessage>
{
    public ValueTask WriteAsync(TMessage message, CancellationToken token = default);
    public Task Shutdown();
}

Using the Interface Segregation principle, we break this interface down into two: one reader and one writer. These interfaces also define a way to shut down the stream and notify the client that the stream is now closed.

In this post we'll use channels as our underlying transport.

public class MessageSystem<TMessage>
    : IMessageSystemReader<TMessage>,
    IMessageSystemWriter<TMessage>
{
    private ChannelReader<TMessage> reader;
    private ChannelWriter<TMessage> writer;

    public MessageSystem()
    {
        var channel = Channel.CreateUnbounded<TMessage>();
        reader = channel.Reader;
        writer = channel.Writer;
    }

    public Task Completion => reader.Completion;

    public ValueTask<bool> MessageReady(CancellationToken token = default)
        => reader.WaitToReadAsync(token);

    public IAsyncEnumerable<TMessage> ReadAllAsync(CancellationToken token = default)
        => reader.ReadAllAsync(token);

    public ValueTask WriteAsync(TMessage message, CancellationToken token = default)
        => writer.WriteAsync(message, token);

    public Task Shutdown()
    {
        writer.Complete();
        return reader.Completion;
    }
}
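To make the shutdown behaviour of the underlying transport concrete, here is a minimal sketch written directly against System.Threading.Channels rather than against MessageSystem itself. ChannelDemo and RoundTripAsync are hypothetical names used only for this illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    // Writes three messages, completes the writer, then drains the reader.
    public static async Task<(List<string> Received, bool LateWriteAccepted)> RoundTripAsync()
    {
        var channel = Channel.CreateUnbounded<string>();

        foreach (var text in new[] { "start", "work", "end" })
            await channel.Writer.WriteAsync(text);

        // The same call Shutdown() makes: no further writes are accepted.
        channel.Writer.Complete();
        var lateWriteAccepted = channel.Writer.TryWrite("late");

        // Messages buffered before completion can still be read.
        var received = new List<string>();
        await foreach (var message in channel.Reader.ReadAllAsync())
            received.Add(message);

        // Completion finishes once the writer is complete and the buffer is empty.
        await channel.Reader.Completion;
        return (received, lateWriteAccepted);
    }
}
```

The three buffered messages are still delivered after Complete() is called, while the late write is rejected. That is the behaviour Shutdown relies on when it returns reader.Completion.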

Message System Extensions

As you can see, the implementation is just a simple wrapper around a Channel's reader and writer. Next, let's add some helper methods so that all we have to worry about in our Handler code is the logic necessary to handle our defined process.

public static class MessagingExtensions
{
    public static async IAsyncEnumerable<TMessage> ContinuousWaitAndReadAllAsync<TMessage>(
        this IMessageSystemReader<TMessage> reader,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        while (!reader.Completion.IsCompleted && !token.IsCancellationRequested)
        {
            var readerReady = await reader.MessageReady(token);
            if (readerReady)
                await foreach (var message in reader.ReadAllAsync(token))
                    yield return message;
        }
    }

    public static async ValueTask RouteMessageByTypeAsync(this IWriterProvider provider, object message)
    {
        var writer = provider.RequestWriter(message);
        await writer.WriteAsync((dynamic)message);
    }

    public static async ValueTask<bool> SendIfEnding(this ProcessPhase phase, IWriterProvider provider, Func<ProcessEndedMessage> endedMessageFactory)
    {
        if (phase == ProcessPhase.Completed || phase == ProcessPhase.Failed)
        {
            await provider.RouteMessageByTypeAsync(endedMessageFactory());
            return true;
        }
        return false;
    }

    public static bool ContainsActivity<TActivity>(this IEnumerable<dynamic> messages)
        where TActivity : ProcessActivity 
        => messages.Any(msg => msg.Activity.GetType() == typeof(TActivity));
}

We have one method that wraps the reader's waiting and receiving into one easy async enumerator. Then we have the writer method that dispatches a message by its type. We have one extension that determines whether the process has completed or failed and, if so, sends out the ended message. And we have a method to determine which activities exist in the current set of messages. This raises the question: what's the WriterProvider?
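The read loop can be tried in isolation. The sketch below reproduces the shape of ContinuousWaitAndReadAllAsync against a raw ChannelReader<T>, so that it stands alone without the interfaces from this post. ReadLoopDemo, WaitAndReadAllAsync and ConsumeUntilShutdownAsync are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReadLoopDemo
{
    // Same loop shape as the extension method, bound to ChannelReader<T> directly.
    public static async IAsyncEnumerable<T> WaitAndReadAllAsync<T>(
        ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        while (!reader.Completion.IsCompleted && !token.IsCancellationRequested)
        {
            // Waits until data is available; returns false once the channel is completed and empty.
            if (await reader.WaitToReadAsync(token))
                await foreach (var message in reader.ReadAllAsync(token))
                    yield return message;
        }
    }

    // A consumer runs in the background while the producer writes and then shuts down.
    public static async Task<int> ConsumeUntilShutdownAsync()
    {
        var channel = Channel.CreateUnbounded<int>();

        var consumer = Task.Run(async () =>
        {
            var sum = 0;
            await foreach (var number in WaitAndReadAllAsync(channel.Reader))
                sum += number;
            return sum;
        });

        for (var i = 1; i <= 4; i++)
            await channel.Writer.WriteAsync(i);

        // Completing the writer ends the consumer's enumeration.
        channel.Writer.Complete();
        return await consumer;
    }
}
```

The consumer sees every value written before shutdown (the sum is 10) and its loop exits on its own once the writer completes. Handlers built on this pattern therefore need no extra stop signal beyond Shutdown or the host's stopping token.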

The Writer Provider

public class WriterProvider : IWriterProvider
{
    private readonly IServiceProvider serviceProvider;

    public WriterProvider(IServiceProvider serviceProvider)
    {
        this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    public dynamic RequestWriter(object message)
    {
        var writerType = typeof(IMessageSystemWriter<>).MakeGenericType(message.GetType());
        dynamic writer = serviceProvider.GetService(writerType);
        if (writer is null)
            throw new ArgumentException($"Message Writer not registered for {message.GetType().Name}");
        return writer;
    }
}

You may ask why we don't just use a TMessage type argument instead of message.GetType(). This is because TMessage could be a base type, and we want the writer we get to be strongly typed to the actual message being sent. Our WriterProvider simply hides the IServiceProvider and uses it to retrieve the correct writer. Remember that the writer can be implemented with nearly any transport that satisfies the basic interface. So if we change transports, the rest of our code doesn't need to change; only the implementations of IMessageSystemReader<TMessage> and IMessageSystemWriter<TMessage> do.

dynamic is chosen for the return type because, remember, we used message.GetType() to build the generic type we requested from the IServiceProvider. We want to preserve that type and not constrain it to a TMessage, which may be a base type.
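The IWriterProvider interface itself is not listed in this post; presumably it declares only RequestWriter. The sketch below shows that assumed shape and illustrates the runtime-type lookup, with a plain dictionary standing in for IServiceProvider so the snippet has no dependencies. IWriter<T>, ListWriter<T>, DictionaryWriterProvider, RoutingDemo and the Greeting records are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the interface implemented by WriterProvider.
public interface IWriterProvider
{
    dynamic RequestWriter(object message);
}

// Hypothetical stand-ins for IMessageSystemWriter<T> and an implementation of it.
public interface IWriter<T> { void Write(T message); }

public class ListWriter<T> : IWriter<T>
{
    public List<T> Written { get; } = new();
    public void Write(T message) => Written.Add(message);
}

public record Greeting(string Text);
public record LoudGreeting(string Text) : Greeting(Text);

// A dictionary plays the role of IServiceProvider in this sketch.
public class DictionaryWriterProvider : IWriterProvider
{
    private readonly Dictionary<Type, object> writers = new();

    public void Register<T>(IWriter<T> writer) => writers[typeof(IWriter<T>)] = writer;

    public dynamic RequestWriter(object message)
    {
        // Close the open generic over the runtime type, not the static type.
        var writerType = typeof(IWriter<>).MakeGenericType(message.GetType());
        return writers.TryGetValue(writerType, out var writer)
            ? writer
            : throw new ArgumentException($"Message Writer not registered for {message.GetType().Name}");
    }
}

public static class RoutingDemo
{
    // Returns how many messages reached the writer registered for the derived type.
    public static int RouteDerivedThroughBaseReference()
    {
        var provider = new DictionaryWriterProvider();
        var loudWriter = new ListWriter<LoudGreeting>();
        provider.Register<Greeting>(new ListWriter<Greeting>());
        provider.Register<LoudGreeting>(loudWriter);

        Greeting message = new LoudGreeting("HELLO"); // static type is the base record
        provider.RequestWriter(message).Write((dynamic)message);
        return loudWriter.Written.Count;
    }
}
```

Although the variable is typed as Greeting, the message lands in the writer registered for LoudGreeting, so the method returns 1. A lookup based on a TMessage type argument would have picked the Greeting writer instead.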

Process Start

Next, let's see how we start a process. This is an abstract class that implements the vast majority of the functionality any derived handler would need. The responsibilities of this handler are quite simple. It lives as a BackgroundService and listens for a particular TStartMessage. Once a message is received, it responds with the result of the overridden StartedMessageFactory. Then it emits the first phase's messages down the line to the next part of the system.

public abstract class ProcessStartHandler<TStartMessage, TInput>
    : BackgroundService
    where TStartMessage : ProcessStartMessage<TInput>
    where TInput : ProcessInput
{
    private readonly IMessageSystemReader<TStartMessage> startReader;
    private readonly IWriterProvider writerProvider;
    private readonly IMessageFactory<TInput> messageFactory;
    private readonly ProcessPhase startPhase;

    public ProcessStartHandler(
        IWriterProvider writerProvider,
        IMessageFactory<TInput> messageFactory,
        IMessageSystemReader<TStartMessage> startReader,
        ProcessPhase startPhase)
    {
        this.startReader = startReader ?? throw new ArgumentNullException(nameof(startReader));
        this.writerProvider = writerProvider ?? throw new ArgumentNullException(nameof(writerProvider));
        this.messageFactory = messageFactory ?? throw new ArgumentNullException(nameof(messageFactory));
        this.startPhase = startPhase ?? throw new ArgumentNullException(nameof(startPhase));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var message in startReader.ContinuousWaitAndReadAllAsync(stoppingToken))
        {
            var processId = Guid.NewGuid();
            await SendStartedMessage(processId);
            await SendFirstPhaseMessages(processId, message);
        }
    }

    private async ValueTask SendStartedMessage(Guid processId)
    {

        var message = StartedMessageFactory(processId);
        await writerProvider.RouteMessageByTypeAsync(message);
    }

    private async ValueTask SendFirstPhaseMessages(Guid processId, TStartMessage message)
    {
        await foreach (var newMessage in messageFactory[startPhase](processId, startPhase, message.Input, Enumerable.Empty<object>()))
        {
            await writerProvider.RouteMessageByTypeAsync(newMessage);
        }

    }

    protected abstract ProcessStartedMessage StartedMessageFactory(Guid processId);
}

With our handy extension methods in use for both reading and writing, the code here is straight to the point: listen for a start message, emit a started message with the ProcessId, and emit the first phase's messages. Now you may be asking: what's the message factory, and why is it needed? Well, we don't want our handlers to have too much responsibility. The start handler already covers a lot of functionality, and it shouldn't be responsible for knowing how to create the first phase's messages. If it were, we could end up with a huge list of dependencies and even new type parameters to deal with. This would be unsatisfactory, so we delegate to a single-purpose object: the message factory.

The Message Factory

// The delegate signature must match the one used by MessageFactory<TInput> and by the handlers:
// process id, phase, input, messages received so far.
public interface IMessageFactory<TInput>
    : IReadOnlyDictionary<ProcessPhase, Func<Guid, ProcessPhase, TInput, IEnumerable<object>, IAsyncEnumerable<object>>>
    where TInput : ProcessInput
{

}

public abstract class MessageFactory<TInput>
    : Dictionary<ProcessPhase, Func<Guid, ProcessPhase, TInput, IEnumerable<object>, IAsyncEnumerable<object>>>,
    IMessageFactory<TInput>
    where TInput : ProcessInput
{

}

As you can see, this is a generic interface that simply needs the type of TInput. Based on a ProcessPhase key we get a Func that takes the process id, the current phase, the process input and a collection of the messages received so far for this process. With these inputs it generates a batch of messages to be sent out. When this interface is implemented, each phase of the implementing process must be accounted for.

We also have a handy abstract class for implementations to inherit from such that all they are concerned with is defining what messages come from specific keys.

Phase Transitioning

Next, we're going to look at the next abstract BackgroundService, which is the only other operating service in this system. This handler listens for messages of the predefined TMessage type, collects them, and uses both the MessageFactory we've seen and the upcoming PhaseTransitions dictionary.

public abstract class PhaseTransitionHandler<TMessage, TInput>
    : BackgroundService
    where TMessage : ProcessMessage<TInput>
    where TInput : ProcessInput
{
    private readonly IWriterProvider writerProvider;
    private readonly IPhaseTransitions<TInput> phaseTransitions;
    private readonly IMessageFactory<TInput> messageFactory;
    private readonly IMessageSystemReader<TMessage> reader;

    private ConcurrentDictionary<Guid, ICollection<object>> MessagesReceived = new ConcurrentDictionary<Guid, ICollection<object>>();

    public PhaseTransitionHandler(
        IWriterProvider writerProvider,
        IPhaseTransitions<TInput> phaseTransitions,
        IMessageFactory<TInput> messageFactory,
        IMessageSystemReader<TMessage> reader)
    {
        this.writerProvider = writerProvider ?? throw new ArgumentNullException(nameof(writerProvider));
        this.phaseTransitions = phaseTransitions ?? throw new ArgumentNullException(nameof(phaseTransitions));
        this.messageFactory = messageFactory ?? throw new ArgumentNullException(nameof(messageFactory));
        this.reader = reader ?? throw new ArgumentNullException(nameof(reader));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var message in reader.ContinuousWaitAndReadAllAsync(stoppingToken))
        {
            if (MessagesReceived.TryGetValue(message.ProcessId, out var messages))
                messages.Add(message);
            else
                MessagesReceived[message.ProcessId] = new List<object>() { message };

            var nextPhase = phaseTransitions[message.Phase](message.Phase, message.Input, MessagesReceived[message.ProcessId]);
            if (await nextPhase.SendIfEnding(writerProvider, () => EndedMessageFactory(message.ProcessId, nextPhase)))
            {
                MessagesReceived.TryRemove(message.ProcessId, out _);
                await CompletedEvent(nextPhase, message);
            }
            else if (ShouldTransition(message.Phase, nextPhase))
            {
                await PhaseTransitionEvent(nextPhase, message);
                await foreach (var newMessage in messageFactory[nextPhase](message.ProcessId, nextPhase, message.Input, MessagesReceived[message.ProcessId]))
                {
                    await writerProvider.RouteMessageByTypeAsync(newMessage);
                }
            }
        }
    }

    protected abstract ProcessEndedMessage EndedMessageFactory(Guid processId, ProcessPhase phase);

    protected virtual ValueTask CompletedEvent(ProcessPhase phase, TMessage message)
        => ValueTask.CompletedTask;

    protected virtual ValueTask PhaseTransitionEvent(ProcessPhase phase, TMessage message) 
        => ValueTask.CompletedTask;

    private bool ShouldTransition(ProcessPhase currentPhase, ProcessPhase newPhase)
        => !currentPhase.Equals(newPhase);
}

This handler listens for the defined TMessage type and collects messages by ProcessId. Each time a message is received and stored, the PhaseTransitions component is used to check whether or not to transition to a new phase. If a phase transition is required, the handler uses the MessageFactory to produce and send the next batch of messages. The handler also provides hooks into the process through CompletedEvent and PhaseTransitionEvent, so that inheritors can perform any extra logic they might want on those two events.

The Phase Transition Dictionary

Similar to the MessageFactory, our PhaseTransitions dictionary is keyed by ProcessPhase and produces a function that takes the current phase, the original input and the collection of messages received so far, and returns a ProcessPhase. If the phase returned is different from the current phase, the PhaseTransitionHandler responds by moving on to the next batch of messages.

public interface IPhaseTransitions<TInput>
    : IReadOnlyDictionary<ProcessPhase, Func<ProcessPhase, TInput, IEnumerable<object>, ProcessPhase>>
    where TInput : ProcessInput
{

}

public abstract class PhaseTransitions<TInput>
    : Dictionary<ProcessPhase, Func<ProcessPhase, TInput, IEnumerable<object>, ProcessPhase>>,
    IPhaseTransitions<TInput>
    where TInput : ProcessInput
{

}

Again we have a handy abstract base class that inherits a Dictionary so all the implementation has to do is fill itself with the correct Phase Transitions.
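To see how such a table behaves as messages arrive, here is a stripped-down sketch that is independent of the types in this post. Phase, TransitionDemo and the activity strings are hypothetical stand-ins, and the Evaluate method only mirrors the store-then-check step of the handler loop.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public record Phase(string Name);

public static class TransitionDemo
{
    public static readonly Phase Validation = new(nameof(Validation));
    public static readonly Phase SayHello = new(nameof(SayHello));
    public static readonly Phase Completed = new(nameof(Completed));
    public static readonly Phase Failed = new(nameof(Failed));

    // Each entry maps the current phase to a function over the activities seen so far.
    // Returning the same phase means "stay"; anything else means "transition".
    private static readonly Dictionary<Phase, Func<Phase, IReadOnlyCollection<string>, Phase>> Transitions = new()
    {
        [Validation] = (phase, seen) =>
            seen.Contains("ValidationSuccess") ? SayHello
            : seen.Contains("ValidationFailure") ? Failed
            : phase,
        [SayHello] = (phase, seen) => seen.Contains("HelloSent") ? Completed : phase
    };

    // Stores each message, then asks the table which phase comes next.
    public static List<Phase> Evaluate(params (Phase Phase, string Activity)[] messages)
    {
        var seen = new List<string>();
        var results = new List<Phase>();
        foreach (var (phase, activity) in messages)
        {
            seen.Add(activity);
            results.Add(Transitions[phase](phase, seen));
        }
        return results;
    }
}
```

Feeding it (Validation, "ValidationStarted"), (Validation, "ValidationSuccess") and (SayHello, "HelloSent") yields Validation, SayHello and Completed in turn. The first message leaves the process where it is, and the other two each trigger a transition.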

The Worker

So now we have a whole system intended to produce messages based on the phase of a process. But we still need something to process those messages and do the actual work they entail. Enter the Command Handler. We could call this an Actor, but to fit the terminology used here we'll just refer to it as a Handler. I already posted about Query Actors, which take a query and return a response. Since our process is completely async, we won't be using the request/response architecture described in that post. Instead, our command handler will process the incoming message, or command, and send out a response message.

public abstract class CommandHandler<TMessage> : BackgroundService
{
    protected readonly IMessageSystemReader<TMessage> reader;

    public CommandHandler(IMessageSystemReader<TMessage> reader)
    {
        this.reader = reader ?? throw new ArgumentNullException(nameof(reader));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var command in reader.ContinuousWaitAndReadAllAsync(stoppingToken))
        {
            await HandleAsync(command, stoppingToken);
        }
    }

    public abstract ValueTask HandleAsync(TMessage command, CancellationToken stoppingToken);
}

The Registrations

Finally, we need to register all the components with our DI container. This is done through a set of extension methods:

public static class RegistrationExtensions
{
    public static IServiceCollection AddMessageSystem<TMessage>(this IServiceCollection services)
    {
        var messageSystem = new MessageSystem<TMessage>();
        services.AddSingleton<IMessageSystemWriter<TMessage>>(messageSystem);
        services.AddSingleton<IMessageSystemReader<TMessage>>(messageSystem);
        return services;
    }

    public static IServiceCollection AddCommandHandler<TActor, TCommand>(this IServiceCollection services)
        where TActor : CommandHandler<TCommand> 
        => services
            .AddHostedService<TActor>()
            .AddMessageSystem<TCommand>();

    public static IServiceCollection AddWriterProvider(this IServiceCollection services)
        => services.AddSingleton<IWriterProvider, WriterProvider>();

    public static IServiceCollection AddProcessStartHandler<THandler, TMessageFactory, TStartMessage, TEndMessage, TInput>(this IServiceCollection services)
        where THandler : ProcessStartHandler<TStartMessage, TInput>
        where TMessageFactory : MessageFactory<TInput>
        where TStartMessage : ProcessStartMessage<TInput>
        where TInput : ProcessInput
        => services
            .AddHostedService<THandler>()
            .AddSingleton<IMessageFactory<TInput>, TMessageFactory>()
            .AddMessageSystem<TStartMessage>()
            .AddMessageSystem<TEndMessage>();

    public static IServiceCollection AddPhaseTransitionHandler<THandler, TPhaseTransitions, TMessage, TInput>(this IServiceCollection services)
        where THandler : PhaseTransitionHandler<TMessage, TInput>
        where TPhaseTransitions : PhaseTransitions<TInput>
        where TMessage : ProcessMessage<TInput>
        where TInput : ProcessInput 
        => services
            .AddHostedService<THandler>()
            .AddSingleton<IPhaseTransitions<TInput>, TPhaseTransitions>()
            .AddMessageSystem<TMessage>();
}

These extension methods are all we need to register our entire Process Management System. The first registers the implementation of our Message System for a given message type parameter. The next registers a Command Handler and its associated Message System. The next just registers our Writer Provider. Then we register the Process Start Handler along with the Message Factory and the channels for its start and end messages. And finally we register the Phase Transition Handler, the Phase Transitions dictionary and its input channel. Note that any other message type routed through the Writer Provider, such as a derived started message, needs its own AddMessageSystem registration; otherwise RequestWriter throws.

Sample Demo

Overview

Now that we have all the infrastructure in place, it's time to make use of it. For this example we'll be doing a basic "Hello World" type process. We'll define all our messages, phases, and handlers. The goal is to have a process with two steps: validate the input, then respond with a message.

The Messages

The messages for our sample define the ways in which downstream consumers will respond and how we expect them to act. We define our process as a series of messages. This sample process will first validate our input with some simple validation, then it will write out the message to consumers. This process extends the ProcessPhase by defining two additional phases: Validation and SayHello.

public record SayHelloInput(string Name)
    : ProcessInput;

public record SayHelloProcessStartMessage(SayHelloInput Input)
    : ProcessStartMessage<SayHelloInput>(Input);

public record SayHelloProcessStartedMessage(Guid ProcessId)
    : ProcessStartedMessage(ProcessId);

public record SayHelloEndedMessage(Guid ProcessId, ProcessPhase Phase)
    : ProcessEndedMessage(ProcessId, Phase);

public record SayHelloProcessMessage<TActivity>(Guid ProcessId, ProcessPhase Phase, SayHelloInput Input, TActivity Activity)
    : ProcessMessage<SayHelloInput, TActivity>(ProcessId, Phase, Input, Activity)
    where TActivity : ProcessActivity;

public record SayHelloValidation(SayHelloInput Input)
    : ProcessActivity;

public record SayHelloValidationSuccess
    : ProcessActivity;

public record SayHelloValidationFailure
    : ProcessActivity;

public record SayHelloResponseActivity(string Input)
    : ProcessActivity;

public record SayHelloCompletedActivity
    : ProcessActivity;

public record SayHelloResponseMessage(string Message);

public record SayHelloPhases(string Phase)
    : ProcessPhase(Phase)
{
    public static readonly ProcessPhase Validation = new ProcessPhase(nameof(Validation));
    public static readonly ProcessPhase SayHello = new ProcessPhase(nameof(SayHello));
}

These messages, activities and inputs define our Say Hello process. We begin with the start message that carries the user input into our process stream. We have a derived started message to signal the client that the process has started, and a derived ended message to signal that the process has ended in a particular phase. Next we have a derived process message that will be used to carry out our process by performing specified activities. We have defined five activities for this process: SayHelloValidation, SayHelloValidationSuccess, SayHelloValidationFailure, SayHelloResponseActivity and SayHelloCompletedActivity. These activities will be coordinated by our Phase Transition Handler and the message factory we'll see in a moment. And finally we defined the phases of our process as Validation and SayHello. Remember that our base record ProcessPhase already defines Completed and Failed for us, so all we need to do is define the working phases of our process.

The Process Handlers

Next we define the derived handlers for our specific process. First, the Start Handler:

public class SayHelloStartHandler : ProcessStartHandler<SayHelloProcessStartMessage, SayHelloInput>
{
    public SayHelloStartHandler(
        IWriterProvider writerProvider,
        IMessageFactory<SayHelloInput> messageFactory,
        IMessageSystemReader<SayHelloProcessStartMessage> startReader)
        : base(writerProvider, messageFactory, startReader, SayHelloPhases.Validation)
    {
    }

    protected override ProcessStartedMessage StartedMessageFactory(Guid processId)
        => new SayHelloProcessStartedMessage(processId);
}

Since we don't need any additional functionality, all we do is specify the working types and implement the constructor and the started message factory, and we're done! Then comes our concrete Message Factory:

public class SayHelloMessageFactory : MessageFactory<SayHelloInput>
{
    public SayHelloMessageFactory()
    {
        this[SayHelloPhases.Validation] = (processId, phase, input, messages) => ValidationPhaseMessages(processId, phase, input, messages);
        this[SayHelloPhases.SayHello] = (processId, phase, input, messages) => SayHelloPhaseMessages(processId, phase, input, messages);
    }

    public IAsyncEnumerable<object> ValidationPhaseMessages(Guid processId, ProcessPhase phase, SayHelloInput input, IEnumerable<dynamic> currentMessages)
        => new[]
        {
            new SayHelloProcessMessage<SayHelloValidation>(processId, phase, input, new SayHelloValidation(input))
        }.ToAsyncEnumerable();

    public IAsyncEnumerable<object> SayHelloPhaseMessages(Guid processId, ProcessPhase phase, SayHelloInput input, IEnumerable<dynamic> currentMessages)
        => new[]
        {
            new SayHelloProcessMessage<SayHelloResponseActivity>(processId, phase, input, new SayHelloResponseActivity(input.Name))
        }.ToAsyncEnumerable();
}

Since we only have two phases and each phase produces only one message, our Message Factory is quite simple in this case. The ToAsyncEnumerable call assumes the System.Linq.Async package is referenced. Next we define our Phase Transition Handler:

public class SayHelloPhaseTransitionsHandler : PhaseTransitionHandler<SayHelloProcessMessage<ProcessActivity>, SayHelloInput>
{
    public SayHelloPhaseTransitionsHandler(
        IWriterProvider writerProvider,
        IPhaseTransitions<SayHelloInput> phaseTransitions,
        IMessageFactory<SayHelloInput> messageFactory,
        IMessageSystemReader<SayHelloProcessMessage<ProcessActivity>> reader)
        : base(writerProvider, phaseTransitions, messageFactory, reader)
    {
    }

    protected override ProcessEndedMessage EndedMessageFactory(Guid processId, ProcessPhase phase)
        => new SayHelloEndedMessage(processId, phase);
}

And again, no special behavior is needed, so all we're doing is setting type arguments and implementing the constructor and the ended message factory. Finally, our Phase Transitions get defined:

public class SayHelloPhaseTransitions : PhaseTransitions<SayHelloInput>
{
    public SayHelloPhaseTransitions()
    {
        this[SayHelloPhases.Validation] = (phase, input, messages) => ValidationPhaseTransition(phase, input, messages);
        this[SayHelloPhases.SayHello] = (phase, input, messages) => SayHelloPhaseTransition(phase, input, messages);
    }

    public ProcessPhase ValidationPhaseTransition(ProcessPhase phase, SayHelloInput input, IEnumerable<dynamic> currentMessages)
    {
        if (currentMessages.ContainsActivity<SayHelloValidationSuccess>())
            return SayHelloPhases.SayHello;
        else if (currentMessages.ContainsActivity<SayHelloValidationFailure>())
            return SayHelloPhases.Failed;
        return phase;
    }

    public ProcessPhase SayHelloPhaseTransition(ProcessPhase phase, SayHelloInput input, IEnumerable<dynamic> currentMessages)
    {
        if (currentMessages.ContainsActivity<SayHelloCompletedActivity>())
            return SayHelloPhases.Completed;
        return phase;
    }
}

Note how we use the state provided to compute the next phase of the process. If validation fails we move directly to Failed; if validation succeeds we continue on to the next phase. Once the SayHelloCompletedActivity has been emitted, we mark the process as Completed.

The Workers

First we implement our validation handler. The validation here simply checks that the name is no longer than 10 characters.

public class SayHelloValidationHandler
    : CommandHandler<SayHelloProcessMessage<SayHelloValidation>>
{
    private readonly IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> successWriter;
    private readonly IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> failureWriter;

    public SayHelloValidationHandler(
        IMessageSystemReader<SayHelloProcessMessage<SayHelloValidation>> reader,
        IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> successWriter,
        IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> failureWriter)
        : base(reader)
    {
        this.successWriter = successWriter ?? throw new ArgumentNullException(nameof(successWriter));
        this.failureWriter = failureWriter ?? throw new ArgumentNullException(nameof(failureWriter));
    }

    public override async ValueTask HandleAsync(SayHelloProcessMessage<SayHelloValidation> command, CancellationToken stoppingToken)
    {
        var input = command.Input.Name;
        if (input.Length > 10)
            await failureWriter.WriteAsync(
                new SayHelloProcessMessage<ProcessActivity>(command.ProcessId, command.Phase, command.Input, new SayHelloValidationFailure()));
        else
            await successWriter.WriteAsync(
                new SayHelloProcessMessage<ProcessActivity>(command.ProcessId, command.Phase, command.Input, new SayHelloValidationSuccess()));
    }
}

Notice how we only need to implement the HandleAsync method, and we are handed the command read from our Message System. If validation passes we emit a success activity; if it fails we emit a failure activity, and the Phase Transition Dictionary and Handler take it from there. Next we define our Say Hello Handler.

public class SayHelloResponseHandler
    : CommandHandler<SayHelloProcessMessage<SayHelloResponseActivity>>
{
    private readonly IMessageSystemWriter<SayHelloResponseMessage> responseWriter;
    private readonly IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> completedWriter;

    public SayHelloResponseHandler(
        IMessageSystemReader<SayHelloProcessMessage<SayHelloResponseActivity>> reader,
        IMessageSystemWriter<SayHelloResponseMessage> responseWriter,
        IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> completedWriter)
        : base(reader)
    {
        this.responseWriter = responseWriter ?? throw new ArgumentNullException(nameof(responseWriter));
        this.completedWriter = completedWriter ?? throw new ArgumentNullException(nameof(completedWriter));
    }

    public override async ValueTask HandleAsync(SayHelloProcessMessage<SayHelloResponseActivity> command, CancellationToken stoppingToken)
    {
        var message = $"Hello there, {command.Input.Name}";
        await responseWriter.WriteAsync(new SayHelloResponseMessage(message));
        await completedWriter.WriteAsync(new SayHelloProcessMessage<ProcessActivity>(command.ProcessId, command.Phase, command.Input, new SayHelloCompletedActivity()));
    }
}

Again, all we're doing is implementing the HandleAsync method with our logic, which is simply to emit a response message and then emit a completed activity back to our infrastructure.

The Registrations

Next we register all of our process's components with a simple extension method.

public static class RegistrationExtensions
{
    public static IServiceCollection AddSayHelloProcess(this IServiceCollection services)
        => services
        .AddWriterProvider()
        .AddProcessStartHandler<SayHelloStartHandler, SayHelloMessageFactory, SayHelloProcessStartMessage, SayHelloEndedMessage, SayHelloInput>()
        .AddPhaseTransitionHandler<SayHelloPhaseTransistionsHandler, SayHelloPhaseTransitions, SayHelloProcessMessage<ProcessActivity>, SayHelloInput>()
        .AddCommandHandler<SayHelloValidationHandler, SayHelloProcessMessage<SayHelloValidation>>()
        .AddCommandHandler<SayHelloResponseHandler, SayHelloProcessMessage<SayHelloResponseActivity>>()
        .AddMessageSystem<SayHelloResponseMessage>()
        .AddMessageSystem<SayHelloProcessStartedMessage>();
}

Note how we add dedicated Message Systems for our Started Message and our Response Message.

The Client Code

We'll keep our client code simple: it does not respond to any failure and simply times out if a successful message is not received. In this post we're not interested in elaborate clients; we're simply proving that the orchestration of handlers works.

[ApiController]
[Route("[controller]")]
public class SampleController : ControllerBase
{
    private readonly IMessageSystemWriter<SayHelloProcessStartMessage> startWriter;
    private readonly IMessageSystemReader<SayHelloResponseMessage> responseReader;

    public SampleController(
        IMessageSystemWriter<SayHelloProcessStartMessage> startWriter,
        IMessageSystemReader<SayHelloResponseMessage> responseReader)
    {
        this.startWriter = startWriter ?? throw new ArgumentNullException(nameof(startWriter));
        this.responseReader = responseReader ?? throw new ArgumentNullException(nameof(responseReader));
    }

    [HttpGet]
    public async ValueTask<SayHelloResponseMessage> Get(string name)
    {
        var startMessage = new SayHelloProcessStartMessage(new SayHelloInput(name));
        await startWriter.WriteAsync(startMessage);
        using var readTokenSource = new CancellationTokenSource();
        readTokenSource.CancelAfter(TimeSpan.FromSeconds(1));
        return await responseReader.ContinuousWaitAndReadAllAsync(readTokenSource.Token).FirstAsync();
    }
}

Here we take a writer for the Start Message and pass our input. We also take a reader for the output and provide a CancellationToken that cancels the read after one second, so the request times out if anything fails. We also don't distinguish between Response Messages for individual requests. We could identify them by Process Id, but we'll leave that for next time.
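For readers curious what correlating by Process Id might look like, here is one possible shape, offered only as a sketch and not as the approach a follow-up post would take. It assumes the response can be tagged with the process's Guid, which the SayHelloResponseMessage in this post does not carry. The class `ResponseCorrelator` and its members are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (an assumption, not part of the post's infrastructure):
// pairs each waiting request with the response for its own process id.
public sealed class ResponseCorrelator
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<string>> pending = new();

    // Whichever side arrives first (waiter or response) creates the slot.
    private TaskCompletionSource<string> Slot(Guid processId)
        => pending.GetOrAdd(processId,
            _ => new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously));

    // Called by whatever reads response messages off the message system.
    public void Complete(Guid processId, string message)
        => Slot(processId).TrySetResult(message);

    // Called by the client; the token provides the timeout, as in the controller above.
    public async Task<string> WaitForAsync(Guid processId, CancellationToken token)
    {
        var tcs = Slot(processId);
        // Cancel the wait if the token fires before a response arrives.
        using var registration = token.Register(() => tcs.TrySetCanceled(token));
        try
        {
            return await tcs.Task;
        }
        finally
        {
            // Remove the slot once the wait ends, whether it succeeded or was cancelled.
            pending.TryRemove(processId, out _);
        }
    }
}
```

In this sketch a response that nobody ever waits for stays in the dictionary, so a real implementation would also need some form of expiry. On timeout the awaited task is cancelled and the caller sees an OperationCanceledException, which matches how the CancellationToken behaves in the controller above.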

And finally we can try everything out and receive a response!

[Image: Hello World.PNG]

Summary

Here we've defined a generic infrastructure to orchestrate complex processes based on two handlers, two dictionaries and a set of messages. The beautiful thing about this system is that it can fundamentally be used with any underlying transport. Complex processes are broken down into simple phases, messages and transitions.

Well, I hope you enjoyed my latest post. As always, the full code can be found at ConcurrentFlows.HashNode on GitHub.

 