Replaying events in RailsEventStore
Event Sourcing comes with a very handy utility that lets you replay events that happened in the system. Imagine that you’re introducing a new report, or you fixed an existing event handler and you need to run some events against it to produce a valuable outcome that your business friends expect.
If you are not familiar with event-sourcing, it’s a way to store the state of a system as a series of events, not just the current state. Check out our event sourcing tag.
How do I replay events with RailsEventStore?
Let’s assume that we want to send an Xmas card to customers who made at least 5 orders and haven’t returned any of them during the last 3 months. Luckily, our system has events that will help us decide which customers should receive the astonishing Xmas cards.
Those events are OrderFulfilled and OrderReturned. We also know exactly when they happened, right? We can easily find and replay events from the last 3 months and trigger the new functionality, that is, sending the Xmas cards.
We’ll need an instance of the RailsEventStore client. Then we need to specify which events we want to replay. The requirements are clear: we’re interested in OrderFulfilled and OrderReturned events from the last 3 months. So let’s prepare the list of events to replay using the read API.
For detailed information on setting up the RailsEventStore client, see the doc.
Find all events of type OrderFulfilled and OrderReturned that have occurred in the last 3 months.
events = client.read.of_type([OrderFulfilled, OrderReturned]).newer_than(3.months.ago)
The events to replay are ready. SendXmasCardToEligibleCustomer is a class that determines whether the customer is eligible to receive a gift. If they are, it requests that the gift be sent. Let’s replay our events.
events.each { |event| SendXmasCardToEligibleCustomer.new.call(event) }
And voila, we have replayed the events for the needs of the SendXmasCardToEligibleCustomer class.
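To make the mechanics of a replay easier to see without a database, here is a minimal in-memory sketch in plain Ruby. It does not use RailsEventStore at all: the Event struct, the RecordingHandler class and the events_to_replay helper are hypothetical stand-ins, invented only to mirror the "filter by type and time, then feed each event to a handler" idea described above.

```ruby
# Stand-in event type for illustration; NOT the RailsEventStore event class.
Event = Struct.new(:type, :customer_id, :timestamp, keyword_init: true)

# Hypothetical handler that only records what it was called with.
class RecordingHandler
  attr_reader :seen

  def initialize
    @seen = []
  end

  def call(event)
    @seen << [event.type, event.customer_id]
  end
end

# Select events of the given types that happened after `since`, oldest first.
def events_to_replay(log, types:, since:)
  log
    .select { |event| types.include?(event.type) && event.timestamp > since }
    .sort_by(&:timestamp)
end

now = Time.utc(2023, 12, 1)
day = 24 * 60 * 60
log = [
  Event.new(type: :order_fulfilled, customer_id: 1, timestamp: now - 200 * day), # too old
  Event.new(type: :order_fulfilled, customer_id: 1, timestamp: now - 30 * day),
  Event.new(type: :order_placed,    customer_id: 2, timestamp: now - 10 * day),  # wrong type
  Event.new(type: :order_returned,  customer_id: 2, timestamp: now - 5 * day)
]

handler = RecordingHandler.new
selected = events_to_replay(log, types: %i[order_fulfilled order_returned], since: now - 90 * day)
selected.each { |event| handler.call(event) }
```

Only the two events that match both the type list and the time window reach the handler, in the order they happened.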
In this particular case we’re instantiating the SendXmasCardToEligibleCustomer class and executing its logic based on the event passed to the call method. However, there are other things that you could do. Given your handlers are idempotent, you could simply re-publish those events once again.
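What "idempotent" means for a handler can be sketched in a few lines of plain Ruby. The IdempotentHandler wrapper and the FakeEvent struct below are hypothetical, not part of RailsEventStore. The sketch also assumes an in-memory Set of processed ids, whereas a real system would need durable storage for them.

```ruby
require "set"

# Hypothetical wrapper: skips events whose id has already been processed.
class IdempotentHandler
  def initialize(handler, processed_ids: Set.new)
    @handler = handler
    @processed_ids = processed_ids # assumption: in-memory only, not durable
  end

  def call(event)
    return :skipped if @processed_ids.include?(event.event_id)

    @handler.call(event)
    # Remember the id only after the wrapped handler succeeded,
    # so a failed attempt can be retried.
    @processed_ids << event.event_id
    :handled
  end
end

# Stand-in for an event that exposes an event_id.
FakeEvent = Struct.new(:event_id)

calls = []
handler = IdempotentHandler.new(->(event) { calls << event.event_id })
event = FakeEvent.new("e-1")

first_result = handler.call(event)
second_result = handler.call(event)
```

Delivering the same event twice triggers the wrapped handler only once, which is the property that makes replaying or re-publishing safe.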
One way to implement such an example
Let’s take a look at a possible implementation of SendXmasCardToEligibleCustomer.
class SendXmasCardToEligibleCustomer
  NUMBER_OF_DAYS_TO_RETURN_ORDER = 14.days.freeze

  class State
    def initialize
      @fulfilled_orders = []
      @has_no_returned_orders = true
      @already_sent = false
      @version = -1
      @event_ids_to_link = []
    end

    # Note: this flag lives in memory only. The state is rebuilt from the
    # stream on every call, so persisting it is left out of this sketch.
    def mark_as_sent
      @already_sent = true
    end

    def already_sent?
      @already_sent
    end

    def apply_order_fulfilled(order_id, fulfillment_date)
      @fulfilled_orders << { order_id: order_id, fulfillment_date: fulfillment_date }
    end

    def apply_order_returned
      @has_no_returned_orders = false
    end

    def complete?
      @fulfilled_orders.size >= 5 &&
        @has_no_returned_orders &&
        last_fulfilled_order_return_date_passed?
    end

    def last_fulfilled_order_return_date_passed?
      @fulfilled_orders
        .map { |order| order.fetch(:fulfillment_date) }
        .all? { |date| date + NUMBER_OF_DAYS_TO_RETURN_ORDER < Time.now }
    end

    def apply(*events)
      events.each do |event|
        case event
        when OrderFulfilled
          apply_order_fulfilled(
            event.data.fetch(:order_id),
            event.data.fetch(:fulfilled_at)
          )
        when OrderReturned
          apply_order_returned
        end
        @event_ids_to_link << event.event_id
      end
    end

    # Rebuild the state from the events already linked to the stream.
    def load(stream_name, event_store:)
      events = event_store.read.stream(stream_name).to_a
      events.each { |event| apply(event) }
      @version = events.size - 1
      @event_ids_to_link = []
      self
    end

    # Link the newly applied events to the process stream.
    def store(stream_name, event_store:)
      event_store.link(@event_ids_to_link, stream_name: stream_name, expected_version: @version)
      @version += @event_ids_to_link.size
      @event_ids_to_link = []
    end
  end
  private_constant :State

  # Assumption: the event store and the command bus are registered in the
  # Rails configuration; adjust the defaults to match your own setup.
  def initialize(event_store: Rails.configuration.event_store, command_bus: Rails.configuration.command_bus)
    @event_store = event_store
    @command_bus = command_bus
  end

  def call(event)
    customer_id = event.data.fetch(:customer_id)
    stream_name = "SendXmasCardToEligibleCustomer$#{customer_id}_#{Time.current.year}"
    state = State.new
    state.load(stream_name, event_store: event_store)
    return if state.already_sent? # The gift has to be sent once only.

    state.apply(event)
    state.store(stream_name, event_store: event_store)
    if state.complete?
      command_bus.(ScheduleXmasCardShipment.new(data: { customer_id: customer_id }))
      state.mark_as_sent # Mark process as finished
    end
  end

  private

  attr_reader :event_store, :command_bus
end
Since it responds to two events and needs to accumulate state across them, it looks more like a process manager than a simple event handler.
SendXmasCardToEligibleCustomer has to make a decision about sending the gift. To do this, it needs to keep track of fulfilled orders and returned orders for each customer. Additionally, it needs to check whether the time to return fulfilled orders has passed. Also, the card should be sent only once a year. Therefore, once it is sent, the state is marked as already sent.
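The decision rules described above can also be expressed as a single pure function, which makes them easy to test in isolation. This is only a sketch in plain Ruby: the eligible_for_xmas_card? name and its keyword arguments are hypothetical, and the defaults (5 orders, a 14-day return window) are taken from the example in this post.

```ruby
# Hypothetical pure function mirroring the rules of the process:
# not sent yet, no returns, enough fulfilled orders, all return windows closed.
def eligible_for_xmas_card?(fulfilled_at:, any_returns:, already_sent:, now:,
                            minimum_orders: 5, return_window: 14 * 24 * 60 * 60)
  return false if already_sent
  return false if any_returns
  return false if fulfilled_at.size < minimum_orders

  # Every fulfilled order must be past its return window (in seconds).
  fulfilled_at.all? { |time| time + return_window < now }
end

now = Time.utc(2023, 12, 20)
day = 24 * 60 * 60
old_orders = Array.new(5) { |i| now - (20 + i) * day }
with_recent_order = old_orders.first(4) + [now - 5 * day]

eligible = eligible_for_xmas_card?(
  fulfilled_at: old_orders, any_returns: false, already_sent: false, now: now
)
still_returnable = eligible_for_xmas_card?(
  fulfilled_at: with_recent_order, any_returns: false, already_sent: false, now: now
)
```

Keeping the rule free of any event store access means the State class only has to collect the inputs, while the decision itself stays trivially testable.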