Rename stream in Rails Event Store with zero downtime
A question was posted on our Arkency Slack today:
How to rename stream in event store? Is link & delete_stream enough to do it? Asking for a friend ;)
At first it looks quite easy. Sure, we can! Just link all events from the source stream to the target stream, then change the publisher to write to the new stream and use the delete_stream method of Rails Event Store to remove the old stream.
Ahhh, and of course we must handle incoming domain events, with zero downtime.
And this is where the fun begins :)
Naive approach
The concept is still the same (sketched in code below):
- link all events from the source stream to the target stream
- switch the publisher to write to the target stream
- delete the source stream
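In code, the naive migration could look roughly like this (a sketch; how the publisher is pointed at the new stream is application-specific):
# naive one-shot migration - racy, as explained below
event_store.read.stream("source").each do |event|
  event_store.link(event.event_id, stream_name: "target")
end
# switch the publisher to the target stream (application-specific step)
event_store.delete_stream("source")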
But having the publisher constantly writing to the source stream creates a few problems to handle.
The publisher cannot simply be switched to the target stream once all of the source stream's events have been linked. There is a race condition: after we link the last event and switch the publisher to the target stream, new domain events could still be published to the source stream. That would be bad, as we would lose some domain events.
Catchup subscription FTW
So let's use a catchup subscription to … you know… catch up with the source stream and only then switch to the target stream.
This time the concept is:
- the publisher constantly publishes new domain events to a stream whose name is defined in some data store,
- the catchup subscription reads a chunk of events from the source stream (never read too much at once, some streams might have a lot of events), and then:
- if there are more events in the source stream, it links them to the target stream and fetches the next chunk of events
- if the subscription has caught up with the source stream (both streams contain the same events), it switches the stream in the publisher's data store. From now on the publisher will write to the target stream
- delete the source stream
Race conditions again
But there is a catch. A race condition. Or rather, race conditions. There are still moments in the code execution where we can miss some domain events:
- after we catch up with the source stream and before we make the switch, new events might be published to the source stream,
- the publisher might fetch the current stream from the data store just before the catchup process makes the switch; the publisher will then still write to the source stream and we will miss some events again (sketched in code below)
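The second case is the dangerous read-then-write window in an unsynchronized publisher. A sketch of it (data_store is a hypothetical store holding the current stream name):
current = data_store.fetch(:current_stream)      # <- the catchup process may switch the stream right here...
event_store.publish(event, stream_name: current) # ...so this write still goes to the old source stream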
Get yourself a lock!
We need a lock around 2 critical operations here. First, when the catchup process fetches source stream events and makes the switch to the target stream. Second, when the publisher fetches the current stream from the data store and writes new events to it.
Code spike to demonstrate this concept
I've spent some time today experimenting with how to implement this with Rails Event Store. Or actually with Ruby Event Store, since I don't need Rails for that, just pure Ruby.
Let's set up some basic objects:
require 'ruby_event_store'

event_store = RubyEventStore::Client.new(
  repository: RubyEventStore::InMemoryRepository.new,
  mapper: RubyEventStore::Mappers::NullMapper.new
)

FooEvent = Class.new(RubyEventStore::Event)
This code creates a new instance of RubyEventStore::Client using the in-memory repository and the NullMapper, just to skip some friction. Then it defines a sample domain event class.
So now our publisher (simulated):
def publish(stream)
  index = 0
  while(true) do
    stream.publish(FooEvent.new(data: {index: index}))
    puts "#{index} published to stream: #{stream}"
    sleep(Random.rand(1.1) / TIME_UNIT)
    index += 1
  end
end
It is just a method that constantly publishes new domain events (with an index) to some stream. The stream argument will be the most interesting part here.
Then the catchup process:
def catchup(stream)
  processed = nil
  while(true)
    events = stream.catchup(processed)
    break if events.empty?
    events.each do |event|
      stream.link(event)
      puts "#{event.data[:index]} linked to stream: target"
    end
    processed = events.last.event_id
    sleep(Random.rand(1.0) / TIME_UNIT)
  end
end
As described, it fetches a chunk of events from the source stream and links them to the target stream. It stops when there is nothing more to read from the source stream.
Please notice the difference between the two sleep intervals:
# publish
sleep(Random.rand(1.1) / TIME_UNIT)
# catchup
sleep(Random.rand(1.0) / TIME_UNIT)
To give the catchup process a chance to finally catch up with the source stream, it must process events a little faster than the publisher publishes them.
TIME_UNIT is just a constant defining how fast you want this experiment to process events.
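Both TIME_UNIT and the stream object need to be wired up before the threads start. A minimal sketch (StreamSwitch is defined below; 10.0 is just the value used in the sample run at the end):
TIME_UNIT = 10.0
stream = StreamSwitch.new(event_store, "source", "target")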
WAIT_TIME = 0.5
publish = Thread.new {publish(stream)}
sleep(WAIT_TIME)
puts "Starting catchup thread"
catchup = Thread.new {catchup(stream)}
catchup.join
puts "Catchup thread done"
sleep(WAIT_TIME)
publish.exit
puts "Publish thread done"
puts "Source stream:"
puts (source_events = event_store.read.stream("source").map{|x| x.data[:index]}).inspect
puts "Target stream:"
puts (target_events = event_store.read.stream("target").map{|x| x.data[:index]}).inspect
raise "FAIL" unless target_events[0, source_events.size] == source_events
puts "DONE - now remove source stream"
To check if the experimental code works I start 2 threads. The first is the publisher (executing the publish method). The second is the catchup process (executing the catchup method). Before the catchup process starts, it waits for some time, just to let the publisher write some events to the source stream. Then, after the catchup thread has finished, I wait some time again to let the publisher publish a few more events, this time to the target stream. Finally, an assertion checks that the target stream starts with all events from the source stream.
The StreamSwitch
class StreamSwitch
  def initialize(event_store, source, target, lock: Mutex.new)
    @event_store = event_store
    @current = source
    @source = source
    @target = target
    @lock = lock
  end

  def publish(event)
    @lock.synchronize do
      @event_store.publish(event, stream_name: @current)
    end
  end

  def catchup(processed)
    @lock.synchronize do
      scope = @event_store.read.stream(@source).limit(5)
      scope = scope.from(processed) if processed
      events = scope.to_a
      change if events.empty?
      events
    end
  end

  def link(event)
    @event_store.link(event.event_id, stream_name: @target)
  end

  def to_s
    @current
  end

  private

  def change
    @current = @target
  end
end
Here is the logic. The publish, catchup & link methods are called by the publisher & catchup threads. publish will always write a domain event to the current stream. Notice that it uses a synchronized block to avoid race conditions. The same lock is used by the catchup method to avoid the race condition where we read the source stream and, if there are no more events to link, change the current stream.
Real life problems
I've used a Mutex class here to synchronize the critical operations, but this is only experimental code, not production ready. In a real-life scenario the lock should depend on what kind of EventRepository you are using in your system. If you store your domain events in a SQL database, consider named locks to implement the synchronization.
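For example, a named-lock object could expose the same synchronize interface as Mutex, so it can be passed to StreamSwitch via the lock: keyword argument. A rough sketch assuming PostgreSQL (pg_advisory_lock/pg_advisory_unlock are built-in Postgres functions; the class name and lock id are made up for illustration):
class PgAdvisoryLock
  # connection: an open database connection (each process/thread should
  # hold its own session); lock_id: an integer agreed on by both the
  # publisher and the catchup process
  def initialize(connection, lock_id)
    @connection = connection
    @lock_id = Integer(lock_id)
  end

  def synchronize
    @connection.execute("SELECT pg_advisory_lock(#{@lock_id})")
    yield
  ensure
    @connection.execute("SELECT pg_advisory_unlock(#{@lock_id})")
  end
end
Unlike a Mutex, such a lock works across processes, as long as both sides agree on the same lock id.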
There is still a race condition where some events may be skipped or linked out of order: events published in other transactions that have not yet committed when the catchup process reads past them.
The result
Here is a sample execution for TIME_UNIT = 10.0:
~/arkency$ ruby rename-stream.rb
0 published to stream: source
1 published to stream: source
2 published to stream: source
3 published to stream: source
4 published to stream: source
5 published to stream: source
6 published to stream: source
Starting catchup thread
0 linked to stream: target
1 linked to stream: target
2 linked to stream: target
3 linked to stream: target
4 linked to stream: target
5 linked to stream: target
6 linked to stream: target
Catchup thread done
7 published to stream: target
8 published to stream: target
9 published to stream: target
10 published to stream: target
11 published to stream: target
12 published to stream: target
13 published to stream: target
14 published to stream: target
15 published to stream: target
Publish thread done
Source stream:
[0, 1, 2, 3, 4, 5, 6]
Target stream:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
DONE - now remove source stream
Looks very simple. But play with it a bit. It gets much more interesting when TIME_UNIT = 100000.0.
Now you can finally remove the source stream:
event_store.delete_stream("source")
BTW, neither link nor delete_stream affects any domain event in any way. A stream is just a grouping mechanism for domain events. Once you write a domain event to the event store it cannot be deleted (at least not without the use of a Rails console :P).
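You can see this in the spike itself: after delete_stream the source grouping is empty, but every domain event is still readable through the target stream (a quick check using the objects defined above):
puts event_store.read.stream("source").to_a.inspect
# => [] - only the grouping is gone
puts event_store.read.stream("target").map { |x| x.data[:index] }.inspect
# the linked and published events are still intact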
Make your own experiments
Code is fun! Go play with it! Here is the source of the code spike for this blog post.