Process managers (sometimes called Sagas) help us with modeling long running processes which happen in our domains. Think of such process as a series of domain events. When enough of them took place (and the particular ones we’re interested in) then we execute a command. The thing is that the events we’re waiting for might take a longer time to arrive, during which our process manager has to keep track of what has been already processed. And that’s where it gets interesting.

Looking for a RoR job? How about working in a flat-structured, employee-owned web dev company in Kraków, Poland? We’re looking for smart, open-minded people. Join the u2i team! Apply!

The Domain

Consider following example taken from catering domain. You’re an operations manager. Your task is to suggest your customer a menu they’d like to order and at the same time you have to confirm that caterer can deliver this particular menu (for given catering conditions). In short you wait for CustomerConfirmedMenu and CatererConfirmedMenu. Only after both happened you can proceed further. You’ll likely offer several menus to the customer and each of them will need a confirmation from corresponding caterers.

If there’s a match of CustomerConfirmedMenu and CatererConfirmedMenu for the same order_id you cheer and trigger ConfirmOrder command to push things forward. By the way there’s a chance you may as well never hear from the caterer or they may decline, so process may as well never complete ;)

Classical example

Given the tools from RailsEventStore ecosystem I use on a daily basis, the implementation might look more or less like this:

class CateringMatch
  class State < ActiveRecord::Base
    self.table_name = :catering_match_state
    # order_id
    # caterer_confirmed
    # customer_confirmed

    def self.get_by_order_id(order_id)
      transaction do
        yield lock.find_or_create_by(order_id: order_id)
      end
    end

    def complete?
      caterer_confirmed? && customer_confirmed?
    end
  end
  private_constant :State

  def initialize(command_bus:)
    @command_bus = command_bus
  end

  def call(event)
    order_id = event.data(:order_id)
    State.get_by_order_id(order_id) do |state|
      case event
      when CustomerConfirmedMenu
        state.update_column(:customer_confirmed, true)
      when CatererConfirmedMenu
        state.update_column(:caterer_confirmed, true)
      end

      command_bus.(ConfirmOrder.new(data: {
        order_id: order_id
      })) if state.complete?
    end
  end
end

This process manager is then enabled by following RailsEventStore instance configuration:

RailsEventStore::Client.new.tap do |client|
  client.subscribe(CateringMatch.new(command_bus: command_bus),
    [CustomerConfirmedMenu, CatererConfirmedMenu])
end

Whenever one of the aforementioned domain events is published by the event store, our process manager will be called with that event as an argument.

Implementation above uses ActiveRecord (with dedicated table) to persist internal process state between those executions. In addition you’d have to run database migration and create this table. I was just about to code it but then suddenly one of those aha moments came.

We already know how to persist events — that’s what we use RailsEventStore for. We also know how to recreate state from events with event sourcing. Last but not least the input for process manager are events. Wouldn’t it be simpler for process managers to eat it’s own dog food?

Let’s do this!

My first take on event sourced process manager looked something like this:

require 'aggregate_root'

module EventSourcing
  def apply(event)
    apply_strategy.(self, event)
    unpublished_events << event
  end

  def load(stream_name, event_store:)
    events = event_store.read_stream_events_forward(stream_name)
    events.each do |event|
      apply(event)
    end
    @unpublished_events = nil
  end

  def store(stream_name, event_store:)
    unpublished_events.each do |event|
      event_store.append_to_stream(event, stream_name: stream_name)
    end
    @unpublished_events = nil
  end

  private       
  def unpublished_events
    @unpublished_events ||= []
  end

  def apply_strategy
    ::AggregateRoot::DefaultApplyStrategy.new
  end
end

class CateringMatch
  class State
    include EventSourcing

    def initialize
      @caterer_confirmed  = false
      @customer_confirmed = false
    end

    def apply_caterer_confirmed_menu(_)
      @caterer_confirmed = true
    end

    def apply_customer_confirmed_menu(_)
      @customer_confirmed = true
    end

    def complete?
      caterer_confirmed? && customer_confirmed?
    end
  end
  private_constant :State

  def initialize(command_bus:, event_store:)
    @command_bus = command_bus
    @event_store = event_store
  end

  def call(event)
    order_id = event.data(:order_id)
    stream_name = "CateringMatch$#{order_id}"

    state = State.new
    state.load(stream_name, event_store: @event_store)
    state.apply(event)
    state.store(stream_name, event_store: @event_store)

    command_bus.(ConfirmOrder.new(data: {
      order_id: order_id
    })) if state.complete?
  end
end

When process manager is executed, we load already processed events from stream (partitioned by order_id). Next we apply the event that just came in, in the end appending it to stream to persist. The trigger with condition stays unchanged since it is only the State implementation that we made different.

In theory that could work, I could already feel that dopamine kick after job well done. In practice, the reality brought me this:

Failure/Error: event_store.append_to_stream(event, stream_name: stream_name)

ActiveRecord::RecordNotUnique:
  PG::UniqueViolation: ERROR:  duplicate key value violates unique constraint "index_event_store_events_on_event_id"
  DETAIL:  Key (event_id)=(bddeffe8-7188-4004-918b-2ef77d94fa65) already exists.
  : INSERT INTO "event_store_events" ("event_id", "stream", "event_type", "metadata", "data", "created_at") VALUES ($1, $2, $3, $4, $5, $6) RETURNING "id"

Doh!

I forgot about this limitation of RailsEventStore. You can’t yet have the same event in multiple streams. By contrast in GetEventStore streams are cheap and that’s one of the common use cases.

Take 2

Given the RailsEventStore limitation I had to figure out something else. The idea was just too good to give it up that soon. And that’s when second aha moment arrived!

There’s this RailsEventStore::Projection mechanism, which let’s you traverse multiple streams in search for particular events. When one is found, given lambda is called. Sounds familiar? Let’s see it in full shape:

class CateringMatch
  class State
    def initialize(event_store:, stream_name:)
      @event_store = event_store
      @stream_name = stream_name
    end

    def complete?
      initial =
        { caterer_confirmed: false,
          customer_confirmed: false,
        }
      state =
        RailsEventStore::Projection
          .from_stream(@stream_name)
          .init(->{ initial })
          .when(CustomerConfirmedMenu, ->(state, event) {
              state[:customer_confirmed] = true
            })
          .when(CatererConfirmedMenu, ->(state, event) {
              state[:caterer_confirmed] = true
            })
          .run(@event_store)
      state[:customer_confirmed] && state[:caterer_confirmed]
    end
  end
  private_constant :State

  def initialize(command_bus:, event_store:)
    @command_bus = command_bus
    @event_store = event_store
  end

  def call(event)
    order_id = event.data(:order_id)            
    state    = State.new(event_store: @event_store, stream_name: "Order$#{order_id}")

    command_bus.(ConfirmOrder.new(data: {
     order_id: order_id
    })) if state.complete?
  end
end

Implementation is noticeably shorter (thanks to hidden parts of RailsEventStore::Projection). Works not only in theory. And this is the one I chose to stick with for my process manager.

I cannot however say I fully like it. The smell for me is that we peek into the stream that does not exclusively belong to the process manager (it does belong to aggregate into whose stream CustomerConfirmedMenu and CatererConfirmedMenu were published). Another culprit comes when testing. Projection can only work with events persisted in streams, so it is not sufficient to only pass an event as an input to process manager. You have to additionally persist it.

RSpec.describe CateringMatch do
  facts = [
    CustomerConfirmedMenu.new(data: { order_id: '42' }),
    CatererConfirmedMenu.new(data: { order_id: '42' })
  ]
  facts.permutation.each do |fact1, fact2|
    specify do
      command_bus = spy(:command_bus)
      event_store = RailsEventStore::Client.new

      CateringMatch.new(event_store: event_store, command_bus: command_bus).tap do |process_manager|
        event_store.append_to_stream(fact1, stream_name: "Order$#{fact1.data[:order_id]}")
        process_manager.(fact1)

        event_store.append_to_stream(fact2, stream_name: "Order$#{fact2.data[:order_id]}")
        process_manager.(fact2)
      end

      expect(command_bus).to have_received(:call)
    end
  end
end

Would you choose event backed state for process manager as well? Let me know in comments!