Low-boilerplate process manager

A low-boilerplate process manager

Process Manager — you can think of it as something that takes events on input and produces a command on output. You might have found yourself in situations, where you felt you should be implementing one.

A simple example that often happens: you’re familiar with event handlers, but at some point it seems like you need an event handler that would activate after two distinct events happen — not just one. Often people are tempted to propagate attributes in events. It’s rarely a good idea. It might be a good place to employ a process manager.

Typically, to implement it, you handle specific events and store process manager state in an plain ol’ active record — but it’s not the only way to do it. Since introduction of linking events in RES, another approach is viable: event sourced process managers. Their advantage is that you don’t need to set up another db table. You might have already read about them in a blogpost by Paweł.

What I’m giving you here is another implementation of it. A simple implementation that hopefully illustrates the essential parts. You can take it and tweak it. It utilizes linking to streams and RES projections:

class OrderFulfillment
  def call(event)
    event_store.link(event.event_id, stream_name: stream_for(event))

    state = build_state(stream_for(event))

    execute(state) if state[:order_placed] && state[:payment_captured]
  end

  private

  def event_store
    Rails.configuration.event_store
  end

  def stream_for(event)
    "OrderFulfillment$#{event.data[:order_id]}"
  end

  def build_state(stream_name)
    RailsEventStore::Projection
      .from_stream(stream_name)
      .init(-> { {} })
      .when(OrderPlaced, -> (state, event) {
        state[:order_placed] = true
        state[:order_id] = event.data[:order_id]
      })
      .when(PaymentCaptured, -> (state, event) {
        state[:payment_captured] = true
      })
      .run(event_store)
  end

  def execute(state)
    FulfillOrder.new.call(state[:order_id])
  end
end

How it’s wired up:

event_store.subscribe(OrderFulfillment.new, to: [
  OrderPlaced,
  PaymentCaptured,
])

What happens here

Exactly three things:

1. “Capture” the event that is needed to determine the process manager’s state (put it into the process manager’s stream):

event_store.link(event.event_id, stream_name: stream_for(event))

2. Fetch all the events currently linked to the PM’s stream and build the current state from them:

state = build_state(stream_for(event))

3. If the conditions needed for the process to complete are met, execute the piece of code.

execute(state) if state[:order_placed] && state[:payment_captured]

What can be different

  • This snippet is moderately primitive-obsessed. You can improve by implementing state as a class, and execute if state.complete?.
  • Typically, a command is the thing that you execute upon completion of the process. It’s nice to think of a process manager as something that takes events and produces a command. I spared it here because not everyone has a command bus in place.
  • Does order of events and concurrency matter for your PM? RES provides several options to control it.

That’s it!

Got comments/questions? Ping or DM me on twitter or reply under this tweet. Did this piece help you? Can we improve something? Let me know, your feedback is very valuable!

Now, a plug 🔌. Join ARKADEMY.DEV and get access to our best courses: Rails Architect Masterclass, Anti-IF course, Blogging for busy programmers, Async Remote, TDD video class, Domain-Driven Rails video course and growing!

You might also like