Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added SQLite event store the ability to create connection #178

Prev Previous commit
Fixed failing SQLite e2e tests and TS config
E2E tests were failing, as there was no nested database folder in the 'testing' subdirectory. Probably locally it was created, but git is not commiting empty folders. Adjusted not to require such nesting.

Fixed also TS config to compile SQLite package correctly, as some entries were missing. After that fixed the common errors.

Added a helper withConnection to streamline the connection management. Now it also uses correctly close in finalize, instead of double closing in case of error.

Renamed location to fileName to follow the SQLite naming convention, made it also optional with fallback to in memory.

Removed absolute file path and custom, as it won't allow easily passing the filenames without casting that are not typed manually.
  • Loading branch information
oskardudycz committed Feb 11, 2025
commit 913f5e6e973a5bd2682a3f29d7c0430487911f5c
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -126,4 +126,6 @@ lib
*/.output
e2e/esmCompatibility/.output
src/e2e/esmCompatibility/.output
**/0x
**/0x

**/*.db
256 changes: 129 additions & 127 deletions src/packages/emmett-sqlite/src/eventStore/SQLiteEventStore.e2e.spec.ts
Original file line number Diff line number Diff line change
@@ -6,133 +6,167 @@ import {
ExpectedVersionConflictError,
} from '@event-driven-io/emmett';
import fs from 'fs';
import { afterEach, describe, it } from 'node:test';
import { dirname } from 'path';
import { afterEach, beforeEach, describe, it } from 'node:test';
import path from 'path';
import { fileURLToPath } from 'url';
import { v4 as uuid } from 'uuid';
import { sqliteConnection, type AbsolutePath } from '../sqliteConnection';
import { InMemorySQLiteDatabase, sqliteConnection } from '../sqliteConnection';
import {
type DiscountApplied,
type PricedProductItem,
type ProductItemAdded,
type ShoppingCartEvent,
} from '../testing/shoppingCart.domain';
import { createEventStoreSchema } from './schema';
import { getSQLiteEventStore } from './SQLiteEventStore';

const __dirname = dirname(fileURLToPath(import.meta.url)) as AbsolutePath;
import {
getSQLiteEventStore,
type SQLiteEventStoreOptions,
} from './SQLiteEventStore';

void describe('SQLiteEventStore', () => {
const testDatabasePath: AbsolutePath = __dirname + '/../testing/database/';
const testDatabasePath = path.resolve(
path.dirname(fileURLToPath(import.meta.url)),
'..',
'testing',
);
const fileName = path.resolve(testDatabasePath, 'test.db');

afterEach(() => {
if (!fs.existsSync(`${testDatabasePath}/test.db`)) {
if (!fs.existsSync(fileName)) {
return;
}
fs.unlink(`${testDatabasePath}/test.db`, (err) => {
if (err) console.error('Error deleting file:', err);
});
fs.unlinkSync(fileName);
});

void it('should append events', async () => {
await createEventStoreSchema(
sqliteConnection({ location: `/${testDatabasePath}/test.db` }),
);
const eventStore = getSQLiteEventStore({
databaseLocation: `${testDatabasePath}/test.db`,
});

const productItem: PricedProductItem = {
productId: '123',
quantity: 10,
price: 3,
void describe('With manual Schema Creation', () => {
const config: SQLiteEventStoreOptions = {
schema: {
autoMigration: 'None',
},
fileName,
};
const discount = 10;
const shoppingCartId = `shopping_cart-${uuid()}`;

const result = await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[{ type: 'ProductItemAdded', data: { productItem } }],
);

const result2 = await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[{ type: 'ProductItemAdded', data: { productItem } }],
{ expectedStreamVersion: result.nextExpectedStreamVersion },
);

await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[
{
type: 'DiscountApplied',
data: { percent: discount, couponId: uuid() },
},
],
{ expectedStreamVersion: result2.nextExpectedStreamVersion },
);
beforeEach(() => createEventStoreSchema(sqliteConnection({ fileName })));

const { events } = await eventStore.readStream(shoppingCartId);
void it('should append events', async () => {
const eventStore = getSQLiteEventStore(config);

assertIsNotNull(events);
assertEqual(3, events.length);
});
const productItem: PricedProductItem = {
productId: '123',
quantity: 10,
price: 3,
};
const discount = 10;
const shoppingCartId = `shopping_cart-${uuid()}`;

void it('should aggregate stream', async () => {
await createEventStoreSchema(
sqliteConnection({ location: `${testDatabasePath}/test.db` }),
);
const eventStore = getSQLiteEventStore({
databaseLocation: `${testDatabasePath}/test.db`,
const result = await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[{ type: 'ProductItemAdded', data: { productItem } }],
);

const result2 = await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[{ type: 'ProductItemAdded', data: { productItem } }],
{ expectedStreamVersion: result.nextExpectedStreamVersion },
);

await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[
{
type: 'DiscountApplied',
data: { percent: discount, couponId: uuid() },
},
],
{ expectedStreamVersion: result2.nextExpectedStreamVersion },
);

const { events } = await eventStore.readStream(shoppingCartId);

assertIsNotNull(events);
assertEqual(3, events.length);
});

const productItem: PricedProductItem = {
productId: '123',
quantity: 10,
price: 3,
};
const discount = 10;
const shoppingCartId = `shopping_cart-${uuid()}`;
void it('should aggregate stream', async () => {
const eventStore = getSQLiteEventStore(config);

const result = await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[{ type: 'ProductItemAdded', data: { productItem } }],
);

const result2 = await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[{ type: 'ProductItemAdded', data: { productItem } }],
{ expectedStreamVersion: result.nextExpectedStreamVersion },
);

await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[
{
type: 'DiscountApplied',
data: { percent: discount, couponId: uuid() },
},
],
{ expectedStreamVersion: result2.nextExpectedStreamVersion },
);
const productItem: PricedProductItem = {
productId: '123',
quantity: 10,
price: 3,
};
const discount = 10;
const shoppingCartId = `shopping_cart-${uuid()}`;

const result = await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[{ type: 'ProductItemAdded', data: { productItem } }],
);

const aggregation = await eventStore.aggregateStream(shoppingCartId, {
evolve,
initialState: () => null,
const result2 = await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[{ type: 'ProductItemAdded', data: { productItem } }],
{ expectedStreamVersion: result.nextExpectedStreamVersion },
);

await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[
{
type: 'DiscountApplied',
data: { percent: discount, couponId: uuid() },
},
],
{ expectedStreamVersion: result2.nextExpectedStreamVersion },
);

const aggregation = await eventStore.aggregateStream(shoppingCartId, {
evolve,
initialState: () => null,
});

assertDeepEqual(
{ totalAmount: 54, productItemsCount: 20 },
aggregation.state,
);
});

assertDeepEqual(
{ totalAmount: 54, productItemsCount: 20 },
aggregation.state,
);
void it('should throw an error if concurrency check has failed when appending stream', async () => {
const eventStore = getSQLiteEventStore(config);

const productItem: PricedProductItem = {
productId: '123',
quantity: 10,
price: 3,
};

const shoppingCartId = `shopping_cart-${uuid()}`;

await assertThrowsAsync<ExpectedVersionConflictError<bigint>>(
async () => {
await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[
{
type: 'ProductItemAdded',
data: { productItem },
},
],
{
expectedStreamVersion: 5n,
},
);
},
);
});
});

void it('should automatically create schema', async () => {
const eventStore = getSQLiteEventStore({
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: `${testDatabasePath}/test.db`,
fileName,
});

const productItem: PricedProductItem = {
@@ -158,7 +192,7 @@ void describe('SQLiteEventStore', () => {
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: ':memory:',
fileName: InMemorySQLiteDatabase,
});
const productItem: PricedProductItem = {
productId: '123',
@@ -183,7 +217,7 @@ void describe('SQLiteEventStore', () => {
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: `${testDatabasePath}/test.db`,
fileName,
});

const productItem: PricedProductItem = {
@@ -206,46 +240,14 @@ void describe('SQLiteEventStore', () => {
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: `${testDatabasePath}/test.db`,
fileName,
});

const stream = await sameEventStore.readStream(shoppingCartId);

assertIsNotNull(stream.events);
assertEqual(1, stream.events.length);
});

void it('should throw an error if concurrency check has failed when appending stream', async () => {
const eventStore = getSQLiteEventStore({
schema: {
autoMigration: 'CreateOrUpdate',
},
databaseLocation: `${testDatabasePath}/test.db`,
});

const productItem: PricedProductItem = {
productId: '123',
quantity: 10,
price: 3,
};

const shoppingCartId = `shopping_cart-${uuid()}`;

await assertThrowsAsync<ExpectedVersionConflictError<bigint>>(async () => {
await eventStore.appendToStream<ShoppingCartEvent>(
shoppingCartId,
[
{
type: 'ProductItemAdded',
data: { productItem },
},
],
{
expectedStreamVersion: 5n,
},
);
});
});
});

type ShoppingCartShortInfo = {
Loading