Catchup subscriptions with Rails Event Store
… and check why 5600+ Rails engineers read also this
Catchup subscriptions with Rails Event Store
The usual way of handling domain events in Rails Event Store is using the included pub-sub mechanism. And in most cases, especially when you have majestic monolith application it is enough. It is also easiest to grasp by the team and use in legacy applications implemented using Rails Way. But sometimes it is not enough, sometimes this way of handling events become an obstacle.
Let me define the domain problem. The feature we want to implement is described by requirements:
- every time a blog post is published its metadata (slug, title, time of creation, author, tags) should be sent to external system to index our articles
- when blogpost is updated the metadata in external index must be also updated
At the beginning it looks very simple. And a first, naive, implementation of this requirements could look like:
Let’s start with new Rails application with RailsEventStore template:
rails new -m https://railseventstore.org/new blog.yourdomain.com
Then let’s create a modules (bounded contexts) because we don’t want to end up in Big Ball Of Mud quickly:
rails generate bounded_context:bounded_context blogging
rails generate bounded_context:bounded_context index
Next, we need to define domain events, implement the blogging logic (omitted here because that’s different topic for another post):
# ./blogging/lib/blogging.rb
module Blogging
class ArticlePublished < RailsEventStore::Event
# ... the schema of events should be defined here
end
class ArticleChanged < RailsEventStore::Event
# ... and here ;)
end
end
Define subscriptions, to connect our domain events to handlers:
# ./config/initializers/rails_event_store.rb
Rails.configuration.to_prepare do
Rails.configuration.event_store = RailsEventStore::Client.new(
dispatcher: RubyEventStore::ComposedDispatcher.new(
RailsEventStore::AfterCommitAsyncDispatcher.new(
scheduler: RailsEventStore::ActiveJobScheduler.new),
RubyEventStore::Dispatcher.new
)
)
Rails.configuration.command_bus = Arkency::CommandBus.new
Rails.configuration.event_store.tap do |store|
store.subscribe(Index::PushArticleToIndex,
to: [
Blogging::ArticlePublished,
Blogging::ArticleChanged,
]
)
end
end
And finally our handler to send articles to external indexing service:
# ./index/lib/index/publish_article_to_index.rb
module Index
class PushArticleToIndex < ActiveJob::Base
prepend RailsEventStore::AsyncHandler
def initialize(index_adapter = Rails.configuration.index_adapter)
@index = index_adpater
end
def perform(event)
@index.push(build_index_data(event))
end
private
def build_index_data(event)
# ... does not matter here, some hash probably ;)
end
end
end
A few things to notice
1st: embrace async!
The Index::PushArticleToIndex
is an asynchronous event handler. It is inherited from ActiveJob::Base
and implements perform(event)
method. This will allow to use it by RailsEventStore::ActiveJobScheduler
and schedule sending to an index asynchronously, without blocking the execution of main logic. Because we do not want to fail publishing our new article just because indexing service is down :)
2nd: beware of transaction rollbacks!
Some background jobs adapters (i.e. Sidekiq) use Redis to store information about scheduled jobs. That’s why we should change default dispatcher in Rails Event Store client to RailsEventStore::AfterCommitAsyncDispatcher
. It ensures that the async handlers will be scheduled only after commit of current database transaction. Your handlers won’t be triggered when transaction is rolled back.
3rd: subscribing to async handers is different
Rails Event Store uses call(event)
method to invoke an event handler’s logic. By default you need to pass a callable instance of handler or lambda to subscribe
method. But this is not the same when using RailsEventStore::ActiveJobScheduler
. If you want to have your handler processed by this scheduler it must be a class and must inherit from ActiveJob::Base
. Otherwise (thanks to the RubyEventStore::ComposedDispatcher
) it will be handed by default RubyEventStore::Dispatcher
.
Where is the drawback?
This solution has a drawback. Let’s imagine that your blogging platform becomes extremely popular and you need to handle hundreds of blogposts per second. Thanks to async processing you might even be able to cope with that. But then your index provider announced it has ended his “amazing journey” and you need to move your index to a new one. Do I have to mention that your paying customers expect the platform will work 24/7 ? ;)
Making things differently
Catchup subscription is a subscription where client defines a starting point and a size of dataset to handle. Basically it’s a while loop reading through a stream of events with defined chunk size. It could be a separate process, maybe even on separate node which we will spin up only to handle incoming events and push the articles to the external index provider.
First things first
The catchup subscription is easy to implement when you read from a single stream. But your Blogging
context should not put all events into single stream. That’s obvious.
In this case we could use linking to stream feature in Rails Event Store.
Change the Rails Event Store initializer:
# ./config/initializers/rails_event_store.rb
Rails.configuration.to_prepare do
Rails.configuration.event_store = RailsEventStore::Client.new(
dispatcher: RubyEventStore::ComposedDispatcher.new(
RailsEventStore::AfterCommitAsyncDispatcher.new(
scheduler: RailsEventStore::ActiveJobScheduler.new),
RubyEventStore::Dispatcher.new
)
)
Rails.configuration.command_bus = Arkency::CommandBus.new
Rails.configuration.event_store.tap do |store|
store.subscribe(Index::LinkToIndex.new(store),
to: [
Blogging::ArticlePublished,
Blogging::ArticleChanged,
]
)
end
end
And instead of PublishArticleToIndex
use LinkToIndex
:
# ./index/lib/index/link_to_index.rb
module Index
class LinkToIndex
def initialize(event_store)
@event_store = event_store
end
def call(event)
@event_store.link(event.event_id, "indexable-articles")
end
end
end
This time we will use a simple synchronous handler. Thanks to RubyEventStore::ComposedDispatcher
we do not need to change anything. It won’t match the handlers handled by RailsEventStore::ActiveJobScheduler
so the default RubyEventStore::Dispatcher
will trigger the event handler.
Implementing catchup subscription
When started, the catchup subscription should start reading from the last processed event and handle read events (pushing them to the external index).
Here a sample implementation:
# ./index/lib/index/push_articles_to_index.rb
module Index
class PushArticlesToIndex
def initialize(event_store, index_adapter, chunk_size = 1000)
@event_store = event_store
@index = index_adpater
@chunk_size = chunk_size
end
def call(starting_point = determine_starting_point)
while(!(events = fetch_from(starting_point)).empty?)
events.each do |event|
@index.push(build_index_data(event))
store_processed(event.event_id)
end
end
end
private
def fetch_from(event_id)
scope = @event_store.read
.stream('indexable-articles')
scope = scope.from(event_id) if event_id
scope.limit(@chunk_size)
end
def determine_starting_point
# ... read last processed event id
end
def store_processed(event_id)
# ... store processed event id
end
def build_index_data(event)
# ... does not matter here, some hash probably ;)
end
end
end
Why a separate process?
Let’s start with cons of this solution. The obvious one is you need a separate process :) Maybe with separate deployment, separate CI etc. Probably more time will pass between publication of article and indexing it. So why bother?
Rebuilding an index
The external index is basically a read model of your articles metadata. Tailor made, aligned with capabilities of external index provider. Recreateable.
This is what makes a difference here. To rebuild an index from scratch all you need to do is to remove the stored starting point for the catchup subscription and wait some time. The indexing will start from beginning and will go though all published articles until index will be up to date.
But there is more!
I’ve mentioned a scenario where you change your external index provider. Using a catchup subscription it will be quite easy. Just create new instance of the subscription process with different index adapter. Run it and wait until it catches up and indexes all the published articles. And then just switch your application to new external index and drop the old subscription.