Low-boilerplate process manager

A low-boilerplate process manager

Process Manager — you can think of it as something that takes events on input and produces a command on output. You might have found yourself in situations, where you felt you should be implementing one.

A simple example that often happens: you’re familiar with event handlers, but at some point it seems like you need an event handler that would activate after two distinct events happen — not just one. Often people are tempted to propagate attributes in events. It’s rarely a good idea. It might be a good place to employ a process manager.

Typically, to implement it, you handle specific events and store process manager state in an plain ol’ active record — but it’s not the only way to do it. Since introduction of linking events in RES, another approach is viable: event sourced process managers. Their advantage is that you don’t need to set up another db table. You might have already read about them in a blogpost by Paweł.

What I’m giving you here is another implementation of it. A simple implementation that hopefully illustrates the essential parts. You can take it and tweak it. It utilizes linking to streams and RES projections:

class OrderFulfillment
  def call(event)
    event_store.link(event.event_id, stream_name: stream_for(event))

    state = build_state(stream_for(event))

    execute(state) if state[:order_placed] && state[:payment_captured]
  end

  private

  def event_store
    Rails.configuration.event_store
  end

  def stream_for(event)
    "OrderFulfillment$#{event.data[:order_id]}"
  end

  def build_state(stream_name)
    RailsEventStore::Projection
      .from_stream(stream_name)
      .init(-> { {} })
      .when(OrderPlaced, -> (state, event) {
        state[:order_placed] = true
        state[:order_id] = event.data[:order_id]
      })
      .when(PaymentCaptured, -> (state, event) {
        state[:payment_captured] = true
      })
      .run(event_store)
  end

  def execute(state)
    FulfillOrder.new.call(state[:order_id])
  end
end

How it’s wired up:

event_store.subscribe(OrderFulfillment.new, to: [
  OrderPlaced,
  PaymentCaptured,
])

What happens here

Exactly three things:

1. “Capture” the event that is needed to determine the process manager’s state (put it into the process manager’s stream):

event_store.link(event.event_id, stream_name: stream_for(event))

2. Fetch all the events currently linked to the PM’s stream and build the current state from them:

state = build_state(stream_for(event))

3. If the conditions needed for the process to complete are met, execute the piece of code.

execute(state) if state[:order_placed] && state[:payment_captured]

What can be different

  • This snippet is moderately primitive-obsessed. You can improve by implementing state as a class, and execute if state.complete?.
  • Typically, a command is the thing that you execute upon completion of the process. It’s nice to think of a process manager as something that takes events and produces a command. I spared it here because not everyone has a command bus in place.
  • Does order of events and concurrency matter for your PM? RES provides several options to control it.

That’s it!

Got comments/questions? Ping or DM me on twitter or reply under this tweet. Did this piece help you? Can we improve something? Let me know, your feedback is very valuable!

Would you like to learn how to write such blogposts? Join our FREE dev blogging conference - learn how to write and promote blogposts without sacrificing the coding time. It's 3 days, 4 talks - April 14-16th (Wed-Fri). Register to grab your spot.

You might also like