How to build a read model with Rails Event Store Projection
Recently I faced an interesting challenge in one of our customers' applications. Imagine that you take a test, after which you get a personalised report about your skill level. The existing mechanism for that was time- and resource-consuming: due to several constraints, which I would prefer not to dive into, people had to wait several hours for an e-mail with a PDF-generated report.
The solution was obvious: let's progressively build a read model every time someone answers a question. After the test is done, the report will be available instantly in the web UI.
Let’s start with a domain event
module TestExecution
  class AnswerRegistered < ::Event
    attribute :participant_id, Types::Integer
    attribute :test_id, Types::Integer
    attribute :question_id, Types::Integer
    attribute :answer_id, Types::Integer
    attribute :skill_id, Types::Integer
    attribute :score, Types::Float
    attribute :time_elapsed, Types::Integer
  end
end
Nothing fancy: a typical domain event powered by Rails Event Store, with a defined schema. It keeps the identifiers of the involved entities and the score calculated by the domain service, which publishes the event once its job is done.
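To make the idea of a schema-checked payload concrete without Rails Event Store or dry-types, here is a minimal plain-Ruby sketch. ANSWER_REGISTERED_SCHEMA and build_answer_registered are hypothetical names, not part of either library, and the strict type check is an assumption about how the Types:: constants are configured.

```ruby
# Hypothetical stand-in for the AnswerRegistered schema: attribute => expected class.
# Assumes strict types (no coercion), e.g. an Integer score is rejected.
ANSWER_REGISTERED_SCHEMA = {
  participant_id: Integer, test_id: Integer, question_id: Integer,
  answer_id: Integer, skill_id: Integer, score: Float, time_elapsed: Integer
}.freeze

# Builds a frozen payload, rejecting missing keys, unknown keys and wrong types.
def build_answer_registered(**data)
  missing = ANSWER_REGISTERED_SCHEMA.keys - data.keys
  unknown = data.keys - ANSWER_REGISTERED_SCHEMA.keys
  raise ArgumentError, "missing: #{missing.join(', ')}" unless missing.empty?
  raise ArgumentError, "unknown: #{unknown.join(', ')}" unless unknown.empty?

  ANSWER_REGISTERED_SCHEMA.each do |key, type|
    next if data[key].is_a?(type)

    raise TypeError, "#{key} must be a #{type}, got #{data[key].class}"
  end
  data.freeze
end
```

Validating at construction time means a handler never has to guess whether, say, score is present or numeric.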
Build the read model
The next building block is an asynchronous handler. Why asynchronous? To keep this work out of the participant's request-response cycle, where it would slow things down and lower their satisfaction with our application:
module Reporting
  class CalculateParticipantReport < ApplicationJob
    # Runs the handler as a background job, outside the request-response cycle
    prepend RailsEventStore::AsyncHandler

    def perform(event)
      participant_id = event.data.fetch(:participant_id)
      test_id = event.data.fetch(:test_id)

      link_to_stream(event, test_id, participant_id)
      scores = calculate_scores(test_id, participant_id)
      ParticipantReport.write(
        *prepare_data_for_read_model(scores, test_id, participant_id)
      )
    end

    private

    def prepare_data_for_read_model(scores, test_id, participant_id)
      # magic happens, querying additional info, formatting data
      [scores, participant, test]
    end

    # Folds every event from the report stream into per-skill totals,
    # then turns the totals into an average score per skill
    def calculate_scores(test_id, participant_id)
      RailsEventStore::Projection
        .from_stream(stream_name(test_id, participant_id))
        .init(-> { Hash.new { |scores, skill_id| scores[skill_id] = { score: 0, number_of_scores: 0 } } })
        .when(
          TestExecution::AnswerRegistered,
          ->(state, event) do
            skill_id = event.data.fetch(:skill_id)
            state[skill_id][:score] += event.data.fetch(:score)
            state[skill_id][:number_of_scores] += 1
          end
        )
        .run(Rails.configuration.event_store)
        .reduce({}) do |scores, (skill_id, values)|
          scores[skill_id] = values[:score] / values[:number_of_scores]
          scores
        end
    end

    # Links the event to the per-report stream. A redelivered event is already
    # linked, so the duplicate error is ignored (at-least-once delivery)
    def link_to_stream(event, test_id, participant_id)
      Rails.configuration.event_store.link(
        event.event_id,
        stream_name: stream_name(test_id, participant_id)
      )
    rescue RubyEventStore::EventDuplicatedInStream
      # already in the stream, nothing to do
    end

    def stream_name(test_id, participant_id)
      "ParticipantReport$#{test_id}-#{participant_id}"
    end
  end
end
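The fold performed by calculate_scores can be imitated in plain Ruby to see the arithmetic in isolation. This is a sketch under assumptions: events are plain hashes rather than Rails Event Store events, and ScoreProjection is a hypothetical module, not the RailsEventStore::Projection API.

```ruby
module ScoreProjection
  module_function

  # Mirrors .init: an unseen skill_id starts with a zeroed accumulator
  def initial_state
    Hash.new { |state, skill_id| state[skill_id] = { score: 0.0, number_of_scores: 0 } }
  end

  # Mirrors the .when handler for AnswerRegistered: mutates and returns the state
  def apply(state, event)
    skill_id = event.fetch(:skill_id)
    state[skill_id][:score] += event.fetch(:score)
    state[skill_id][:number_of_scores] += 1
    state
  end

  # Mirrors .run (a fold over the stream) followed by the averaging reduce
  def average_scores(events)
    totals = events.reduce(initial_state) { |state, event| apply(state, event) }
    totals.each_with_object({}) do |(skill_id, values), averages|
      averages[skill_id] = values[:score] / values[:number_of_scores]
    end
  end
end
```

For example, scores of 2.0 and 3.0 for skill 1 plus 4.0 for skill 2 give averages of 2.5 and 4.0. Keeping the sum and the count (rather than a running average) makes the final division exact and trivial.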
What happens here:
- The AnswerRegistered event is linked to a dedicated report stream, e.g. ParticipantReport$123-456. By doing that, we can scope events the way we want; in our case, the stream name contains the ids of the test and the participant. RubyEventStore::EventDuplicatedInStream is rescued to support at-least-once delivery: if the same event is handled twice, linking it again is simply skipped.
- Then, using a Projection that reads from our dedicated stream ParticipantReport$123-456, all the scores are grouped by skill_id and accumulated together with additional info (specifically, the number of scores). After the projection is done, reduce is used to do the math, resulting in an average score for each skill.
- When the scores are ready, further calculations follow, and additional info for the read model (like skill names, etc.) is gathered and formatted. There's no need for any other query to present it to the participant.
What does the read model look like?
id | report_slug | participant_name | test_name | skills |
---|---|---|---|---|
997 | cf827527c552 | Jane Doe | Important skillz test | [{name: 'Sleeping', average: '2.5', global: '2.2'}, #...] |
998 | 6adb1fc1d201 | Ugly Joe | Programming skills assessment | [{name: 'Ruby', average: '4.0', global: '2.0'}, #...] |
999 | 4cece2d44ae0 | Mr Kobayashi | Smartness test | [{name: 'Whatever', average: '5.0', global: '1.0'}, #...] |
Vaughn Vernon, in his “Implementing Domain-Driven Design” book, describes the read model (which he calls the query model) this way:
The query model is a denormalized data model. It is not meant to deliver domain behavior, only data for display (and possibly reporting). If this data model is a SQL database, each table would hold the data for a single kind of client view (display). The table can have many columns, even a superset of those needed by any given user interface display view. Table views can be created from tables, each of which is used as a logical subset of the whole.
Denormalization is not a popular technique in the Rails world. What does it give us? Complex, often numerous queries are replaced with a simple lookup of a single record which contains all the data to be displayed, already formatted.
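The article does not show how the pre-formatted skills column is produced, so the following is only a hypothetical sketch of that step. format_skills, skill_names and global_averages are assumed names and inputs; the output shape follows the example rows in the table above.

```ruby
# Hypothetical formatter: turns per-skill averages into the "skills" column
# shape from the example table ({name:, average:, global:} with one decimal).
# skill_names and global_averages are assumed to be looked up elsewhere.
def format_skills(averages, skill_names, global_averages)
  averages.sort.map do |skill_id, average|
    {
      name: skill_names.fetch(skill_id),
      average: format("%.1f", average),
      global: format("%.1f", global_averages.fetch(skill_id))
    }
  end
end
```

Formatting once at write time is the point of denormalization: the view only iterates over ready-made values.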
How to deal with concurrency issues
Have a look at the read model implementation:
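require "zlib"

module Reporting
  class ParticipantReport < ApplicationRecord
    def self.write(scores, participant, test)
      ApplicationRecord.transaction do
        # Serializes concurrent writes for the same participant and test;
        # the lock is released automatically when the transaction ends
        advisory_lock(participant.id, test.id)
        report = find_or_initialize_by(participant_id: participant.id, test_id: test.id)
        # Generate the slug only once, so the report URL stays stable across rewrites
        report.report_slug ||= SecureRandom.hex(6)
        report.participant_name = participant.name
        report.test_name = test.name
        report.skills = scores
        report.save!
      end
    end

    private_class_method def self.advisory_lock(participant_id, test_id)
      # String#hash is seeded per Ruby process, so workers would not agree on it;
      # CRC32 of "participant-test" is deterministic and fits in a bigint
      key = Zlib.crc32("#{participant_id}-#{test_id}")
      ApplicationRecord.connection.execute(
        "SELECT pg_advisory_xact_lock(#{key})"
      )
    end
  end
end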
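The lock key must be the same in every worker process for the same (participant, test) pair. A key built with [participant_id, test_id].join.hash has two pitfalls, shown in this sketch: joining without a separator makes different pairs collide, and String#hash is seeded randomly per Ruby process. The CRC32 variant is one deterministic alternative, assumed here rather than taken from the original code.

```ruby
require "zlib"

# Pitfall 1: without a separator, (1, 23) and (12, 3) both become "123".
# Pitfall 2 (not testable in one process): "123".hash differs between processes.
def naive_key_string(participant_id, test_id)
  [participant_id, test_id].join
end

# Deterministic alternative: a separator plus CRC32. The result is always
# in 0...2**32, so it fits PostgreSQL's bigint argument of pg_advisory_xact_lock.
def advisory_lock_key(participant_id, test_id)
  Zlib.crc32("#{participant_id}-#{test_id}")
end
```

CRC32 can still collide for some pairs, but a collision only makes two unrelated reports wait for each other; it never lets two writers of the same report run concurrently.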
The implementation is mostly obvious. One might think that there's already a with_lock or simply a lock! method in ActiveRecord. Yes, there is. However, it won't work for not-yet-existing records, because it relies on SELECT ... FOR UPDATE, and on the first write there's no row to lock: it's an insert, not an update. So, if two or more concurrent threads tried to insert the row for the first time, in many cases ActiveRecord::RecordNotUnique errors would appear (given a unique index on participant_id and test_id; without one, you would get duplicate rows instead). Thanks to pg_advisory_xact_lock(key bigint) → void, we can obtain an exclusive transaction-level advisory lock, waiting if necessary. Yet another reason to use PostgreSQL.
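The same find-or-insert race, and how a per-key lock removes it, can be modelled in plain Ruby. In this hypothetical sketch a Mutex per key plays the role of pg_advisory_xact_lock, and the in-memory table raises on a duplicate insert the way a unique index would. ReportTable is an illustrative name, not ActiveRecord.

```ruby
# Hypothetical in-memory model of the read model table with a unique key.
class ReportTable
  RecordNotUnique = Class.new(StandardError)

  def initialize
    @rows = {}
    @rows_guard = Mutex.new                           # protects the rows hash itself
    @locks = Hash.new { |h, key| h[key] = Mutex.new } # one "advisory lock" per key
    @locks_guard = Mutex.new
  end

  # find_or_initialize + save!, serialized per (participant_id, test_id) key
  def write(key, scores)
    lock_for(key).synchronize do
      row = find(key)
      if row
        row[:scores] = scores            # later writes: update the existing row
      else
        insert!(key, { scores: scores }) # first write: insert a new row
      end
    end
  end

  def find(key)
    @rows_guard.synchronize { @rows[key] }
  end

  def count
    @rows_guard.synchronize { @rows.size }
  end

  private

  def lock_for(key)
    @locks_guard.synchronize { @locks[key] }
  end

  # Behaves like a unique index: a second insert for the same key fails
  def insert!(key, row)
    @rows_guard.synchronize do
      raise RecordNotUnique, key.inspect if @rows.key?(key)

      @rows[key] = row
    end
  end
end
```

Without lock_for, two threads could both see no row and both call insert!, and one of them would fail. With it, the check and the insert happen as one step per key, while writes for different keys still run in parallel.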
How to use the read model
# app/controllers/test_results_controller.rb
class TestResultsController < ApplicationController
  def show
    report = Reporting::ParticipantReport.find_by!(report_slug: params[:slug])
    render locals: { report: report }
  end
end
-# app/views/test_results/show.html.haml
%h1
  Personalised report for #{report.participant_name}
%h2= report.test_name
- report.skills.each do |skill|
  %div
    Your performance in #{skill[:name]} is #{skill[:average]},
    compared to #{skill[:global]} earned by others
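The controller looks the report up by report_slug, so a slug that changed on every rewrite of the read model would break previously shared links. A minimal sketch of assigning it only once, assuming a plain Struct in place of the ActiveRecord model (Report and assign_slug are hypothetical names):

```ruby
require "securerandom"

# Hypothetical stand-in for the ActiveRecord model
Report = Struct.new(:report_slug, :scores)

# Generates the slug on the first write only; later writes keep it
def assign_slug(report)
  report.report_slug ||= SecureRandom.hex(6) # 6 random bytes => 12 hex characters
  report
end
```

SecureRandom.hex(6) yields 12 hexadecimal characters, matching the length of the slugs in the example table.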
But…
What if another field is required, or there was a bug in the calculations? Not a problem: read models can be thrown away and rebuilt with ease, because the whole history behind them is known, thanks to domain events.
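A rebuild is just the same projection replayed over the full history into an empty read model. The sketch below assumes events are plain hashes and the read model is a Hash keyed by [test_id, participant_id]; rebuild_read_model is a hypothetical name, not a Rails Event Store API.

```ruby
# Replays the full event history into a fresh read model: one entry per report,
# keyed by [test_id, participant_id], holding the average score per skill.
def rebuild_read_model(events)
  events
    .group_by { |e| [e[:test_id], e[:participant_id]] }
    .transform_values do |report_events|
      report_events
        .group_by { |e| e[:skill_id] }
        .transform_values { |es| es.sum { |e| e[:score] } / es.size }
    end
end
```

After fixing a calculation bug or adding a field, running the corrected projection over the stored events produces the new read model; nothing from the old one needs to be migrated.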
By the way, you might also be interested in other posts about read models on our blog.