Skip to content

Commit

Permalink
[Implementation] StreamClient.Subscribe.
Browse files Browse the repository at this point in the history
[Refactor] some data structure form StreamClient.Read to Stream
  • Loading branch information
Grady Zhuo committed Jun 23, 2024
1 parent 31c7187 commit 2f133a7
Show file tree
Hide file tree
Showing 26 changed files with 600 additions and 213 deletions.
2 changes: 1 addition & 1 deletion Sources/EventStoreDB/Event/ReadEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public struct ReadEvent: Sendable {
case .noPosition:
commitPosition = nil
case let .commitPosition(commitPosition):
self.commitPosition = .init(commit: commitPosition)
self.commitPosition = .at(commitPosition: commitPosition)
}
} else {
commitPosition = nil
Expand Down
4 changes: 2 additions & 2 deletions Sources/EventStoreDB/Event/RecordedEvent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public struct RecordedEvent: EventStoreEvent, Sendable {
let contentType = ContentType(rawValue: message.metadata["content-type"] ?? ContentType.binary.rawValue) ?? .unknown
let streamIdentifier = message.streamIdentifier.toIdentifier()
let revision = message.streamRevision
let position = Stream.Position(commit: message.commitPosition, prepare: message.preparePosition)
let position = Stream.Position.at(commitPosition: message.commitPosition, preparePosition: message.preparePosition)

self.init(id: id, eventType: eventType, contentType: contentType, streamIdentifier: streamIdentifier, revision: revision, position: position, data: message.data, customMetadata: message.customMetadata)
}
Expand All @@ -76,7 +76,7 @@ public struct RecordedEvent: EventStoreEvent, Sendable {
let contentType = ContentType(rawValue: message.metadata["content-type"] ?? ContentType.binary.rawValue) ?? .unknown
let streamIdentifier = message.streamIdentifier.toIdentifier()
let revision = message.streamRevision
let position = Stream.Position(commit: message.commitPosition, prepare: message.preparePosition)
let position = Stream.Position.at(commitPosition: message.commitPosition, preparePosition: message.preparePosition)

self.init(id: id, eventType: eventType, contentType: contentType, streamIdentifier: streamIdentifier, revision: revision, position: position, data: message.data, customMetadata: message.customMetadata)
}
Expand Down
21 changes: 19 additions & 2 deletions Sources/EventStoreDB/EventStoreDBClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,27 @@ extension EventStoreDBClient {
return try client.read(stream: streamIdentifier, cursor: cursor, options: options)
}

public func readStream(to streamIdentifier: Stream.Identifier, at revision: UInt64, direction: StreamClient.Read.Direction = .forward, configure: (_ options: StreamClient.Read.Options) -> StreamClient.Read.Options = { $0 }) throws -> StreamClient.Read.Responses {
public func readStream(to streamIdentifier: Stream.Identifier, at revision: UInt64, direction: Stream.Direction = .forward, configure: (_ options: StreamClient.Read.Options) -> StreamClient.Read.Options = { $0 }) throws -> StreamClient.Read.Responses {
let cursor: Cursor<StreamClient.Read.CursorPointer> = .specified(.init(revision: revision, direction: direction))
return try readStream(to: streamIdentifier, cursor: cursor, configure: configure)
}

// MARK: Subscribe by all streams methods -

public func subscribeToAll(from cursor: Cursor<Stream.Position>, configure: (_ options: StreamClient.SubscribeToAll.Options) -> StreamClient.SubscribeToAll.Options = { $0 }) async throws -> StreamClient.Subscription{
let client = try StreamClient(channel: channel, callOptions: defaultCallOptions)

let options = configure(.init())
return try await client.subscribeToAll(from: cursor, options: options)
}

public func subscribeTo(stream: Stream.Identifier, from cursor: Cursor<Stream.Revision>, configure: (_ options: StreamClient.Subscribe.Options) -> StreamClient.Subscribe.Options = { $0 }) async throws -> StreamClient.Subscription{
let client = try StreamClient(channel: channel, callOptions: defaultCallOptions)

let options = configure(.init())
return try await client.subscribe(stream: stream, from: cursor, options: options)
}

// MARK: (Soft) Delete a stream -

@discardableResult
Expand Down Expand Up @@ -160,8 +176,9 @@ extension EventStoreDBClient {
try await handler.handle(response: underlyingClient.create(request))
}

public func createPersistentSubscriptionToAll(groupName: String, options: PersistentSubscriptionsClient.Create.ToAll.Options = .init()) async throws {
public func createPersistentSubscriptionToAll(groupName: String, configure: (_ options: PersistentSubscriptionsClient.Create.ToAll.Options) -> PersistentSubscriptionsClient.Create.ToAll.Options = { $0 }) async throws {
let underlyingClient = try PersistentSubscriptionsClient.UnderlyingClient(channel: channel, defaultCallOptions: defaultCallOptions)
let options = configure(.init())
let handler: PersistentSubscriptionsClient.Create.ToAll = .init(groupName: groupName, options: options)

let request = try handler.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ extension PersistentSubscriptionsClient.Create.ToStream {
case .end:
$0.stream.end = .init()
case let .specified(revision):
$0.stream.revision = revision.value
$0.stream.revision = revision
}
}
}
Expand All @@ -101,10 +101,10 @@ extension PersistentSubscriptionsClient.Create.ToAll {
public typealias UnderlyingMessage = Request.UnderlyingMessage.Options

public var settings: PersistentSubscriptionsClient.Settings
public var filter: StreamClient.FilterOption?
public var filter: Stream.SubscriptionFilter?
public var positionCursor: Cursor<Stream.Position>

public init(settings: PersistentSubscriptionsClient.Settings = .init(), filter: StreamClient.FilterOption? = nil, positionCursor: Cursor<Stream.Position> = .end) {
public init(settings: PersistentSubscriptionsClient.Settings = .init(), filter: Stream.SubscriptionFilter? = nil, positionCursor: Cursor<Stream.Position> = .end) {
self.settings = settings
self.filter = filter
self.positionCursor = positionCursor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ extension ReadEvent {
case .noPosition:
commitPosition = nil
case let .commitPosition(commitPosition):
self.commitPosition = .init(commit: commitPosition)
self.commitPosition = .at(commitPosition: commitPosition)
}
} else {
commitPosition = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ extension PersistentSubscriptionsClient.Update {
case .end:
$0.stream.end = .init()
case let .specified(revision):
$0.stream.revision = revision.value
$0.stream.revision = revision
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public struct PersistentSubscriptionsClient: GRPCConcreteClient {

extension PersistentSubscriptionsClient {
public enum StreamSelection {
case all(position: Cursor<StreamClient.Read.Position>, filterOption: StreamClient.FilterOption? = nil)
case all(position: Cursor<Stream.Position>, filterOption: Stream.SubscriptionFilter? = nil)
case specified(identifier: Stream.Identifier, revision: Cursor<UInt64>)

public static func specified(identifier: Stream.Identifier) -> Self {
Expand Down Expand Up @@ -142,7 +142,7 @@ extension PersistentSubscriptionsClient {
func subscribeTo(_ streamSelection: Selector<Stream.Identifier>, groupName: String, options: Read.Options) async throws -> Subscription {
let handler = Read(streamSelection: streamSelection, groupName: groupName, options: options)
let requests = try handler.build()

let getSubscriptionCall = underlyingClient.makeReadCall()
try await getSubscriptionCall.requestStream.send(requests)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//
// Stream.ControlOption.swift
//
//
// Created by Grady Zhuo on 2024/6/23.
//

extension Stream {
public enum ControlOption: Sendable {
case compatibility(UInt32)
}
}
13 changes: 13 additions & 0 deletions Sources/EventStoreDB/StreamClient/Stream/Stream.Direction.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//
// Stream.Direction.swift
//
//
// Created by Grady Zhuo on 2024/6/23.
//

extension Stream {
public enum Direction: Sendable {
case forward
case backward
}
}
65 changes: 65 additions & 0 deletions Sources/EventStoreDB/StreamClient/Stream/Stream.FilterOption.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// Stream.FilterOption.swift
//
//
// Created by Grady Zhuo on 2024/6/23.
//

import GRPCEncapsulates

extension Stream {
public struct SubscriptionFilter: FluentInterfaceOptions {
public enum Window: Sendable {
case count
case max(UInt32)
}

public enum FilterType: Sendable {
case streamName(regex: String)
case eventType(regex: String)
}

public internal(set) var type: FilterType
public internal(set) var window: Window
public internal(set) var prefixes: [String]
public internal(set) var checkpointIntervalMultiplier: UInt32

init(type: FilterType, window: Window = .count, prefixes: [String] = []) {
self.type = type
self.window = window
self.prefixes = prefixes
checkpointIntervalMultiplier = .max
}

@discardableResult
public static func onStreamName(regex: String) -> Self {
.init(type: .streamName(regex: regex))
}

@discardableResult
public static func onEventType(regex: String) -> Self {
.init(type: .eventType(regex: regex))
}

@discardableResult
public func set(max maxCount: UInt32) -> Self {
withCopy { options in
options.window = .max(maxCount)
}
}

@discardableResult
public func set(checkpointIntervalMultiplier multiplier: UInt32) -> Self {
withCopy { options in
options.checkpointIntervalMultiplier = multiplier
}
}

@discardableResult
public func add(prefix: String) -> Self {
withCopy { options in
options.prefixes.append(prefix)
}
}
}
}
11 changes: 8 additions & 3 deletions Sources/EventStoreDB/StreamClient/Stream/Stream.Position.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ extension Stream {
public struct Position: Sendable {
public let commit: UInt64
public let prepare: UInt64

public init(commit: UInt64, prepare: UInt64? = nil) {

public static func at(commitPosition: UInt64, preparePosition: UInt64? = nil) -> Self {
let preparePosition = preparePosition ?? commitPosition
return .init(commit: commitPosition, prepare: preparePosition)
}

private init(commit: UInt64, prepare: UInt64) {
self.commit = commit
self.prepare = prepare ?? commit
self.prepare = prepare
}
}
}
Expand Down
18 changes: 2 additions & 16 deletions Sources/EventStoreDB/StreamClient/Stream/Stream.Revision.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,5 @@
import Foundation

extension Stream {
public struct Revision: Sendable {
public private(set) var value: UInt64

public init(_ value: UInt64) {
self.value = value
}
}
}

extension Stream.Revision: ExpressibleByIntegerLiteral {
public typealias IntegerLiteralType = UInt64

public init(integerLiteral value: UInt64) {
self.value = value
}
}
public typealias Revision = UInt64
}
14 changes: 14 additions & 0 deletions Sources/EventStoreDB/StreamClient/Stream/Stream.UUIDOption.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//
// Stream.UUIDOption.swift
//
//
// Created by Grady Zhuo on 2024/6/23.
//
import Foundation

extension Stream{
public enum UUIDOption: Sendable {
case structured
case string
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ extension StreamClient.Read {

public private(set) var resolveLinks: Bool = false
public private(set) var limit: UInt64 = .max
public private(set) var uuidOption: StreamClient.Read.UUIDOption = .string
public private(set) var uuidOption: Stream.UUIDOption = .string

package func build() -> UnderlyingMessage {
.with {
Expand Down Expand Up @@ -47,7 +47,7 @@ extension StreamClient.Read {
}

@discardableResult
public func set(uuidOption: StreamClient.Read.UUIDOption) -> Self {
public func set(uuidOption: Stream.UUIDOption) -> Self {
withCopy { options in
options.uuidOption = uuidOption
}
Expand Down
Loading

0 comments on commit 2f133a7

Please sign in to comment.