One simple trick to make Event Sourcing click

… and check why 5600+ Rails engineers read also this

One simple trick to make Event Sourcing click

Event Sourcing is like having two methods when previously there was one. There — I’ve said it.

But it isn’t my idea at all.

It was Greg that used it first, in a bit different context. When explaining CQRS he used this exact words:

Starting with CQRS, CQRS is simply the creation of two objects where there was previously only one. The separation occurs based upon whether the methods are a command or a query (the same definition that is used by Meyer in Command and Query Separation, a command is any method that mutates state and a query is any method that returns a value).

You can have quite a similar statement on event-sourced aggregate root. The separation occurs based upon whether the method:

  • corresponds to an action we want to take on an aggregate — protects business rules and tells what domain event happened if those were met
  • maps consequences of the domain event that happened to internal state representation (against which business rules are executed)

Not convinced yet? Let the examples speak.

Stereotypical aggregate without Event Sourcing

Below is a typical aggregate root. In the scope of the example there are only two actions you can take — via public register and supply methods.

class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

  def initialize(store_id: nil, sku: nil, quantity_available: 0)
    @store_id = store_id
    @sku = sku
    @quantity_available = quantity_available
  end

  def register(store_id:, sku:, event_store:)
    raise AlreadyRegistered if @store_id
    @store_id = store_id
    @sku = sku

    event_store.publish_event(ProductRegistered.new(data: {
      store_id: @store_id,
      sku: @sku,
    }))
  end

  def supply(quantity, event_store:)
    raise CannotSupply unless @store_id && @sku

    @quantity_available += quantity

    event_store.publish_event(ProductSupplied.new(data: {
      store_id: @store_id,
      sku: @sku,
      quantity: quantity,
    }))
  end
end

Aggregate with Event Sourcing

In event sourcing it is the domain events that are our source of truth. They state what happened. What we need to do is to make them a bit more useful and convenient for decision making. This is the sourcing part.

class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

  def initialize(store_id: nil, sku: nil, quantity_available: 0)
    @store_id = store_id
    @sku = sku
    @quantity_available = quantity_available
  end

  def register(store_id:, sku:, event_store:)
    raise AlreadyRegistered if @store_id

    event = ProductRegistered.new(data: {
      store_id: store_id,
      sku: sku,
    })

    event_store.publish_event(event)
    registered(event)
  end

  def supply(quantity, event_store:)
    raise CannotSupply unless @store_id && @sku

    event = ProductSupplied.new(data: {
      store_id: @store_id,
      sku: @sku,
      quantity: quantity,
    })

    event_store.publish_event(event)
    supplied(event)
  end

  private

  def supplied(event)
    @quantity_available += event.data.fetch(:quantity)
  end

  def registered(event)
    @sku = event.data.fetch(:sku)
    @store_id = event.data.fetch(:store_id)
  end
end

In this step we’ve drawn the line between making a statement that something happened (being possible to happen first) and what side effects does it have. Notice private registered and supplied methods.

Why make such effort and introduce indirection? The reason is simple — if the events are source of truth, we could not only shape internal state for current actions we take but also for the ones that happened in the past.

Instead of loading current state stored in a database, we can take collection of events that happened in scope of this aggregate — in its stream.

class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

  def initialize(store_id: nil, sku: nil, event_store:)
    stream_name = "Product$#{store_id}-#{sku}"
    events = event_store.read_all_events_forward(stream_name)
    events.each do |event|
      case event
      when ProductRegistered then registered(event)
      when ProductSupplied then supplied(event)
      end
    end
  end

  def register(store_id:, sku:, event_store:)
    raise AlreadyRegistered if @store_id

    event = ProductRegistered.new(data: {
      store_id: store_id,
      sku: sku,
    })

    event_store.publish_event(event)
    registered(event)
  end

  def supply(quantity, event_store:)
    raise CannotSupply unless @store_id && @sku

    event = ProductSupplied.new(data: {
      store_id: @store_id,
      sku: @sku,
      quantity: quantity,
    })

    event_store.publish_event(event)
    supplied(event)
  end

  private

  def supplied(event)
    @quantity_available += event.data.fetch(:quantity)
  end

  def registered(event)
    @sku = event.data.fetch(:sku)
    @store_id = event.data.fetch(:store_id)
  end
end

At this point you may have figured out that event_store dependency that we constantly pass as an argument belongs more to the infrastructure layer than to a domain and business.

What if something above passed a list of events first so we could rebuild the state? After an aggregate action happened we could provide a list of domain events to be published (unpublished_events):

class Product
  CannotSupply = Class.new(StandardError)
  AlreadyRegistered = Class.new(StandardError)

  attr_reader :unpublished_events  

  def initialize(events)
    @unpublished_events = []
    events.each { |event| dispatch(event) }
  end

  def register(store_id:, sku:)
    raise AlreadyRegistered if @store_id

    apply(ProductRegistered.new(data: {
      store_id: store_id,
      sku: sku,
    }))
  end

  def supply(quantity)
    raise CannotSupply unless @store_id && @sku

    apply(ProductSupplied.new(data: {
      store_id: @store_id,
      sku: @sku,
      quantity: quantity,
    }))
  end

  private

  def apply(event)
    dispatch(event)
    @unpublished_events << event
  end

  def dispatch(event)
    case event
    when ProductRegistered then registered(event)
    when ProductSupplied then supplied(event)
    end
  end

  def supplied(event)
    @quantity_available += event.data.fetch(:quantity)
  end

  def registered(event)
    @sku = event.data.fetch(:sku)
    @store_id = event.data.fetch(:store_id)
  end
end

More or less this reminds the aggregate_root gem that is aimed to assist you with event sourced aggregates.

The rule of having two methods when there was previously one however still holds.

  • The public method (such as supply) corresponds to an action we want to take on an aggregate — protects business rules and tells what domain event happened if those rules were met.
  • The private method (such as supplied) maps consequences of the domain event that happened to the internal state representation.

Have a great day!

You might also like