From fa8287dec730d09d95fd7f6dce7f8872aa9fb3bb Mon Sep 17 00:00:00 2001 From: Ivan Zakrevsky Date: Fri, 24 May 2024 21:30:51 +0300 Subject: [PATCH] WIP ArtifactRepository.Get() --- .../artifact/artifact_repository.go | 29 +++++++++++++++++-- .../artifact/queries/artifact_get_query.go | 22 -------------- .../seedwork/repository/event_get_query.go | 11 +++++-- .../seedwork/repository/event_store.go | 7 +++++ 4 files changed, 42 insertions(+), 27 deletions(-) delete mode 100644 grade/internal/infrastructure/repositories/artifact/queries/artifact_get_query.go diff --git a/grade/internal/infrastructure/repositories/artifact/artifact_repository.go b/grade/internal/infrastructure/repositories/artifact/artifact_repository.go index 668e45f1..29266f65 100644 --- a/grade/internal/infrastructure/repositories/artifact/artifact_repository.go +++ b/grade/internal/infrastructure/repositories/artifact/artifact_repository.go @@ -14,7 +14,7 @@ import ( func NewArtifactRepository(currentSession session.DbSession) *ArtifactRepository { return &ArtifactRepository{ session: currentSession, - eventStore: repository.NewEventStore(currentSession, "Artifact", eventQuery), + eventStore: repository.NewEventStore(currentSession, "Artifact", eventToQuery), } } @@ -32,10 +32,35 @@ func (r *ArtifactRepository) NextId(tenantId tenantVal.TenantId) (artifactVal.Ar return q.Get(r.session) } -func eventQuery(iEvent aggregate.PersistentDomainEvent) (q session.EventSourcedQueryEvaluator) { +func (r *ArtifactRepository) Get(id artifactVal.ArtifactId) (*artifact.Artifact, error) { + idExporter := &artifactVal.ArtifactIdExporter{} + id.Export(idExporter) + streamId, err := r.eventStore.NewStreamId(int(idExporter.TenantId), idExporter.ArtifactId.String()) + if err != nil { + return nil, err + } + q := repository.EventGetQuery{ + StreamId: streamId, + EventReconstitutor: rowsToEvent, + } + stream, err := q.Stream(r.session) + if err != nil { + return nil, err + } + rec := &artifact.ArtifactReconstitutor{ + PastEvents: stream, + } + return rec.Reconstitute() +} + +func eventToQuery(iEvent aggregate.PersistentDomainEvent) (q session.EventSourcedQueryEvaluator) { switch event := iEvent.(type) { case *events.ArtifactProposed: q = queries.NewArtifactProposedQuery(event) } return q } + +func rowsToEvent(*session.Rows) (aggregate.PersistentDomainEvent, error) { + return nil, nil +} diff --git a/grade/internal/infrastructure/repositories/artifact/queries/artifact_get_query.go b/grade/internal/infrastructure/repositories/artifact/queries/artifact_get_query.go deleted file mode 100644 index e725ad22..00000000 --- a/grade/internal/infrastructure/repositories/artifact/queries/artifact_get_query.go +++ /dev/null @@ -1,22 +0,0 @@ -package queries - -import ( - "github.com/emacsway/grade/grade/internal/domain/artifact" - "github.com/emacsway/grade/grade/internal/infrastructure/seedwork/repository" - "github.com/emacsway/grade/grade/internal/infrastructure/seedwork/session" -) - -type ArtifactGetQuery struct { - repository.EventGetQuery -} - -func (q *ArtifactGetQuery) Get(s session.DbSessionQuerier) (*artifact.Artifact, error) { - stream, err := q.Stream(s) - if err != nil { - return nil, err - } - rec := &artifact.ArtifactReconstitutor{ - PastEvents: stream, - } - return rec.Reconstitute() -} diff --git a/grade/internal/infrastructure/seedwork/repository/event_get_query.go b/grade/internal/infrastructure/seedwork/repository/event_get_query.go index 6148adbb..aa3e9da0 100644 --- a/grade/internal/infrastructure/seedwork/repository/event_get_query.go +++ b/grade/internal/infrastructure/seedwork/repository/event_get_query.go @@ -5,9 +5,12 @@ import ( "github.com/emacsway/grade/grade/internal/infrastructure/seedwork/session" ) +type EventReconstitutor func(*session.Rows) (aggregate.PersistentDomainEvent, error) + type EventGetQuery struct { - StreamId StreamId - SincePosition uint + StreamId StreamId + SincePosition uint + EventReconstitutor EventReconstitutor } func (q EventGetQuery) sql() string { @@ -32,10 +35,12 @@ func (q *EventGetQuery) Stream(s session.DbSessionQuerier) ([]aggregate.Persiste } defer rows.Close() for rows.Next() { - err := rows.Scan() // TODO: implement me + // err := rows.Scan() // TODO: implement me + event, err := q.EventReconstitutor(&rows) if err != nil { return nil, err } + stream = append(stream, event) } err = rows.Err() if err != nil { diff --git a/grade/internal/infrastructure/seedwork/repository/event_store.go b/grade/internal/infrastructure/seedwork/repository/event_store.go index a8212219..ee7053eb 100644 --- a/grade/internal/infrastructure/seedwork/repository/event_store.go +++ b/grade/internal/infrastructure/seedwork/repository/event_store.go @@ -21,6 +21,13 @@ type EventStore struct { eventQuery EventQueryFactory } +func (r EventStore) NewStreamId( + tenantId int, + streamId string, +) (StreamId, error) { + return NewStreamId(tenantId, r.streamType, streamId) +} + func (r *EventStore) Save( agg aggregate.DomainEventAccessor[aggregate.PersistentDomainEvent], eventMeta aggregate.EventMeta,