Reactive ReadModels

A functional approach to building ReadModels with Rx

Published on 23 November 2015

If you're starting out with Event Sourcing, you're probably looking for a way to project events into read models that can be used by the views in your application. Today I'd like to share a neat little mechanism I've established for building read models in C# by employing a strongly typed yet functional approach.

Starting with infrastructure, you probably have some form of event distribution mechanism, normally a pub/sub style bus. From this bus you need to be able to selectively subscribe to all the events you need to collate the information required by your read model. Furthermore, you will often need to initialize the read model with information from a persistent store - often this is a persistent store of the read model itself but this isn't necessarily the case. Lastly, your application must be able to request a specific read model and receive notifications when the read model changes.

To illustrate how this might be achieved, I have written a small sample project following the traditional Account/Order model that features the following components:

Sample Components

In this system, the application is able to request a stream of read models from a repository. The repository will retrieve initial read model state from a persistence service and then subscribe to events from an event bus to continually update and publish the read model. Only read models the application needs should be published and only events the read model requires should be processed.

By leveraging Rx we're able to implement a stub of the repository very easily as follows:

public class Repository
{
    private readonly Service.IAccountService _accountService;
    private readonly IEventAggregator _bus;

    public Repository(Service.IAccountService accountService, IEventAggregator bus)
    {
        _accountService = accountService;
        _bus = bus;
    }

    public IObservable<ReadModel> For(Guid id)
    {
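        // Guid is a value type and can never be null, so guard against the empty Guid instead.
        if (id == Guid.Empty) throw new ArgumentException("An account id is required.", "id");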

        return Observable.Create<ReadModel>(
            async observer =>
            {
                Service.AccountInfo accountInfo = await _accountService.GetAccountInfoAsync(id);

                ReadModel readModel = accountInfo.ToReadModel();

                // Subscribe to events and apply to read model

                // Never() keeps the stream open; StartWith emits the initial read model first.
                return Observable.Never<ReadModel>().StartWith(readModel).Subscribe(observer);
            }
        );
    }
}

This code is written such that, when an observer subscribes to the IObservable<ReadModel> returned from the For method, an asynchronous request is made to the IAccountService for the initial state of the read model, a ReadModel instance is constructed from the information returned by the service and, finally, the ReadModel is emitted to the subscriber. Note that the observable deliberately never completes, indicating that further ReadModel instances might be emitted.

Now we have the current read model, we need to subscribe to events from the event bus that represent changes pertinent to the read model and, when received, modify and emit the read model so that the subscriber is always up to date.

To do this, we might think to write something like this:

public IObservable<ReadModel> For(Guid id)
{
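    // Guid is a value type and can never be null, so guard against the empty Guid instead.
    if (id == Guid.Empty) throw new ArgumentException("An account id is required.", "id");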

    return Observable.Create<ReadModel>(
        async observer =>
        {
            Service.AccountInfo accountInfo = await _accountService.GetAccountInfoAsync(id);

            ReadModel readModel = accountInfo.ToReadModel();

            return new CompositeDisposable(
                _bus.GetEvent<Event.AccountNameChanged>().Where(@event => id.Equals(@event.AccountId)).Select(@event => readModel.WithAccountName(@event.AccountName)).Subscribe(observer),
                _bus.GetEvent<Event.AddBillingAddress>().Where(@event => id.Equals(@event.AccountId)).Select(@event => readModel.WithBillingAddress(new Address(@event.AddressName))).Subscribe(observer),
                _bus.GetEvent<Event.RemoveBillingAddress>().Where(@event => id.Equals(@event.AccountId)).Select(@event => readModel.WithoutBillingAddress(@event.AddressName)).Subscribe(observer),
                _bus.GetEvent<Event.OrderInvoiced>().Where(@event => id.Equals(@event.AccountId)).Select(@event => readModel.WithCurrentOrder(new Order(@event.OrderId))).Subscribe(observer),
                _bus.GetEvent<Event.OrderDispatched>().Where(@event => id.Equals(@event.AccountId)).Select(@event => readModel.WithCompletedOrder(@event.OrderId)).Subscribe(observer)
            );
        }
    );
}

While this code looks sensible, we'll very quickly run into problems with it. Firstly, and primarily, each subscription only ever modifies the initial readModel instance, meaning changes are not cumulative. Secondly, even if we somehow overcame that issue, because we're maintaining a series of independent subscriptions here, changes are neither thread safe nor deterministic.
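To make the first problem concrete, here is a minimal sketch that needs no Rx at all. AccountView and StaleClosureSketch are hypothetical stand-ins invented for this illustration, not types from the sample project; the only assumption carried over is that the read model is immutable and each With... method returns a new instance.

```csharp
using System;

// Hypothetical immutable read model, standing in for the article's ReadModel.
public sealed class AccountView
{
    public AccountView(string name, string billingAddress)
    {
        Name = name;
        BillingAddress = billingAddress;
    }

    public string Name { get; }
    public string BillingAddress { get; }

    public AccountView WithName(string name)
    {
        return new AccountView(name, BillingAddress);
    }

    public AccountView WithBillingAddress(string address)
    {
        return new AccountView(Name, address);
    }
}

public static class StaleClosureSketch
{
    public static AccountView[] Example()
    {
        AccountView initial = new AccountView("Old name", "No address");

        // Both handlers close over `initial`, just as each subscription above
        // closes over the one readModel instance.
        Func<string, AccountView> onNameChanged = name => initial.WithName(name);
        Func<string, AccountView> onAddressAdded = address => initial.WithBillingAddress(address);

        AccountView afterRename = onNameChanged("New name");

        // Built from `initial`, not from afterRename, so the rename is lost.
        AccountView afterAddress = onAddressAdded("Head office");

        return new[] { afterRename, afterAddress };
    }
}
```

The second element returned by Example() carries the new address but still has the old name, which is exactly the non-cumulative behaviour described above.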

No, what we need is a way of writing the effect of these changes into a common read model instance, aggregating those changes across multiple events and emitting the new read model each time it is updated. Fortunately, Rx has exactly the operator we need for this: IObservable<TAccumulate> Scan<TSource, TAccumulate>(this IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)

This operator takes an initial TAccumulate seed and, for each item emitted by the IObservable<TSource> source, invokes a function that takes the current accumulator value and the emitted item and returns a new accumulator value. Each new accumulator value is then emitted to the subscriber through the resulting IObservable<TAccumulate>.
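If the operator is unfamiliar, the following sketch may help. It is not Rx's implementation: it is a hand-rolled, pull-based analogue over IEnumerable<T>, written so that it runs without the System.Reactive package. ScanSketch and RunningBalance are names made up for this example.

```csharp
using System;
using System.Collections.Generic;

public static class ScanSketch
{
    // Yields every intermediate accumulator value, one per source item.
    // Like Rx's seeded Scan, it does not yield the seed itself.
    public static IEnumerable<TAccumulate> Scan<TSource, TAccumulate>(
        this IEnumerable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> accumulator)
    {
        TAccumulate current = seed;
        foreach (TSource item in source)
        {
            current = accumulator(current, item);
            yield return current;
        }
    }

    // Running balance after each deposit, starting from a balance of 100.
    public static List<int> RunningBalance()
    {
        int[] deposits = { 10, 20, 30 };
        return new List<int>(deposits.Scan(100, (balance, deposit) => balance + deposit));
    }
}
```

RunningBalance() produces 110, 130 and 160: one value per deposit, each building on the previous one rather than on the seed.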

Great, so it's pretty obvious here that our accumulator should be our read model, but how do we subscribe to all the different message types and apply a different modification to the read model for each type of message received when the Scan function only allows for a single TSource type? We need to somehow homogenise the source stream to a single type. Often this is done with inheritance but, as there's no common base type for each of these messages, we'd have to write something like this:

public IObservable<ReadModel> For(Guid id)
{
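    // Guid is a value type and can never be null, so guard against the empty Guid instead.
    if (id == Guid.Empty) throw new ArgumentException("An account id is required.", "id");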

    return Observable.Create<ReadModel>(
        async observer =>
        {
            Service.AccountInfo accountInfo = await _accountService.GetAccountInfoAsync(id);

            ReadModel readModel = accountInfo.ToReadModel();

            IObservable<object> events = Observable.Merge(
                _bus.GetEvent<Event.AccountNameChanged>().Where(@event => id.Equals(@event.AccountId)).Cast<object>(),
                _bus.GetEvent<Event.AddBillingAddress>().Where(@event => id.Equals(@event.AccountId)).Cast<object>(),
                _bus.GetEvent<Event.RemoveBillingAddress>().Where(@event => id.Equals(@event.AccountId)).Cast<object>(),
                _bus.GetEvent<Event.OrderInvoiced>().Where(@event => id.Equals(@event.AccountId)).Cast<object>(),
                _bus.GetEvent<Event.OrderDispatched>().Where(@event => id.Equals(@event.AccountId)).Cast<object>()
            );

            // The accumulator parameter is named "current" so it does not clash with the readModel local above.
            return events.Scan(readModel,
                (current, @event) =>
                {
                    if (@event is Event.AccountNameChanged)
                    {
                        return current.WithAccountName(((Event.AccountNameChanged)@event).AccountName);
                    }
                    else if (@event is Event.AddBillingAddress)
                    {
                        ...
                    }
                    ...
                    else
                    {
                        throw new ArgumentException("Unknown message type");
                    }
                }
            ).Subscribe(observer);
        }
    );
}

... which is not type safe, has very little compile time checking, requires a fall-through exception and, quite frankly, is horrible. I'm sure with Rx we can do something better.

Well, let's look at this from a functional perspective. The action we want to perform when receiving each type of event can be considered to be a function in the form of Func<TReadModel, TEvent, TReadModel>. This is very similar to the third parameter of the Observable.Scan function, the problem being that we have multiple types of TEvent. If only there was a way to produce a generic function that would operate correctly across all the event types.

Well, by leaning on a concept from functional languages, this might be possible. Partial application has been available in C# - via closures - since lambdas were introduced and allows you to:

fix a number of arguments to a function, producing another function of smaller arity. Given a function f: (X * Y * Z) -> N, we might fix (or 'bind') the first argument, producing a function of type partial(f): (Y * Z) -> N. Evaluation of this function might be represented as f_partial(2, 3). Note that the result of partial function application in this case is a function that takes two arguments
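In C# terms, that definition might be sketched as follows. The Partial helper and the volume function are invented purely for illustration; the point is only that a lambda closing over one argument yields a function of smaller arity.

```csharp
using System;

public static class PartialApplicationSketch
{
    // Fixes the first argument of a three-argument function, leaving a
    // function of the remaining two arguments.
    public static Func<TY, TZ, TResult> Partial<TX, TY, TZ, TResult>(
        Func<TX, TY, TZ, TResult> f, TX x)
    {
        // The returned lambda closes over both f and x.
        return (y, z) => f(x, y, z);
    }

    public static int Example()
    {
        Func<int, int, int, int> volume = (width, height, depth) => width * height * depth;

        // Bind width = 4; the result takes only height and depth.
        Func<int, int, int> volumeOfWidth4 = Partial(volume, 4);

        return volumeOfWidth4(2, 3); // evaluates 4 * 2 * 3
    }
}
```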

So is there some way we can leverage this to remove the TEvent argument from the function? Absolutely: by making the observable emit a stream of partially applied functions in the form Func<ReadModel,ReadModel>, which can then be applied to the read model as follows:

public IObservable<ReadModel> For(Guid id)
{
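    // Guid is a value type and can never be null, so guard against the empty Guid instead.
    if (id == Guid.Empty) throw new ArgumentException("An account id is required.", "id");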

    return Observable.Create<ReadModel>(
        async observer =>
        {
            Service.AccountInfo accountInfo = await _accountService.GetAccountInfoAsync(id);

            IObservable<Func<ReadModel, ReadModel>> mutators = Observable.Merge(
                _bus.GetEvent<Event.AccountNameChanged>().Where(@event => id.Equals(@event.AccountId)).Select(Functions.Apply),
                _bus.GetEvent<Event.AddBillingAddress>().Where(@event => id.Equals(@event.AccountId)).Select(Functions.Apply),
                _bus.GetEvent<Event.RemoveBillingAddress>().Where(@event => id.Equals(@event.AccountId)).Select(Functions.Apply),
                _bus.GetEvent<Event.OrderInvoiced>().Where(@event => id.Equals(@event.AccountId)).Select(Functions.Apply),
                _bus.GetEvent<Event.OrderDispatched>().Where(@event => id.Equals(@event.AccountId)).Select(Functions.Apply)
            );

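            ReadModel initial = accountInfo.ToReadModel();

            // Scan does not emit its seed, so StartWith publishes the initial read model before any event arrives.
            return mutators.Scan(initial, (readModel, mutator) => mutator(readModel)).StartWith(initial).Subscribe(observer);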
        }
    );
}

Here Functions.Apply is a method with one overload for each of the event types. For example, the overload for Event.AccountNameChanged is implemented as follows:

public static Func<ReadModel, ReadModel> Apply(Event.AccountNameChanged @event)
{
    return readModel => readModel.WithAccountName(@event.AccountName);
}

This method takes the event to be processed as a parameter and returns a function in the form Func<ReadModel,ReadModel> that closes over the parameter and calls the WithAccountName extension method when executed. A similar pattern is followed for all the other event types.

Once we have projected and merged all the event streams into a single IObservable<Func<ReadModel, ReadModel>>, we can use the Scan operator to iteratively apply each function to the cumulative read model instance and emit the result after each event.
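For anyone who wants to try the idea without a bus or Rx, the whole pattern can be sketched over a plain list. Everything here is a hypothetical stand-in: AccountSummary, the two event classes, Mutators and FoldSketch are invented for this illustration and only loosely mirror the sample project's types. A foreach loop plays the role of Scan.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical events with no common base type.
public sealed class AccountNameChanged
{
    public AccountNameChanged(string accountName) { AccountName = accountName; }
    public string AccountName { get; }
}

public sealed class OrderInvoiced
{
    public OrderInvoiced(int orderId) { OrderId = orderId; }
    public int OrderId { get; }
}

// Hypothetical immutable read model.
public sealed class AccountSummary
{
    public AccountSummary(string accountName, int? currentOrderId)
    {
        AccountName = accountName;
        CurrentOrderId = currentOrderId;
    }

    public string AccountName { get; }
    public int? CurrentOrderId { get; }

    public AccountSummary WithAccountName(string accountName)
    {
        return new AccountSummary(accountName, CurrentOrderId);
    }

    public AccountSummary WithCurrentOrder(int orderId)
    {
        return new AccountSummary(AccountName, orderId);
    }
}

public static class Mutators
{
    // One overload per event type; each closes over its event and returns
    // a function of the read model alone.
    public static Func<AccountSummary, AccountSummary> Apply(AccountNameChanged e)
    {
        return summary => summary.WithAccountName(e.AccountName);
    }

    public static Func<AccountSummary, AccountSummary> Apply(OrderInvoiced e)
    {
        return summary => summary.WithCurrentOrder(e.OrderId);
    }
}

public static class FoldSketch
{
    public static List<AccountSummary> Replay()
    {
        // Different event types, homogenised into a single function type.
        var mutators = new List<Func<AccountSummary, AccountSummary>>
        {
            Mutators.Apply(new AccountNameChanged("Acme Ltd")),
            Mutators.Apply(new OrderInvoiced(1)),
            Mutators.Apply(new OrderInvoiced(2))
        };

        AccountSummary current = new AccountSummary("Acme", null);
        var states = new List<AccountSummary>();

        // The loop stands in for Scan: apply each mutator to the latest
        // state and record the result.
        foreach (Func<AccountSummary, AccountSummary> mutate in mutators)
        {
            current = mutate(current);
            states.Add(current);
        }

        return states;
    }
}
```

Replay() returns three states, and the last one still carries the renamed account alongside the most recent order, showing that the changes accumulate.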

Pretty neat huh!

