Reactive State Machines

Implementing a state machine as a 'stream of transitions' in Rx

Published on 09 November 2016

Today I'd like to share an approach I've recently been using that leverages Rx to implement a state machine.

State machines are a terrific pattern that, when applied correctly, can greatly simplify the implementation, maintenance and extensibility of many types of functionality, from application lifecycle management to business process coordination. They're particularly helpful with long-running, asynchronous processes that need to behave differently at various stages, especially when the process is message or event driven.

Normally, state machines are defined by "a list of its states, its initial state, and the triggering condition for each transition". However, from the point of view of "everything being a stream", they can also be viewed as a stream of transitions, with states existing as the rest periods between those transitions.

In this mindset, a state can simply be defined as follows:

public interface IState
{
    IObservable<ITransition> Enter();
}

In short, the result of entering a state is an observable of transitions away from the state.
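
The snippets in this post refer to ITransition, the nested Transition types and an IStateFactory without showing them. A minimal sketch of what they might look like follows. The member names are inferred from how they are used here; the shapes (an empty marker interface, sealed nested classes, a factory returning IState) are assumptions rather than the original definitions.

```csharp
using System;

// Repeated from above so that this sketch compiles on its own.
public interface IState
{
    IObservable<ITransition> Enter();
}

// Assumed to be a plain marker interface: the concrete type is the message.
public interface ITransition
{
}

// Assumed: one small class per transition, nested so that call sites
// read as Transition.ToStarting, Transition.ToRunning and so on.
public static class Transition
{
    public sealed class ToStarting : ITransition { }
    public sealed class ToResuming : ITransition { }
    public sealed class ToRunning : ITransition { }
    public sealed class ToSuspending : ITransition { }
}

// Assumed: the factory creates a fresh state instance on each call, which
// keeps dependency wiring (event bus, data store) out of the state machine.
public interface IStateFactory
{
    IState Initializing();
    IState Starting();
    IState Resuming();
    IState Running();
    IState Suspending();
}
```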

With this in hand, the state machine of, for example, a typical UWP app can be defined as follows:

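using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class StateMachine
{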
    private readonly IStateFactory _factory;
    private readonly Subject<IState> _state;

    public StateMachine(IStateFactory factory)
    {
        _factory = factory;

        _state = new Subject<IState>();
    }

    public IDisposable Initialize()
    {
        // First, create a stream of transitions by ...
        IObservable<ITransition> transitions = _state
            // ... starting from the initializing state ...
            .StartWith(_factory.Initializing())
            // ... entering each state as it becomes current ...
            .Select(state => state.Enter())
            // ... subscribing to its transitions (and unsubscribing from the previous state's) ...
            .Switch()
            // ... and sharing a single subscription to the transitions observable ...
            .Publish()
            // ... which is held until there are no more subscribers
            .RefCount();

        // Then, for each transition type, select the new state...
        IObservable<IState> states = Observable.Merge(
            transitions.OfType<Transition.ToStarting>().Select(transition => _factory.Starting()),
            transitions.OfType<Transition.ToResuming>().Select(transition => _factory.Resuming()),
            transitions.OfType<Transition.ToRunning>().Select(transition => _factory.Running()),
            transitions.OfType<Transition.ToSuspending>().Select(transition => _factory.Suspending())
        );

        // Finally, subscribe to the state observable ...
        return states
            // ... ensuring all transitions are serialized ...
            .ObserveOn(Scheduler.CurrentThread)
            // ... back onto the source state observable
            .Subscribe(_state);
    }
}

There are no constraints regarding which events/triggers each state uses to construct the observable of transitions returned when the Enter method is called, but they usually follow one of two patterns:

  1. Wait for an event/trigger to be received and transition to a new state based on the type of event/trigger received.
  2. Perform an asynchronous / long-running process and transition to another state when it completes.

To illustrate, the Initializing and Starting states below are examples of the first and second patterns respectively.

internal class Initializing : IState
{
    private readonly IEventAggregator _bus;

    public Initializing(IEventAggregator bus)
    {
        _bus = bus;
    }

    public IObservable<ITransition> Enter()
    {
        return Observable.Merge<ITransition>(
            _bus.GetEvent<Event.Start>().Select(_ => new Transition.ToStarting()),
            _bus.GetEvent<Event.Resume>().Select(_ => new Transition.ToResuming())
        );
    }
}

internal class Starting : IState
{
    private readonly DataStore.IContext _dataStoreContext;

    public Starting(DataStore.IContext dataStoreContext)
    {
        _dataStoreContext = dataStoreContext;
    }

    public IObservable<ITransition> Enter()
    {
        return Observable.Create<ITransition>(
            async observer =>
            {
                await _dataStoreContext.InitializeAsync();

                observer.OnNext(new Transition.ToRunning());
            }
        );
    }
}

As you can see, states can (and should!) be very small and easily testable. Altogether, a complete, strongly typed, async-friendly state machine can be implemented with just a handful of classes containing minimal code.
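
As a rough illustration of that testability, the sketch below exercises the Initializing state against a hand-rolled fake bus. The IEventAggregator shape (GetEvent<T>() returning IObservable<T>), the FakeBus class and the empty Event and Transition types are assumptions made to keep the example self-contained. It needs the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-ins for types the post uses but does not show.
public interface ITransition { }
public interface IState { IObservable<ITransition> Enter(); }
public interface IEventAggregator { IObservable<T> GetEvent<T>(); }

public static class Event
{
    public sealed class Start { }
    public sealed class Resume { }
}

public static class Transition
{
    public sealed class ToStarting : ITransition { }
    public sealed class ToResuming : ITransition { }
}

// Same shape as the Initializing state shown earlier.
internal class Initializing : IState
{
    private readonly IEventAggregator _bus;

    public Initializing(IEventAggregator bus)
    {
        _bus = bus;
    }

    public IObservable<ITransition> Enter()
    {
        return Observable.Merge<ITransition>(
            _bus.GetEvent<Event.Start>().Select(_ => new Transition.ToStarting()),
            _bus.GetEvent<Event.Resume>().Select(_ => new Transition.ToResuming())
        );
    }
}

// Test double: a single Subject carries every event, filtered by type on request.
internal sealed class FakeBus : IEventAggregator
{
    private readonly Subject<object> _events = new Subject<object>();

    public IObservable<T> GetEvent<T>() => _events.OfType<T>();

    public void Publish(object e) => _events.OnNext(e);
}

public static class Program
{
    public static void Main()
    {
        var bus = new FakeBus();
        var seen = new List<ITransition>();

        // Enter the state, record every transition it emits, then publish events.
        using (new Initializing(bus).Enter().Subscribe(t => seen.Add(t)))
        {
            bus.Publish(new Event.Start());
            bus.Publish(new Event.Resume());
        }

        Console.WriteLine(seen.Count == 2);
        Console.WriteLine(seen[0] is Transition.ToStarting);
        Console.WriteLine(seen[1] is Transition.ToResuming);
    }
}
```

Because a state only exposes an observable, the test needs nothing more than a subscription and a couple of published events; no state machine or scheduler is involved.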

I have used this pattern on numerous occasions and enjoy the simplicity and extensibility it affords me when defining long-running process flows.