CQRS example in the Rails app
I have recently been working on a new project. We are building an application in which one feature shows an organization’s tree-like structure. The structure contains information about relations between employees. I was thinking about how to build this feature and wondering whether I should go the typical way and implement all the CRUD actions. In that case I would have to generate the structure on every call. I thought it would be faster to build. It is a startup, so while the project is young, performance is not a problem. However, after a few syncs we decided to go another way.
I am fascinated by the DDD and CQRS approaches. At Arkency we like to say that every time we return to the “Implementing DDD” book, we realize that it has answers to all our questions and doubts. So, going back to the feature, we decided to implement the structure as a Read Model.
Example
I love examples, so I will not leave you hanging. Our app is split into two parts: a frontend implemented in React and a Rails backend. I will focus only on the backend part. If you are interested in how we deal with React, you can read some of our books. In the next steps I will show you how we implemented simple CQRS, focusing on building a Read Model using events.
Let's start from the top. The following example shows the controller with basic actions. As you can see, we simply call application services, each of which has a single responsibility. In clean CQRS we should use Commands. We will refactor that in a next step.
class TeamsController < ApplicationController
  before_action :authenticate_user_from_token!

  def in_organization
    teams = AppServices::TeamsInOrganization.new.(params[:organization_id])
    render json: { teams: teams }
  end

  def show
    AppServices::GetTeam.new.(params[:id])
  end

  def create
    AppServices::AddNewTeam.new.(params[:team])
  end

  def update
    AppServices::EditTeamData.new.(params[:team])
  end

  def destroy
    AppServices::RemoveTeam.new.(params[:id])
  end
end
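The controller passes raw params straight to the application services. As a rough illustration of what the Command refactoring could look like, here is a minimal sketch in plain Ruby. It is not the code from this project: `AddNewTeamCommand`, `CommandBus` and the validation rules are hypothetical names and choices made up for the example.

```ruby
# Hypothetical command: a plain data object describing the intent "add a new team".
AddNewTeamCommand = Struct.new(:organization_id, :name, :parent_id, :department, keyword_init: true) do
  # Raises when the command cannot possibly be handled, otherwise returns self.
  def validate!
    raise ArgumentError, 'name is required' if name.to_s.strip.empty?
    raise ArgumentError, 'organization_id is required' if organization_id.nil?
    self
  end
end

# Hypothetical bus that maps a command class to the handler responsible for it.
class CommandBus
  def initialize
    @handlers = {}
  end

  def register(command_class, handler)
    @handlers[command_class] = handler
  end

  # Validates the command and passes it to the registered handler.
  def call(command)
    handler = @handlers.fetch(command.class) { raise ArgumentError, "no handler for #{command.class}" }
    handler.call(command.validate!)
  end
end

handled = []
bus = CommandBus.new
# In a real app the handler would be an application service; here it only records the command.
bus.register(AddNewTeamCommand, ->(cmd) { handled << cmd.to_h })
bus.call(AddNewTeamCommand.new(organization_id: 1, name: 'Backend', parent_id: 7, department: 'IT'))
```

With something like this in place, a controller action would only build a command from the params and hand it to the bus, and the application service would become the handler.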
The following example shows one app service. We use this service to create a new Team.
module AppServices
  class AddNewTeam
    include EventStore::Injector

    def initialize(team_repository = ::OrganizationBc::Adapters::TeamRepository.new)
      @team_repository = team_repository
    end

    def call(team_data)
      team = create_new_team(team_data)
      # The department comes from the submitted team data.
      publish_team_created(team_data, team.id, team_data[:department])
      team
    end

    private

    attr_reader :team_repository

    # Delegates creation and persistence of the Team to the domain service.
    def create_new_team(team_data)
      add_new_team = OrganizationBc::Teams::Services::AddNewTeam.new(team_repository)
      add_new_team.(team_data)
    end

    # Publishes the TeamCreated event to the team's own stream.
    def publish_team_created(team, team_id, department)
      stream_name = "team/#{team_id}"
      event = ::Event::TeamCreated.new({
        data: {
          team: {
            id: team_id,
            organization_id: team[:organization_id],
            name: team[:name],
            parent_id: team[:parent_id],
            department: department,
            type: 'team'
          }
        }
      })
      event_store.publish_event(event, stream_name)
    end
  end
end

module OrganizationBc
  module Teams
    module Services
      class AddNewTeam
        def initialize(repository)
          @repository = repository
        end

        def call(team)
          team = Team.new({
            id: team[:id],
            name: team[:name],
            organization_id: team[:organization_id]
          })
          repository.create(team)
        end

        private

        attr_reader :repository
      end
    end
  end
end
I have chosen a simple service to focus on the most important parts. As you can see, we call a domain service to create the Team model and save it in the database. Team is an aggregate root in the Team <-> Members relation. After creating a Team we publish an event to the Event Store. We use our own Event Store called RailsEventStore. You can check out the GitHub repository. Publishing the event belongs in the aggregate, but putting it in an app service was a first step. We don’t use Event Sourcing. We decided to save the “current” state for now. Event Sourcing is a completely orthogonal concept to CQRS. Doing CQRS does not require event sourcing. But we save all events, so it will be very easy to build an aggregate’s state from events.
We inject the EventStore instance using a custom injector. You can see the whole setup below.
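To illustrate that last point, here is a minimal sketch of how a team's current state could be rebuilt by folding over its stored events. It does not use RailsEventStore: the events are plain hashes, and `rebuild_team` as well as the payloads of `TeamUpdated` and `TeamRemoved` are assumptions made for the example.

```ruby
# Folds a list of events (oldest first) into the current state of a team.
# Returns nil when the team does not exist (never created, or removed).
def rebuild_team(events)
  events.reduce(nil) do |state, event|
    case event[:type]
    when 'Event::TeamCreated' then event[:data].dup
    when 'Event::TeamUpdated' then state.merge(event[:data])
    when 'Event::TeamRemoved' then nil
    else state # events we do not care about leave the state untouched
    end
  end
end

events = [
  { type: 'Event::TeamCreated', data: { id: 1, name: 'Backend', department: 'IT' } },
  { type: 'Event::TeamUpdated', data: { name: 'Platform' } }
]
team = rebuild_team(events)
```

Here `team` ends up with the original `id` and `department` and the updated `name`. The sketch assumes the events arrive in the order they were published.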
module EventStore
  module Injector
    def event_store
      @event_store ||= Rails.application.config.event_store
    end
  end
end

Rails.application.configure do
  # other stuff
  config.event_store = EventStore::SetupEventStore.new.()
end

module EventStore
  class SetupEventStore
    def call
      event_store = RailsEventStore::Client.new
      structure_read_model = OrganizationBc::ReadModels::Structure.new
      event_store = config_structure_handler(structure_read_model, event_store)
      event_store
    end

    private

    def config_structure_handler(structure_read_model, event_store)
      events = ['Event::OrganizationCreated',
                'Event::MemberAdded',
                'Event::MemberUpdated',
                'Event::MemberDeleted',
                'Event::MemberUnassigned',
                'Event::TeamCreated',
                'Event::TeamUpdated',
                'Event::TeamRemoved']
      event_store.subscribe(structure_read_model, events)
      event_store
    end
  end
end
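To make the subscription mechanism more concrete, here is a simplified in-memory stand-in for an event store. It is not how RailsEventStore is implemented: `TinyEventStore`, `FakeEvent`, `Recorder` and `read_stream` are hypothetical names, and the only behavior assumed is the one used in this post, namely `subscribe(handler, event_types)` and `publish_event(event, stream_name)`, with the handler receiving `handle_event(event)`.

```ruby
# Hypothetical event: just a type name and a data payload.
FakeEvent = Struct.new(:event_type, :data)

class TinyEventStore
  def initialize
    @subscribers = Hash.new { |hash, type| hash[type] = [] }
    @streams = Hash.new { |hash, name| hash[name] = [] }
  end

  # Registers one handler for every listed event type.
  def subscribe(handler, event_types)
    event_types.each { |type| @subscribers[type] << handler }
  end

  # Appends the event to its stream, then notifies the handlers subscribed to its type.
  def publish_event(event, stream_name)
    @streams[stream_name] << event
    @subscribers.fetch(event.event_type, []).each { |handler| handler.handle_event(event) }
  end

  def read_stream(stream_name)
    @streams.fetch(stream_name, []).dup
  end
end

# Stand-in for a read model: it only records the types of the events it receives.
class Recorder
  attr_reader :seen

  def initialize
    @seen = []
  end

  def handle_event(event)
    @seen << event.event_type
  end
end

store = TinyEventStore.new
recorder = Recorder.new
store.subscribe(recorder, ['Event::TeamCreated'])
store.publish_event(FakeEvent.new('Event::TeamCreated', { team: { id: 1 } }), 'team/1')
store.publish_event(FakeEvent.new('Event::MemberAdded', { member: { id: 2 } }), 'team/1')
```

Both events are stored in the stream, but the recorder is only notified about the type it subscribed to. This is the property the read model relies on: the write side publishes events without knowing who listens.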
So the Write part is almost done. In the SetupEventStore class we define an event handler called OrganizationBc::ReadModels::Structure for our Read Model. We subscribe it to handle a set of events.
module OrganizationBc
  module ReadModels
    class Structure
      def initialize(repository = ::OrganizationBc::Adapters::StructureReadModelRepository.new)
        @repository = repository
      end

      def handle_event(event)
        case event.event_type
        when 'Event::OrganizationCreated'
          create_structure_for_organization(event.data[:organization])
        when 'Event::MemberAdded'
          update_structure(event.data[:member]) { |structure, member| add_member_to_model(structure, member) }
        when 'Event::MemberUpdated'
          update_structure(event.data[:member]) { |structure, member| update_member_in_model(structure, member) }
        when 'Event::MemberDeleted'
          update_structure(event.data[:member]) { |structure, member| remove_member_from_model(structure, member) }
        when 'Event::MemberUnassigned'
          update_structure(event.data[:member]) { |structure, member| remove_member_from_model(structure, member) }
        when 'Event::TeamCreated'
          update_structure(event.data[:team]) { |structure, team| add_team_to_model(structure, team) }
        when 'Event::TeamUpdated'
          handle_team_updated(event)
        when 'Event::TeamRemoved'
          update_structure(event.data[:team]) { |structure, team| remove_team_from_model(structure, team) }
        end
      end

      private

      attr_reader :repository

      def create_structure_for_organization(organization)
        save_structure(organization[:id])
      end

      # Loads the latest structure, lets the block modify it, and saves it as a new version.
      def update_structure(element)
        org_structure = get_organization_structure(element[:organization_id])
        yield(org_structure[:structure], element.merge!(children: []))
        save_structure(element[:organization_id], org_structure)
      end

      def add_member_to_model(people, member)
        if member[:parent_id] == ''
          people.push member
        else
          people.each do |person|
            if person['id'] == member[:parent_id]
              person['children'] << member
            else
              add_member_to_model(person['children'], member)
            end
          end
        end
      end

      def update_member_in_model(people, member)
        people.each_with_index do |person, index|
          if person['id'] == member[:id]
            people[index] = member
          else
            update_member_in_model(person['children'], member)
          end
        end
      end

      def remove_member_from_model(people, member)
        people.each_with_index do |person, index|
          if person['id'] == member[:id]
            people.delete_at(index)
          else
            remove_member_from_model(person['children'], member)
          end
        end
      end

      def add_team_to_model(people, team)
        people.each do |person|
          if person['id'] == team[:parent_id]
            person['children'] << team
          else
            add_team_to_model(person['children'], team)
          end
        end
      end

      def handle_team_updated(event)
        #code
      end

      def update_team_in_nodes(nodes, team)
        nodes.map do |node|
          if node['id'] == team[:id]
            team[:children] = node['children']
            team
          else
            node['children'] = update_team_in_nodes(node['children'], team)
            node
          end
        end
      end

      def remove_team_from_model(people, team)
        people.each_with_index do |person, index|
          if person['id'] == team[:id]
            people.delete_at(index)
          else
            remove_team_from_model(person['children'], team)
          end
        end
      end

      def get_organization_structure(organization_id)
        record = repository.find_by_organization_id!(organization_id)
        {structure: record.model['structure'], departments: record.model['departments'], version: record.version}
      end

      def save_structure(organization_id, model = {structure: [], departments: [], version: 0})
        repository.create(organization_id, model)
      end
    end
  end
end
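The recursive helpers in the read model all walk the same nested structure of nodes with a `children` array. The following standalone sketch shows that walk on a small tree. It is a variant written for this illustration, not the handler's own method: `add_node` is a hypothetical name, it stops at the first matching parent, and it reports whether a parent was found.

```ruby
# Inserts `node` under the element whose 'id' equals parent_id.
# Returns true when a parent was found anywhere in the tree, false otherwise.
def add_node(nodes, parent_id, node)
  nodes.each do |current|
    if current['id'] == parent_id
      current['children'] << node
      return true
    end
    # Not a match: keep searching in this node's subtree.
    return true if add_node(current['children'], parent_id, node)
  end
  false
end

structure = [
  { 'id' => 1, 'name' => 'CEO', 'children' => [
    { 'id' => 2, 'name' => 'CTO', 'children' => [] }
  ] }
]

team = { 'id' => 3, 'name' => 'Backend', 'type' => 'team', 'children' => [] }
added = add_node(structure, 2, team)
```

After the call, the team hangs under the node with id 2, two levels below the root. Returning a boolean makes it possible to notice an event that refers to a parent missing from the structure.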
The organization’s structure is a tree. Each Team has a relation to a parent member and a collection of child nodes. These nodes are the team’s members. For each handled event we modify the structure’s model and save it in a Postgres database. We save the model in its JSON representation. We save a new record on each update to keep the whole change history. The following example shows what the repository looks like.
module OrganizationBc
  module Adapters
    class StructureReadModelRepository
      class StructureReadModel < ActiveRecord::Base
        self.table_name = 'structures_read_models'
      end

      ReadModelNotFound = Class.new(StandardError)

      # Always inserts a new row with the next version; existing rows are never updated.
      def create(organization_id, model)
        version = model[:version] + 1
        model[:version] = version
        record = StructureReadModel.new({ organization_id: organization_id, version: version, model: model})
        record.save!
      end

      def find_by_organization_id!(organization_id)
        record = find_by_organization_id(organization_id)
        raise ReadModelNotFound.new if record.nil?
        record
      end

      private

      # Returns the record with the highest version, or nil.
      def find_by_organization_id(organization_id)
        StructureReadModel.where(organization_id: organization_id).order(:version).last
      end
    end
  end
end
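The append-only versioning used by the repository can be tried out without a database. The sketch below is an in-memory stand-in, not the code from this project: `InMemoryStructureRepository` and its `history` method are hypothetical, and the JSON round trip only imitates what storing the model in a JSON column does. It also hints at why the read model compares `person['id']` (string keys, presumably because the data was loaded from JSON) with `member[:id]` (symbol keys, straight from the event).

```ruby
require 'json'

class InMemoryStructureRepository
  Record = Struct.new(:organization_id, :version, :model)
  ReadModelNotFound = Class.new(StandardError)

  def initialize
    @records = []
  end

  # Never updates in place: every call appends a record with the next version.
  def create(organization_id, model)
    version = model[:version] + 1
    # Serialize and parse again to imitate a JSON column (keys come back as strings).
    stored = JSON.parse(JSON.generate(model.merge(version: version)))
    @records << Record.new(organization_id, version, stored)
  end

  # Returns the record with the highest version for the organization.
  def find_by_organization_id!(organization_id)
    record = @records.select { |r| r.organization_id == organization_id }.max_by(&:version)
    raise ReadModelNotFound if record.nil?
    record
  end

  # All stored versions for the organization, oldest first.
  def history(organization_id)
    @records.select { |r| r.organization_id == organization_id }.map(&:version)
  end
end

repo = InMemoryStructureRepository.new
repo.create(1, { structure: [], departments: [], version: 0 })

current = repo.find_by_organization_id!(1)
member = { id: 10, name: 'Ann', children: [] }
repo.create(1, { structure: current.model['structure'] + [member], departments: [], version: current.version })

latest = repo.find_by_organization_id!(1)
```

The first record stays untouched while `latest` points at the second version, so every earlier state of the structure remains available for inspection.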
Once we have built the Read Model, the last step is to create a query for fetching the model. We have a separate module called AppQueries where we keep all queries. So the Read part is only one class. That’s all.
module AppQueries
  class LoadOrganizationStructure
    def initialize(repository = OrganizationBc::Adapters::StructureReadModelRepository.new)
      @repository = repository
    end

    def call(organization_id)
      repository.find_by_organization_id!(organization_id)
    end

    private

    attr_reader :repository
  end
end
Conclusion
I can only say that it was a great decision to start this way. Now I know that we will save a lot of time on investigating performance problems in the future. Of course, the most important thing is to decide whether CQRS is a good starting point. If you have a simple CRUD feature, it will be unnecessary.
I didn’t focus on the testing part. I think it is a great subject for a separate post. If you are interested in testing an event sourced app, you can check this post.
You can find all the code used in this post here.