Skip to content

likeinlife/cqrs_mediator

Repository files navigation

Meator

image Mypy checked codecov image Ruff image

Python CQRS pattern implementation.

Docs

Installation

pip install meator

Available

  • Dispatchers:
    • Command
    • Query
  • Observers:
    • Event
  • Entities:
    • Command
    • Event
    • Query
  • Middlewares

Usecases

Command/Event/Query

from dataclasses import dataclass

from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command
from meator.interfaces import CommandHandler


@dataclass
class IntCommand(Command[int]):
    answer: int


class IntCommandHandler(CommandHandler[IntCommand, int]):
    async def __call__(self, request: IntCommand) -> int:
        return request.answer


async def main():
    c = CommandDispatcherImpl()

    c.register(IntCommand, IntCommandHandler())

    await c.handle(IntCommand(1))

Middleware

from dataclasses import dataclass

from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command, Request
from meator.interfaces import CommandHandler, Handler, Middleware


class SimpleMiddleware(Middleware):
    async def __call__(self, call_next: Handler, request: Request):
        return await call_next(request)


@dataclass
class IntCommand(Command[int]):
    answer: int


class IntCommandHandler(CommandHandler[IntCommand, int]):
    async def __call__(self, request: IntCommand) -> int:
        return request.answer


async def main():
    c = CommandDispatcherImpl(middlewares=[SimpleMiddleware()])

    c.register(IntCommand, IntCommandHandler())

    await c.handle(IntCommand(1))

Tests

  • pytest tests

Additional

Inspired by didator