Oskar Dudycz

Pragmatic about programming

Implementing Closing the Books pattern

2024-02-17 · Oskar Dudycz · Event Sourcing

No one knows who invented Event Sourcing. I overheard that Hammurabi did. Why? Because he standardised the first set of rules of accounting.

Event Sourcing works as bookkeeping; we record new entries for each business operation. Not surprisingly, accounting patterns also work well in event sourcing.

For instance, we can use the “Closing the Books” pattern to model the lifecycle process effectively. Its name comes from the accounting domain. All financial numbers are summarised and verified, and the final report is created at the end of each cycle (e.g., month and year). This data is used as a base for the next period. For accounting calculations, you don’t need to bring forward the entire history; it’s enough to summarise the crucial aspects to be carried forward, i.e. the opening balances, etc. The same pattern can be used for temporal modelling.

I explained this pattern in detail in the article Keep your streams short! Temporal modelling for fast reads and optimal data retention, and in a talk on the same topic.

Closing the Books is the essence of Event Sourcing modelling. Thanks to that, we can keep things short and thus run our system efficiently. We’re slicing the lifetime of our process, marking the start and end of the lifecycle using events.

In this article, I assume you have read, watched, or at least skimmed the above resources. I’ll focus on the practical example using Marten. I know it’s not entirely polite to say “Go read that before we talk”, but really: “Go read that”. It took me three months to write the article, and a bit less to prepare the talk. Of course, this article may be enough to grasp the concept, but there you’ll find more nuanced considerations.

Still, we’ll diverge from the accounting domain. I started explaining Event Sourcing like many of us: using a Bank Account example. But I stopped. It’s a business domain that most of us believe we understand, but it works very differently from what we might expect.

How much different? There’s no database transaction between multiple bank accounts while transferring money. A Bank Account is also not an entity or transactional boundary. In accounting, we add a lifecycle and work on, e.g., an Accounting Month.

Yes, there’s no spoon.

Let’s use a scenario close to the financial domain but not the same, and not tied to a specific time period: cashiers in stores (also used in the original article above). We could try to model that by keeping all transactions for a particular cash register on the same stream, but if we’re building a system for bigger department stores, that could quickly escalate. We might end up with a stream containing thousands of events. That could quickly make our processing inefficient and unmanageable.

Yet, if we talked with our domain experts, we could realise this is not how our business works. All payments are registered by specific cashiers, and the cashiers care only about what has happened in their shift. They don’t need to know the entire history of transactions, just the starting amount of cash in the drawer (called float) that was left from the previous shift.

Let’s try to reflect that in the simplified event model:

public abstract record CashierShiftEvent
{
    public record ShiftOpened(
        CashierShiftId CashierShiftId,
        string CashierId,
        decimal Float,
        DateTimeOffset StartedAt
    ): CashierShiftEvent;

    public record TransactionRegistered(
        CashierShiftId CashierShiftId,
        string TransactionId,
        decimal Amount,
        DateTimeOffset RegisteredAt
    ): CashierShiftEvent;

    public record ShiftClosed(
        CashierShiftId CashierShiftId,
        decimal DeclaredTender,
        decimal OverageAmount,
        decimal ShortageAmount,
        decimal FinalFloat,
        DateTimeOffset ClosedAt
    ): CashierShiftEvent;

    private CashierShiftEvent(){}
}

So we have ShiftOpened and ShiftClosed events marking the lifetime of the cashier shift and TransactionRegistered to register the payments. Of course, we would have more types of events possibly happening between shift opening and closing, but let’s keep it simple. Just assume that TransactionRegistered is an example of the events you may have throughout the shift lifecycle.

You may have noticed that CashierShiftId is a record, not a primitive value. It’s an example of a strongly typed key:

public record CashierShiftId(string CashRegisterId, int ShiftNumber)
{
    public static implicit operator string(CashierShiftId id) => id.ToString();

    public override string ToString() => $"urn:cashier_shift:{CashRegisterId}:{ShiftNumber}";
}

I’m not a big fan of the ceremony brought by strongly typed key wrappers, but it makes perfect sense here. It’s not just a wrapper for a Guid or string value. Our cashier shift id is built from two components:

  • cash register id,
  • shift number.

We’re using the Uniform Resource Name structure (URN).

urn:cashier_shift:{CashRegisterId}:{ShiftNumber}

You can use any format, but URN is the standardised way for handling ids with meaningful segments. Why reinvent the wheel if we have a standard for it? Our URN starts with the prefix and stream type. Then, we have cash register id and shift number segments.
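A predictable format also means the id can be read back into its segments. Here is a minimal sketch of that, assuming the same record shape as above; the Parse method is a hypothetical addition for illustration and is not part of the original sample.

```csharp
using System;

// Same shape as the article's CashierShiftId; Parse is a hypothetical helper.
public record CashierShiftId(string CashRegisterId, int ShiftNumber)
{
    private const string Prefix = "urn:cashier_shift:";

    public override string ToString() => $"{Prefix}{CashRegisterId}:{ShiftNumber}";

    // Splits on the LAST colon, so a register id that itself contains ':'
    // still round-trips through ToString and Parse.
    public static CashierShiftId Parse(string urn)
    {
        if (!urn.StartsWith(Prefix, StringComparison.Ordinal))
            throw new FormatException($"Not a cashier shift URN: '{urn}'");

        var rest = urn[Prefix.Length..];
        var separator = rest.LastIndexOf(':');

        if (separator <= 0 || !int.TryParse(rest[(separator + 1)..], out var shiftNumber))
            throw new FormatException($"Not a cashier shift URN: '{urn}'");

        return new CashierShiftId(rest[..separator], shiftNumber);
    }
}
```

Because records compare by value, a parsed id equals one constructed directly from the same register id and shift number.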

We could add more components, e.g., a date, if we’d like to slice it by a temporal aspect. Then shift numbers would reset every day. Most importantly, we’re defining the explicit lifetime for our cashier shift.

If you are using EventStoreDB, you’d need to structure it a bit differently: cashier_shift-{cashRegisterId}:{number}. EventStoreDB expects the first part to represent the category, so for our stream type, we need to have a dash followed by the unique stream ID part.

By default, Marten uses Guid as the stream key, but it also allows using strings. To do that, we need to change the configuration:

services.AddMarten(options =>
    {
        options.Events.StreamIdentity = StreamIdentity.AsString;

        // (...)
    });

Moving on, our cashier shift can be modelled as follows:

public record CashierShift
{
    public record NonExisting: CashierShift;

    public record Opened(
        CashierShiftId ShiftId,
        decimal Float
    ): CashierShift;

    public record Closed(
        CashierShiftId ShiftId,
        decimal FinalFloat
    ): CashierShift;

    private CashierShift() { }

    public string Id { get; init; } = default!;
}

It’s either non-existent when there are no shifts, open or closed. It’s trimmed to contain only information needed for decision-making (read more in Slim your aggregates with Event Sourcing!). We could build it from events as:

public record CashierShift
{
    // (...)

    public CashierShift Apply(CashierShiftEvent @event) =>
        (this, @event) switch
        {
            (_, ShiftOpened shiftOpened) =>
                new Opened(shiftOpened.CashierShiftId, shiftOpened.Float),

            (Opened state, TransactionRegistered transactionRegistered) =>
                state with { Float = state.Float + transactionRegistered.Amount },

            (Opened state, ShiftClosed shiftClosed) =>
                new Closed(state.ShiftId, shiftClosed.FinalFloat),

            _ => this
        };
}
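To see how such an Apply method turns a list of events into the current state, here is a minimal, self-contained sketch. It uses trimmed-down copies of the types above (plain int shift numbers, fewer fields), so ShiftEvent, ShiftState and ShiftReplay are illustrative names rather than the ones from the sample.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Trimmed-down events, for illustration only.
public abstract record ShiftEvent
{
    public record Opened(int ShiftNumber, decimal Float): ShiftEvent;
    public record TransactionRegistered(decimal Amount): ShiftEvent;
    public record Closed(decimal FinalFloat): ShiftEvent;

    private ShiftEvent() { }
}

// Trimmed-down state with the same three cases as in the article.
public abstract record ShiftState
{
    public record NonExisting: ShiftState;
    public record Open(int ShiftNumber, decimal Float): ShiftState;
    public record Closed(int ShiftNumber, decimal FinalFloat): ShiftState;

    private ShiftState() { }

    // Combinations that make no sense (e.g. a transaction on a closed shift)
    // fall through to the last arm and leave the state unchanged.
    public ShiftState Apply(ShiftEvent @event) =>
        (this, @event) switch
        {
            (_, ShiftEvent.Opened e) => new Open(e.ShiftNumber, e.Float),
            (Open s, ShiftEvent.TransactionRegistered e) => s with { Float = s.Float + e.Amount },
            (Open s, ShiftEvent.Closed e) => new Closed(s.ShiftNumber, e.FinalFloat),
            _ => this
        };
}

public static class ShiftReplay
{
    // A left fold: start from NonExisting and apply the events in order.
    public static ShiftState Rebuild(IEnumerable<ShiftEvent> events) =>
        events.Aggregate((ShiftState)new ShiftState.NonExisting(), (state, e) => state.Apply(e));
}
```

Calling ShiftReplay.Rebuild with an Opened, a TransactionRegistered and a Closed event walks the state from NonExisting through Open to Closed. This is the same kind of fold an event store performs when it aggregates a stream.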

Ok, let’s add the final building block: cash register setup. We need the cash register to have cashier shifts happening on it:

public record CashRegister(string Id)
{
    public static CashRegister Create(CashRegisterInitialized @event) =>
        new(@event.CashRegisterId);
}

public record CashRegisterInitialized(
    string CashRegisterId,
    DateTimeOffset InitializedAt
);

public record InitializeCashRegister(
    string CashRegisterId,
    DateTimeOffset Now
);

public static class CashRegisterDecider
{
    public static object[] Decide(InitializeCashRegister command) =>
        [new CashRegisterInitialized(command.CashRegisterId, command.Now)];
}

Let’s now define the set of commands we’ll allow in our process. Corresponding to the events, these will be OpenShift, RegisterTransaction and CloseShift:

public abstract record CashierShiftCommand
{
    public record OpenShift(
        string CashRegisterId,
        string CashierId,
        DateTimeOffset Now
    ): CashierShiftCommand;

    public record RegisterTransaction(
        CashierShiftId CashierShiftId,
        string TransactionId,
        decimal Amount,
        DateTimeOffset Now
    ): CashierShiftCommand;

    public record CloseShift(
        CashierShiftId CashierShiftId,
        decimal DeclaredTender,
        DateTimeOffset Now
    ): CashierShiftCommand;

    private CashierShiftCommand(){}
}

Our decision-making process will look as follows:

public static class CashierShiftDecider
{
    public static CashierShiftEvent[] Decide(CashierShiftCommand command, CashierShift state) =>
        (command, state) switch
        {
            (OpenShift open, NonExisting) =>
            [
                new ShiftOpened(
                    new CashierShiftId(open.CashRegisterId, 1),
                    open.CashierId,
                    0,
                    open.Now
                )
            ],

            (OpenShift open, Closed closed) =>
            [

                new ShiftOpened(
                    new CashierShiftId(open.CashRegisterId, closed.ShiftId.ShiftNumber + 1),
                    open.CashierId,
                    closed.FinalFloat,
                    open.Now
                )
            ],

            (OpenShift, Opened) => [],

            (RegisterTransaction registerTransaction, Opened openShift) =>
            [
                new TransactionRegistered(
                    openShift.ShiftId,
                    registerTransaction.TransactionId,
                    registerTransaction.Amount,
                    registerTransaction.Now
                )
            ],

            (CloseShift close, Opened openShift) =>
            [
                new ShiftClosed(
                    openShift.ShiftId,
                    close.DeclaredTender,
                    Math.Max(close.DeclaredTender - openShift.Float, 0m),
                    Math.Max(openShift.Float - close.DeclaredTender, 0m),
                    openShift.Float,
                    close.Now
                )
            ],
            (CloseShift, Closed) => [],

            _ => throw new InvalidOperationException($"Cannot run {command.GetType().Name} on {state.GetType().Name}")
        };
}

Yes, I like functional programming, and I used the new pattern-matching capabilities in C# here. If you’re not into it much yet: I’m passing the state and the command, and depending on their types, I’m running specific business logic.

  1. Upon the OpenShift command, I’m returning a ShiftOpened event with the shift number equal to 1 when there was no previous shift, or the last shift number incremented by one otherwise.
  2. If the shift is already opened, I’m not returning any events. That makes the operation idempotent: I’m not making any changes and not throwing any exceptions.
  3. Registering a transaction is straightforward: adding a new event.
  4. Upon the CloseShift command, I’m doing the necessary summaries. I could also validate here whether the declared tender (so cash in the drawer) equals the current float, but I didn’t want to blur the process.
  5. When trying to close an already closed shift, I’m making it idempotent, just like opening an already opened shift.
  6. If there’s an invalid or unexpected combination of command and state, I just throw InvalidOperationException. Pattern matching allows me to spare some if statements.
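Because the decision logic is a pure function, these rules can be checked without any database. Below is a minimal sketch, assuming a reduced model: shift numbers are plain ints instead of CashierShiftId, transactions are left out, and the type names (ShiftCommand, ShiftEvent, ShiftState, ShiftDecider) are illustrative only.

```csharp
using System;

public abstract record ShiftCommand
{
    public record OpenShift(string CashierId): ShiftCommand;
    public record CloseShift(decimal DeclaredTender): ShiftCommand;

    private ShiftCommand() { }
}

public abstract record ShiftEvent
{
    public record ShiftOpened(int ShiftNumber, string CashierId, decimal Float): ShiftEvent;
    public record ShiftClosed(int ShiftNumber, decimal FinalFloat): ShiftEvent;

    private ShiftEvent() { }
}

public abstract record ShiftState
{
    public record NonExisting: ShiftState;
    public record Opened(int ShiftNumber, decimal Float): ShiftState;
    public record Closed(int ShiftNumber, decimal FinalFloat): ShiftState;

    private ShiftState() { }
}

public static class ShiftDecider
{
    public static ShiftEvent[] Decide(ShiftCommand command, ShiftState state) =>
        (command, state) switch
        {
            // The very first shift gets number 1 and an empty drawer.
            (ShiftCommand.OpenShift open, ShiftState.NonExisting) =>
                new ShiftEvent[] { new ShiftEvent.ShiftOpened(1, open.CashierId, 0m) },

            // The next shift continues the numbering and inherits the final float.
            (ShiftCommand.OpenShift open, ShiftState.Closed closed) =>
                new ShiftEvent[]
                {
                    new ShiftEvent.ShiftOpened(closed.ShiftNumber + 1, open.CashierId, closed.FinalFloat)
                },

            // Idempotent cases: no events, no exceptions.
            (ShiftCommand.OpenShift, ShiftState.Opened) => Array.Empty<ShiftEvent>(),
            (ShiftCommand.CloseShift, ShiftState.Closed) => Array.Empty<ShiftEvent>(),

            (ShiftCommand.CloseShift, ShiftState.Opened opened) =>
                new ShiftEvent[] { new ShiftEvent.ShiftClosed(opened.ShiftNumber, opened.Float) },

            _ => throw new InvalidOperationException(
                $"Cannot run {command.GetType().Name} on {state.GetType().Name}")
        };
}
```

Opening on top of a Closed state with shift number 1 yields a ShiftOpened with number 2, while repeating OpenShift on an Opened state, or CloseShift on a Closed one, returns an empty array.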

The essential scenario happens here:

(OpenShift open, Closed closed) =>
[

    new ShiftOpened(
        new CashierShiftId(open.CashRegisterId, closed.ShiftId.ShiftNumber + 1),
        open.CashierId,
        closed.FinalFloat,
        open.Now
    )
],

Why is it essential? Let’s have a look at what our stream can look like.

[Figure: diagram of the cashier shift streams]

As you can see, the closed shift is a different stream than the one we’ll open. We’re setting that up in:

new CashierShiftId(
    open.CashRegisterId, 
    closed.ShiftId.ShiftNumber + 1
);

As a reminder, that’s also reflected in our id structure:

urn:cashier_shift:{CashRegisterId}:{ShiftNumber}

Closing and opening shift as one operation

Cool stuff, but how do we perform it effectively? Let’s start with composing closing and opening:

using CommandResult = (CashierShiftId StreamId, CashierShiftEvent[] Events);

public record CloseAndOpenCommand(
    CashierShiftId CashierShiftId,
    string CashierId,
    decimal DeclaredTender,
    DateTimeOffset Now
);

public static class CloseAndOpenShift
{
    public static (CommandResult, CommandResult) Handle(CloseAndOpenCommand command, CashierShift currentShift)
    {
        // close current shift
        var (currentShiftId, cashierId, declaredTender, now) = command;
        var closingResult = Decide(new CloseShift(currentShiftId, declaredTender, now), currentShift);

        // get new current shift state by applying the result event(s)
        currentShift = closingResult.Aggregate(currentShift, (current, @event) => current.Apply(@event));

        // open the next shift
        var openResult = Decide(new OpenShift(currentShiftId.CashRegisterId, cashierId, now), currentShift);

        // double check if it was actually opened
        var opened = openResult.OfType<ShiftOpened>().SingleOrDefault();
        if (opened == null)
            throw new InvalidOperationException("Cannot open new shift!");

        // return both results with respective stream ids
        return ((currentShiftId, closingResult), (opened.CashierShiftId, openResult));
    }
}

We’re running two commands sequentially, returning the results from both operations together with their stream ids. Read more in How to handle multiple commands in the same transaction.

If we’re using Marten, we can benefit from PostgreSQL’s transactional capabilities and Marten’s built-in Unit of Work.


public static class CloseAndOpenShift
{
    public static async Task<CashierShiftId> CloseAndOpenCashierShift(
        this IDocumentSession documentSession,
        CloseAndOpenCommand command,
        int version,
        CancellationToken ct
    )
    {
        var currentShift =
            await documentSession.Events.AggregateStreamAsync<CashierShift>(command.CashierShiftId, token: ct) ??
            new CashierShift.NonExisting();

        var (closingResult, openResult) = Handle(command, currentShift);

        // Append Closing result to the old stream
        if (closingResult.Events.Length > 0)
            documentSession.Events.Append(closingResult.StreamId, version, closingResult.Events.AsEnumerable());

        if (openResult.Events.Length > 0)
            documentSession.Events.StartStream<CashierShift>(openResult.StreamId, openResult.Events.AsEnumerable());

        await documentSession.SaveChangesAsync(ct);

        return openResult.StreamId;
    }
}

Thanks to the predictable structure of the id and calling the StartStream method, we’re ensuring that if the new shift has already been opened, then our operation will be rejected. We won’t have any duplicated shifts.
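The guarantee boils down to "starting a stream fails when its id is already taken". Here is a minimal in-memory sketch of that rule. It is a stand-in written for illustration, not Marten’s implementation, and InMemoryEventStore with its methods is a hypothetical type.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in, NOT Marten: it only mimics the uniqueness rule for stream ids.
public class InMemoryEventStore
{
    private readonly Dictionary<string, List<object>> streams = new();

    // Starting a stream is only allowed when no stream with that id exists yet.
    public void StartStream(string streamId, params object[] events)
    {
        // TryAdd returns false when the key is already present.
        if (!streams.TryAdd(streamId, new List<object>(events)))
            throw new InvalidOperationException($"Stream '{streamId}' already exists");
    }

    public int CountEvents(string streamId) =>
        streams.TryGetValue(streamId, out var events) ? events.Count : 0;
}
```

With ids like urn:cashier_shift:cr-1:2, two competing attempts to open shift number 2 on the same cash register target the same stream id, so only the first one can succeed.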

We can use this code in the endpoint:

app.MapPost("/api/cash-registers/{cashRegisterId}/cashier-shifts/{shiftNumber:int}/close-and-open",
    async (
        IDocumentSession documentSession,
        string cashRegisterId,
        int shiftNumber,
        CloseAndOpenShiftRequest body,
        [FromIfMatchHeader] string eTag,
        CancellationToken ct
    ) =>
    {
        var command = new CloseAndOpenCommand(
            new CashierShiftId(cashRegisterId, shiftNumber),
            body.CashierId,
            body.DeclaredTender,
            Now
        );

        var openedCashierId = await documentSession.CloseAndOpenCashierShift(command, ToExpectedVersion(eTag), ct);

        return Created(
            $"/api/cash-registers/{cashRegisterId}/cashier-shifts/{openedCashierId.ShiftNumber}",
            cashRegisterId
        );
    }
);

As you can see, we greatly benefit from the repeatability and composability of the Event Sourcing and Decider pattern and Marten’s transactional capabilities.

We’re also using optimistic concurrency with ETag to ensure we won’t face concurrency issues. Thanks to that, we will know we’re making decisions based on the expected state.
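The ToExpectedVersion helper used in the endpoint is not shown in this article. A minimal sketch of what such a helper could look like, assuming the client sends the stream version as a weak or strong ETag such as W/"5" or "5" (the actual helper in the sample repository may differ):

```csharp
using System;
using System.Globalization;

public static class ETagParser
{
    // Hypothetical helper: extracts the numeric stream version from an If-Match value.
    public static int ToExpectedVersion(string eTag)
    {
        if (string.IsNullOrWhiteSpace(eTag))
            throw new ArgumentException("ETag is required", nameof(eTag));

        var value = eTag.Trim();

        // Drop the weak validator prefix, if present.
        if (value.StartsWith("W/", StringComparison.Ordinal))
            value = value[2..];

        // Drop the surrounding quotes.
        value = value.Trim('"');

        // NumberStyles.None accepts digits only: no sign, no whitespace.
        if (!int.TryParse(value, NumberStyles.None, CultureInfo.InvariantCulture, out var version))
            throw new ArgumentException($"'{eTag}' is not a valid version ETag", nameof(eTag));

        return version;
    }
}
```

The parsed number is then passed as the expected version when appending, so a request based on an outdated version gets rejected instead of silently overwriting newer changes.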

Closing and opening shifts as separate operations

The pattern of closing and opening as one operation can be suitable if our lifecycles are continuous. Yet, typically, they’re not. Occupancy of the cash registers by cashiers depends on the traffic. There may be periods when no one works on the particular cash register. How to handle that?

Let’s tackle that step by step, starting from the end, so from the closing. It’ll be a simple operation now:

app.MapPost("/api/cash-registers/{cashRegisterId}/cashier-shifts/{shiftNumber:int}/close",
    (
        IDocumentSession documentSession,
        string cashRegisterId,
        int shiftNumber,
        CloseShiftRequest body,
        [FromIfMatchHeader] string eTag,
        CancellationToken ct
    ) =>
    {
        var cashierShiftId = new CashierShiftId(cashRegisterId, shiftNumber);

        return documentSession.GetAndUpdate<CashierShift, CashierShiftEvent>(cashierShiftId, ToExpectedVersion(eTag),
            state => Decide(new CloseShift(cashierShiftId, body.DeclaredTender, Now), state), ct);
    }
);

Here I’m using a simple wrapper that loads events from the stream, builds the state from them, runs the business logic and appends the resulting events, if there are any:


public static class DocumentSessionExtensions
{
    public static Task GetAndUpdate<T, TEvent>(
        this IDocumentSession documentSession,
        string id,
        int version,
        Func<T, TEvent[]> handle,
        CancellationToken ct
    ) where T : class =>
        documentSession.Events.WriteToAggregate<T>(id, version, stream =>
        {
            var result = handle(stream.Aggregate);
            if (result.Length != 0)
                stream.AppendMany(result.Cast<object>().ToArray());
        }, ct);
}

Opening will be more complicated. We need to get the shift number and data from the last closed one (like the float, the state of the cash in the drawer after the last shift). Of course, the cashier could provide the previous shift number, but that’s error-prone and a potential vulnerability. It’d be better if we tracked that on our own. The best way to do that is to build a model that caches this information. We could define a projection that’s updated based on the registered events.

What should such a model contain? Potentially it could cache all needed information to open new shift, e.g.:

public record CashierShiftTracker(
    string Id, // Cash Register Id
    int LastClosedShiftNumber,
    decimal LastClosedShiftFloat
    // (...) etc.
);

That looks fine, as we could start by querying it and getting the needed information from it. Yet, in my opinion, that solution is not scalable and can cause maintenance issues. Our process may change, and we’d need to update this projection each time a change impacts closing or opening. That can create issues with versioning this model, migrating its data, etc.

I think it’d be better to generalise it and make it agnostic to the specifics of our process. Closing the Books pattern is repeatable:

  • Store summary of the closed streams needed for audit and for the next lifecycle period,
  • When opening, get data from the last event in the closed lifecycle and use it to start a new period and stream.

I wrote in Let’s talk about positions in event stores that Marten keeps the global sequence number. It’s a monotonic number. There may be gaps when an event is not added for some reason (e.g. a conflict or transient error). It’s unique for each event. We could use it as a reference to the closing event.

public record CashierShiftTracker(
    string Id, // Cash Register Id
    long? LastShiftClosedSequence
);

Now, let’s define the multi-stream projection that will start tracking progress when the cash register is initialised and update it with each cashier shift closing event. We’ll get it from the event metadata (Marten’s IEvent wrapper).

public class CashierShiftTrackerProjection: MultiStreamProjection<CashierShiftTracker, string>
{
    public CashierShiftTrackerProjection()
    {
        Identity<CashRegisterInitialized>(e => e.CashRegisterId);
        Identity<CashierShiftEvent.ShiftClosed>(e => e.CashierShiftId.CashRegisterId);
    }

    public CashierShiftTracker Create(CashRegisterInitialized initialized) =>
        new(initialized.CashRegisterId, null);

    public CashierShiftTracker Apply(IEvent<CashierShiftEvent.ShiftClosed> closed, CashierShiftTracker current) =>
        current with { LastShiftClosedSequence = closed.Sequence };
}

We need to register it in the Marten configuration.

services.AddMarten(options =>
    {
        options.Projections.Add<CashierShiftTrackerProjection>(ProjectionLifecycle.Async);

        // (...)
    });

The safest option for multi-stream projection is to register it as asynchronous. It will ensure that all events are run sequentially. Yet, it will introduce eventual consistency as it’ll be processed in the background. Getting the stale data is not dangerous. We’re using optimistic concurrency and have the uniqueness enforced by the predictable stream id structure. In the worst case, we may need to retry our handling.

The other option is to register it as inline. Then, the projection will be updated in the same transaction as the appended event. This will also work for our case because there cannot be parallel shift closings for a particular cash register. Still, as a general rule, we should try to avoid it, as optimistic concurrency only guards a single stream. A multi-stream projection is fed by multiple streams, so concurrent updates could override each other under high load.

Pick your poison, depending on your use case.
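If you go with the asynchronous registration, the retry mentioned above can stay simple. Here is a minimal sketch of a generic helper, assuming the caller decides which exceptions are worth retrying. Retry.RunAsync is a hypothetical name, and the delay values are arbitrary.

```csharp
using System;
using System.Threading.Tasks;

public static class Retry
{
    // Re-runs the whole operation (load, decide, append), so each attempt sees fresh state.
    public static async Task<T> RunAsync<T>(
        Func<Task<T>> operation,
        Func<Exception, bool> shouldRetry,
        int maxAttempts = 3)
    {
        if (maxAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            // The filter lets the last failure, and non-retryable ones, bubble up unchanged.
            catch (Exception ex) when (attempt < maxAttempts && shouldRetry(ex))
            {
                // Short linear back-off gives the background projection time to catch up.
                await Task.Delay(TimeSpan.FromMilliseconds(50 * attempt));
            }
        }
    }
}
```

The important design choice is to retry the whole handler rather than only the append. Stale tracker data has to be read again, or the second attempt would repeat the same decision.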

Let’s discuss using this model to locate the last cashier shift. We’ll define a dedicated method for that to encapsulate the locator logic:

public static class LastCashierShiftLocator
{
    public static async Task<CashierShift> GetLastCashierShift(
        this IDocumentSession documentSession,
        string cashRegisterId
    )
    {
        var tracker = await documentSession.LoadAsync<CashierShiftTracker>(cashRegisterId);

        if (tracker is null)
            throw new InvalidOperationException("Unknown cash register!");

        var lastClosedShiftEvent = tracker.LastShiftClosedSequence.HasValue
            ? (await documentSession.Events.QueryAllRawEvents()
                .SingleAsync(e => e.Sequence == tracker.LastShiftClosedSequence.Value)).Data
            : null;

        return lastClosedShiftEvent is ShiftClosed closed
            ? new CashierShift.Closed(closed.CashierShiftId, closed.FinalFloat)
            : new CashierShift.NonExisting();
    }
}

As explained above, we’re getting the tracking information, loading the last closing event (if there is one) and returning the Closed state, or NonExisting if no shift has been closed yet.
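The same idea can be shown without a database: the tracker stores only a pointer (the sequence number), and the locator follows it back to the event. This is a minimal in-memory sketch. StoredEvent is a simplified stand-in for Marten’s IEvent wrapper, and all type names here are illustrative.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public record ShiftClosed(string CashRegisterId, int ShiftNumber, decimal FinalFloat);

// Stand-in for the event wrapper: payload plus global sequence number.
public record StoredEvent(long Sequence, object Data);

public record Tracker(string CashRegisterId, long? LastShiftClosedSequence);

public static class InMemoryLocator
{
    // Projection step: remember only where the latest closing event lives.
    public static Tracker Apply(Tracker current, StoredEvent stored) =>
        stored.Data is ShiftClosed closed && closed.CashRegisterId == current.CashRegisterId
            ? current with { LastShiftClosedSequence = stored.Sequence }
            : current;

    // Locator step: follow the pointer back to the event itself.
    public static ShiftClosed? FindLastClosed(Tracker tracker, IReadOnlyList<StoredEvent> log) =>
        tracker.LastShiftClosedSequence is long sequence
            ? log.Single(e => e.Sequence == sequence).Data as ShiftClosed
            : null;
}
```

Note that the tracker never copies the float or the shift number. If the closing event gains new fields later, the tracker stays untouched.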

Such tracking logic is repeatable and could even be generalised to other scenarios if needed.

Having that, we can define the endpoint for opening the cashier shift as follows:

app.MapPost("/api/cash-registers/{cashRegisterId}/cashier-shifts",
    async (
        IDocumentSession documentSession,
        string cashRegisterId,
        OpenShiftRequest body,
        CancellationToken ct
    ) =>
    {
        var lastClosedShift = await documentSession.GetLastCashierShift(cashRegisterId);
        var result = Decide(new OpenShift(cashRegisterId, body.CashierId, Now), lastClosedShift);

        var opened = result.OfType<ShiftOpened>().SingleOrDefault();

        if (opened == null)
            throw new InvalidOperationException("Cannot Open Shift");

        await documentSession.Add<CashierShift, CashierShiftEvent>(opened.CashierShiftId, result, ct);

        return Created(
            $"/api/cash-registers/{cashRegisterId}/cashier-shifts/{opened.CashierShiftId.ShiftNumber}",
            cashRegisterId
        );
    }
);

We’re using a simple wrapper for appending new events similar to GetAndUpdate shown before.

public static class DocumentSessionExtensions
{
    public static Task Add<T, TEvent>(this IDocumentSession documentSession, string id, TEvent[] events,
        CancellationToken ct)
        where T : class
    {
        if (events.Length == 0)
            return Task.CompletedTask;

        documentSession.Events.StartStream<T>(id, events.Cast<object>().ToArray());
        return documentSession.SaveChangesAsync(token: ct);
    }
}

TLDR

Keeping streams short is the most important modelling practice in Event Sourcing. Closing the Books pattern is the biggest enabler for that. I hope that after this article, you’ll know how to implement that effectively using Marten.

There are other scenarios to consider, which I described in Keep your streams short! Temporal modelling for fast reads and optimal data retention, like:

  • opening periods asynchronously,
  • having multiple open shifts at once (e.g. a cash register in a restaurant used by multiple waiters),
  • not being able to get (or not even wanting to have) predictable ids.

Those scenarios will require different ways to handle that, but I’m sure the techniques described above can be enough for you to implement that.

See the full code in my sample repository.

Most importantly: talk with the business experts and ask enough whys to understand the lifecycle of your business use case. Listen for the keywords like open/close/end, summary, daily, monthly, etc. For business experts, lifecycle may be so apparent that they won’t mention it straight away, but if you dig and ask enough questions, they’re typically more than happy to reveal it. Workshops like Event Storming can help you with that.

Also check the follow-up article, Should you always keep streams short in Event Sourcing?, for more nuanced considerations.

Cheers!

Oskar

p.s. Ukraine is still under brutal Russian invasion. A lot of Ukrainian people are hurt, without shelter and need help. You can help in various ways, for instance, directly helping refugees, spreading awareness, putting pressure on your local government or companies. You can also support Ukraine by donating e.g. to Red Cross, Ukraine humanitarian organisation or donate Ambulances for Ukraine.

Oskar Dudycz For over 15 years, I have been creating IT systems close to the business. I started my career when StackOverflow didn't exist yet. I am a programmer, technical leader, architect. I like to create well-thought-out systems, tools and frameworks that are used in production and make people's lives easier. I believe Event Sourcing, CQRS, and in general, Event-Driven Architectures are a good foundation by which this can be achieved.