Building an Event Sourced application using railseventstore
… and check why 5600+ Rails engineers read also this
Building an Event Sourced application using rails_event_store
Still not sure what is Event Sourcing and CQRS? Looking for a practical example? You’ve just read my previous post Why use EventSourcing and decided to give it a try? No matter why let’s dive into building an Event Sourced application in Rails using Arkency’s Rails Event store.
CQRS
But first you need to learn what CQRS is.
CQRS stands for Command and Query Responsibility Segregation. It’s a term coined by Greg Young and later polished in discussion with others (naming only a few Udi Dahan, Martin Fowler). It is based on CQS (Command Query Separation) devised by Bertrand Meyer:
Every method should either be a command that performs an action, or a query that returns data to the caller, but not both. In other words, asking a question should not change the answer.
Bertrand Meyer
CQRS is a way of building your application. It is not a pattern, not an architecture, not a framework. The best description here will be “architectural style”.
It’s just about separating reads and writes and building a separate stack (layers) for each of them:
- writes - it could be complex, could be modelled in separate layers like: application service, domain model (maybe using domain services also), with transactions required and complex stuff happening. We care here about consistency of our data more than about performance (usually we write data in a row of magnitude less often than we read them).
- reads - dead simple!, tailor made for your views, no abstraction over a data model - here is the part where Active Record shines - just define a model and use it - but read only!
You may say “Ok, but I wanted to learn about Event Sourcing and you’re writing here about some CQRS architecture style.” There is a good reason for that:
You can use CQRS without Event Sourcing, but with Event Sourcing you must use CQRS.
Greg Young
Crunching your Domain
Let’s start building our Event Sourced application with defining a domain model. When using Event Sourcing your domain model (your aggregates) are build based on domain events. Its underlying data model is not storing current state but series of domain events that have been applied to that aggregate since the beginning of its life. This allows you to build your aggregate that will not expose its state and will be able to protects its invariants.
Also an aggregate is a source of new domain events. Every call of a method for an aggregate might result in publishing new domain events. Every change in internal state of an aggregate must be done by publishing and applying a new domain event. No state change in other way! Only then we could ensure that our aggregate will be rebuild to the same state from events.
module Events
class OrderCreated < RailsEventStore::Event
def order_number
@data.fetch(:order_number)
end
#... some more stuff here
def self.create(order_id, order_number, customer_id)
new(data: {order_id: order_id, order_number: order_number, customer_id: customer_id})
end
end
end
I usually create a helper method to access event’s attributes witch are stored the in @data
hash attribute of RailsEventStore::Event
. And I like also to add a class method create
to build a new event with explicitly given parameters, but a RailsEventStore::Event is also a good way of creating a domain event.
Ok, now when we have domain events defined let’s apply them to our domain object.
module Domain
class Order
include AggregateRoot
AlreadyCreated = Class.new(StandardError)
MissingCustomer = Class.new(StandardError)
def initialize(id = SecureRandom.uuid)
@id = id
@state = :draft
end
def create(order_number, customer_id)
raise AlreadyCreated unless state == :draft
raise MissingCustomer unless customer_id
apply Events::OrderCreated.create(@id, order_number, customer_id)
end
#... some more stuff here
def apply_order_created(event)
@customer_id = event.customer_id
@number = event.order_number
@state = :created
end
private
attr_accessor :id, :customer_id, :order_number, :state
#... some more stuff here
end
end
Let’s see what is going on in our Domain::Order
class.
First the initialize
method. It builds the initial state of an aggregate, a state where all begins :)
Then we have the create
method. It should be used by other objects to invoke aggregate features. Here we should protect our invariants, check business rules do validations etc. From CQRS we know it should not return a value - it should just execute or fail (by raising error). In the create
method we never change the state of an aggregate. Instead we build and apply a new domain event. The apply
method is defined in AggregateRoot
module:
module AggregateRoot
def apply(event, new = true)
send("apply_#{event.class.name.demodulize.tableize.singularize}", event)
changes << event if new
end
def changes
@changes ||= []
end
def rebuild(events)
events.each { |event| apply(event, false) } if events
end
end
By calling apply Events::OrderCreated.create(@id, order_number, customer_id)
we are creating a new OrderCreated
domain event, applying it to our aggregate - what results in a change of state, and storing it in the changes
collection. It stores all domain events created by the aggregate during its method execution.
The last thing to take a look here is the apply_order_created
method. This method updates the aggregate state based on domain event. It does not matter if the event was published by the aggregate itself or it was read from Event Store and applied when rebuilding the aggregate state. There must not be any business rules check or validations here. What has happened has happened. It is already done. We just reflect those changes in aggregate state.
And one more thing: notice that all state of an aggregate object is private. Not available outside aggregate itself - even for reads.
Commands
A command is a simple object that encapsulates parameters for an action to be executed. Should be named in a business language (Ubiquitous Language) and express the user’s intention. Before a command is executed it should be validated. Those validations should be simple, out of full context, just based on command data and read model data. Should not check business rules (that will be handled later by domain object).
module Commands
class CreateOrder < Command
attr_accessor :order_id
attr_accessor :customer_id
validates :order_id, presence: true
validates :customer_id, presence: true
alias :aggregate_id :order_id
end
end
They are also known as Form Object (in Ruby world).
Base class Command
uses some ActiveModel features to allow simple validation and creation of command objects.
class Command
ValidationError = Class.new(StandardError)
include ActiveModel::Model
include ActiveModel::Validations
include ActiveModel::Conversion
def initialize(attributes={})
super
end
def validate!
raise ValidationError, errors unless valid?
end
def persisted?
false
end
end
Handling a command
A command handler is a entry point to your domain. It should handle all “plumbing”, orchestrate domain objects and domain services and execute domain object’s methods with parameters given in a handled command. There could be several sources of commands send to our application: users, external systems or sagas (process managers).
Here is how I’ve defined “plumbing” for my sample Event Sourced application: https://github.com/mpraglowski/cqrs-es-sample-with-res/blob/master/lib/command_handler.rb
This is the only module where I use core features of Rails Event Store. It loads events from RES in load_events
using aggregate id as a stream name. It publishes events in the publish
method. The publish in RES will store the published event in a given stream (again aggregate id) and then send it to all subscribers. This is an important assumption! What is not stored is never published.
So with the use of the CommandHandler
module the CreateOrder
command is handled by:
module CommandHandlers
class CreateOrder
include Injectors::ServicesInjector
include CommandHandler
def call(command)
with_aggregate(command.aggregate_id) do |order|
order_number = number_generator.call
order.create(order_number, command.customer_id)
end
end
def aggregate_class
Domain::Order
end
end
end
The with_aggregate(command.aggregate_id)
method will return a Domain::Order
object recreated from events read from RES. On an Order command handler executes a create
method using parameters from given command.
The command is send from any controller using method from ApplicationController
(notice it is validated before execution):
def execute(command)
command.validate!
handler = "CommandHandlers::#{command.class.name.demodulize}"
handler.constantize.new.call(command)
end
Building a read model
Till now we have implemented:
- user sends command
- command is handled and domain logic is executed
- domain events are stored and published by RES
But when domain objects are not exposing their state we need another way to build the data for views - the read model. The model is a model as your application needs. It is denormalized - no costly joins, no lazy load, just select data and present them as is. It is tailor-made! And one more thing: it could be anything - relational DB, NoSQL store, graph database (i.e. RethinkDB).
Here we go with build in Rails Event Store pub/sub feature. If you took a look at source of CommandHandler
module you might have noticed
def event_store
@event_store ||= RailsEventStore::Client.new.tap do |es|
es.subscribe(Denormalizers::Router.new)
end
end
The es.subscribe(Denormalizers::Router.new)
will create a subscription for all events to event handler Denormalizers::Router
where events will be routed to appropriate denormalisers that will create/update read model defined as a simple ActiveRecord classes.
Currently all this works synchronously - stay tuned for next post when some async features will be introduced.