
Event Sourcing on PostgreSQL in Node.js just became possible with Emmett

2024-07-12 · Oskar Dudycz · PostgreSQL


Last week, I announced Pongo: Mongo, but on PostgreSQL. It's a Node.js library that allows using PostgreSQL as a document database.

Today, I have at least an equally big announcement: I released the PostgreSQL event store for Emmett. Boom!

What’s Emmett? It’s an Event Sourcing library. I announced it some time ago and have worked on it continuously for the last few months. It already supports EventStoreDB, and now it has our favourite PostgreSQL storage!


How do you use it? It's pretty simple. Start by installing the npm package:

$ npm add @event-driven-io/emmett-postgresql

Then set up the event store using a connection string to your PostgreSQL database:

import { getPostgreSQLEventStore } from '@event-driven-io/emmett-postgresql';

const connectionString =
  "postgresql://dbuser:secretpassword@database.server.com:3211/mydb";

const eventStore = getPostgreSQLEventStore(connectionString);

Internally, it uses the node-postgres package with connection pooling, so you don't need to do much more. Well, maybe besides gracefully closing it on application shutdown:

await eventStore.close();

Cool, but what can you do with it? Check the Emmett docs. Everything you can do with the EventStoreDB storage, you can also do with PostgreSQL!
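For instance, appending events and reading them back works the same as with any other storage. Here's a minimal sketch (the stream name and event payload are illustrative; they match the shopping cart example below):

const shoppingCartId = 'shopping_cart-123';

// append an event to the stream (the stream is created on the first append)
await eventStore.appendToStream(shoppingCartId, [
  {
    type: 'ProductItemAdded',
    data: { productItem: { productId: 'shoes', quantity: 1, price: 100 } },
  },
]);

// read all events back from the stream
const { events } = await eventStore.readStream(shoppingCartId);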

Or you can actually do more with PostgreSQL, as…

PostgreSQL has inline projections support. What are inline projections? They're functions updating your read models in the same transaction as the events append. So either everything is stored, or nothing is. Of course, you need to be careful with them, as they can slow down your appends, but they're really useful. Async projections will come in future releases.

Ok, but how do you use it? Let's say that you'd like to build a read model with a summary of your shopping cart:

type ShoppingCartShortInfo = {
  productItemsCount: number;
  totalAmount: number;
};
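The example handles two event types that aren't defined in the snippets above. Using Emmett's Event type helper, they could be defined along these lines (a sketch; the exact payload shapes are my assumption, matching the code below):

import type { Event } from '@event-driven-io/emmett';

type PricedProductItem = {
  productId: string;
  quantity: number;
  price: number;
};

type ProductItemAdded = Event<
  'ProductItemAdded',
  { productItem: PricedProductItem }
>;

type DiscountApplied = Event<'DiscountApplied', { percent: number }>;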

The transformation function could look like this:

const evolve = (
  document: ShoppingCartShortInfo | null,
  { type, data: event }: ProductItemAdded | DiscountApplied,
): ShoppingCartShortInfo => {
  document = document ?? { productItemsCount: 0, totalAmount: 0 };

  switch (type) {
    case 'ProductItemAdded':
      return {
        totalAmount:
          document.totalAmount +
          event.productItem.price * event.productItem.quantity,
        productItemsCount:
          document.productItemsCount + event.productItem.quantity,
      };
    case 'DiscountApplied':
      return {
        ...document,
        totalAmount: (document.totalAmount * (100 - event.percent)) / 100,
      };
  }
};

It'll be run for each event of type ProductItemAdded or DiscountApplied that's appended to the event store.
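Since evolve is a pure function, it's easy to check its behaviour in isolation (using the illustrative event shapes from above):

const afterAdd = evolve(null, {
  type: 'ProductItemAdded',
  data: { productItem: { productId: 'shoes', quantity: 2, price: 100 } },
});
// afterAdd: { productItemsCount: 2, totalAmount: 200 }

const afterDiscount = evolve(afterAdd, {
  type: 'DiscountApplied',
  data: { percent: 10 },
});
// afterDiscount: { productItemsCount: 2, totalAmount: 180 }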

Let's say that you'd like to use Pongo and store the read model as a document in PostgreSQL. Then, you can define the projection as:

import { pongoSingleStreamProjection } from '@event-driven-io/emmett-postgresql';

const shoppingCartShortInfoCollectionName = 'shoppingCartShortInfo';

const shoppingCartShortInfoProjection = pongoSingleStreamProjection({
  collectionName: shoppingCartShortInfoCollectionName,
  evolve,
  canHandle: ['ProductItemAdded', 'DiscountApplied'],
});

and register it through event store options:

import { projections } from '@event-driven-io/emmett';
import { getPostgreSQLEventStore } from '@event-driven-io/emmett-postgresql';

const connectionString =
  "postgresql://dbuser:secretpassword@database.server.com:3211/mydb";

const eventStore = getPostgreSQLEventStore(connectionString, {
  projections: projections.inline([
    shoppingCartShortInfoProjection,
    customProjection, // placeholder for any other projection you've defined
  ]),
});

We're saying that we'd like to update the shoppingCartShortInfo collection using the evolve function for the set of event types listed in canHandle.

Internally, it'll use a new Pongo feature: a handler that loads the existing document and inserts, replaces or deletes it, depending on the result returned from the function.

It looks as follows:

const collection = pongo.db().collection<Document>(collectionName);

for (const event of events) {
  // load the current document by id, apply the function,
  // then insert, replace or delete it based on the result
  await collection.handle(getDocumentId(event), async (document) => {
    return await evolve(document, event);
  });
}

If you're wondering what getDocumentId is: for pongoSingleStreamProjection, it'll automatically use the stream name as the document id.

Suppose you’d like to customise it, e.g. to match events from different streams. In that case, you can use the pongoMultiStreamProjection definition, which allows you to specify the document ID matcher for each event. For instance:

const shoppingCartShortInfoCollectionName = 'shoppingCartShortInfo';

const getDocumentId = (
  event: ReadEvent<ProductItemAdded | DiscountApplied>,
): string => {
  switch (event.type) {
    case 'ProductItemAdded':
      return event.metadata.streamName;
    case 'DiscountApplied':
      return event.metadata.streamName;
  }
};

const shoppingCartShortInfoProjection = pongoMultiStreamProjection({
  collectionName: shoppingCartShortInfoCollectionName,
  evolve,
  canHandle: ['ProductItemAdded', 'DiscountApplied'],
  getDocumentId,
});

You can also do a free-hand projection using pongoProjection, which takes the following handler:

(pongo: PongoClient, events: ReadEvent<EventType>[]) => Promise<void>
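For instance, a projection counting all handled events could look roughly like this (a sketch under assumptions: the projectionStats collection and its document shape are made up, and it reuses the collection.handle helper shown above):

const statsProjection = pongoProjection(
  async (
    pongo: PongoClient,
    events: ReadEvent<ProductItemAdded | DiscountApplied>[],
  ): Promise<void> => {
    // a free-hand projection gets full access to the Pongo client,
    // so it can update any collection it likes
    const stats = pongo.db().collection<{ handled: number }>('projectionStats');

    await stats.handle('global', async (current) => ({
      handled: (current?.handled ?? 0) + events.length,
    }));
  },
);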

Cool, isn’t it?

Read more details in the follow-up article Writing and testing event-driven projections with Emmett, Pongo and PostgreSQL.

Of course, those are still experimental features; they need to be optimised, tested extensively, etc. But they work, which makes me happy.

As you can see, I'm quite thrilled that I could deliver it, as this is a big milestone. It will enable many people to finally do Event Sourcing in Node.js using PostgreSQL and have the basic building blocks for it.

All of that wouldn't be possible without my recent changes to Pongo.

  1. I managed to close the basic coverage of document manipulation methods, adding initial versions of what was missing: replaceOne, drop, rename, countDocuments, count, estimatedDocumentCount, findOneAndDelete, findOneAndReplace, findOneAndUpdate, etc. (see the sketch below).
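A quick sketch of how a few of those look in use (the User type and filters are illustrative; the methods mirror the Mongo API):

import { pongoClient } from '@event-driven-io/pongo';

type User = { name: string; age: number };

const pongo = pongoClient(connectionString);
const users = pongo.db().collection<User>('users');

// replace the whole document matching the filter
await users.replaceOne({ name: 'Anita' }, { name: 'Anita', age: 25 });

// count the documents matching a filter
const adults = await users.countDocuments({ age: { $gte: 18 } });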

Now, a bigger portion of changes made as preparation for the Emmett PostgreSQL projections you just learned about:

  1. Added an option to inject an external connection pool and db client into a Pongo collection, as the first step towards transaction handling. Now, you can create transactions outside and inject pool clients. It's not yet fully on par with the Mongo API; that'll be delivered in a follow-up PR.

  2. Strengthened the schema and changed the _id column type from UUID to TEXT. In PostgreSQL, TEXT should be almost as indexable; it'll take a bit more storage, but let's live with that for now. Moving from UUID to TEXT allows for more sophisticated key strategies. Most importantly, it lets Emmett projections reuse the stream ID as the document ID.

Also, thanks to Franck Pachot's contribution, we confirmed that Pongo is compatible not only with vanilla PostgreSQL but also with databases like Yugabyte!

It's been just a week, but I'm extremely happy with how the community has received Pongo. It got to the Hacker News front page and gained over 900 GitHub stars!

Now it's time for Emmett! The synergy with Pongo should help with that.

What a week! It’s easy to forget that Pongo was released just 7 days ago!

Now I can go on vacation; the next blog post will be in August!

Cheers!

Oskar

p.s. Ukraine is still under brutal Russian invasion. A lot of Ukrainian people are hurt, without shelter and need help. You can help in various ways, for instance, directly helping refugees, spreading awareness, putting pressure on your local government or companies. You can also support Ukraine by donating e.g. to Red Cross, Ukraine humanitarian organisation or donate Ambulances for Ukraine.

👋 If you found this article helpful and want to get notified about the next one, subscribe to Architecture Weekly.

✉️ Join over 6500 subscribers, get the best resources to boost your skills, and stay updated with Software Architecture trends!
