Skip to content

Commit

Permalink
[UPDATE] remove defer function to close channel, it will cause the su…
Browse files Browse the repository at this point in the history
…bscription failed in the future.
  • Loading branch information
Grady Zhuo committed Sep 8, 2024
1 parent 89197c7 commit c861288
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 33 deletions.
24 changes: 0 additions & 24 deletions Sources/EventStoreDB/EventStoreDBClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ extension EventStoreDBClient {
/// - Returns: AsyncStream to Read.Response
public func readStream(to streamIdentifier: Stream.Identifier, cursor: Cursor<StreamClient.Read.CursorPointer>, configure: (_ options: StreamClient.Read.Options) -> StreamClient.Read.Options = { $0 }) throws -> StreamClient.Read.Responses {
let channel = try GRPCChannelPool.with(settings: settings, group: group)
// defer{
// let promise = group.any().makePromise(of: Void.self)
// channel.closeGracefully(deadline: .distantFuture, promise: promise)
// }
let client = StreamClient(channel: channel, callOptions: defaultCallOptions)
let options = configure(.init())

Expand Down Expand Up @@ -179,10 +175,6 @@ extension EventStoreDBClient {
extension EventStoreDBClient {
public func startScavenge(threadCount: Int32, startFromChunk: Int32) async throws -> OperationsClient.ScavengeResponse {
let channel = try GRPCChannelPool.with(settings: settings, group: group)
defer{
let promise = group.any().makePromise(of: Void.self)
channel.closeGracefully(deadline: .now(), promise: promise)
}
let client = OperationsClient(channel: channel, callOptions: defaultCallOptions)
return try await client.startScavenge(threadCount: threadCount, startFromChunk: startFromChunk)
}
Expand All @@ -191,10 +183,6 @@ extension EventStoreDBClient {
extension EventStoreDBClient {
public func createPersistentSubscription(to identifier: Stream.Identifier, groupName: String, options: PersistentSubscriptionsClient.Create.ToStream.Options = .init()) async throws {
let channel = try GRPCChannelPool.with(settings: settings, group: group)
defer{
let promise = group.any().makePromise(of: Void.self)
channel.closeGracefully(deadline: .now(), promise: promise)
}
let underlyingClient = PersistentSubscriptionsClient.UnderlyingClient(channel: channel, defaultCallOptions: defaultCallOptions)
let handler: PersistentSubscriptionsClient.Create.ToStream = .init(streamIdentifier: identifier, groupName: groupName, options: options)

Expand All @@ -205,10 +193,6 @@ extension EventStoreDBClient {

public func createPersistentSubscriptionToAll(groupName: String, configure: (_ options: PersistentSubscriptionsClient.Create.ToAll.Options) -> PersistentSubscriptionsClient.Create.ToAll.Options = { $0 }) async throws {
let channel = try GRPCChannelPool.with(settings: settings, group: group)
defer{
let promise = group.any().makePromise(of: Void.self)
channel.closeGracefully(deadline: .now(), promise: promise)
}
let underlyingClient = PersistentSubscriptionsClient.UnderlyingClient(channel: channel, defaultCallOptions: defaultCallOptions)
let options = configure(.init())
let handler: PersistentSubscriptionsClient.Create.ToAll = .init(groupName: groupName, options: options)
Expand All @@ -221,10 +205,6 @@ extension EventStoreDBClient {

public func restartPersistentSubscriptionSubsystem() async throws {
let channel = try GRPCChannelPool.with(settings: settings, group: group)
defer{
let promise = group.any().makePromise(of: Void.self)
channel.closeGracefully(deadline: .now(), promise: promise)
}
let client = PersistentSubscriptionsClient(channel: channel, callOptions: defaultCallOptions)
return try await client.restartSubsystem()
}
Expand All @@ -233,10 +213,6 @@ extension EventStoreDBClient {

public func subscribePersistentSubscription(to streamSelection: Selector<Stream.Identifier>, groupName: String, configure: (_ options: PersistentSubscriptionsClient.Read.Options) -> PersistentSubscriptionsClient.Read.Options = { $0 }) async throws -> PersistentSubscriptionsClient.Subscription {
let channel = try GRPCChannelPool.with(settings: settings, group: group)
defer{
let promise = group.any().makePromise(of: Void.self)
channel.closeGracefully(deadline: .now(), promise: promise)
}
let client = PersistentSubscriptionsClient(channel: channel, callOptions: defaultCallOptions)

let options = configure(.init())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ final class EventStoreDBPersistentSubscriptionTests: XCTestCase {
override func setUp() async throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let channel = try GRPCChannelPool.with(settings: settings, group: group)
defer{
let promise = group.any().makePromise(of: Void.self)
channel.closeGracefully(deadline: .now(), promise: promise)
}
let subscriptionClient = try PersistentSubscriptionsClient(channel: channel, callOptions: settings.makeCallOptions())
do {
try await subscriptionClient.deleteOn(stream: streamSelector, groupName: groupName)
Expand All @@ -37,10 +33,6 @@ final class EventStoreDBPersistentSubscriptionTests: XCTestCase {
func testCreate() async throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let channel = try GRPCChannelPool.with(settings: settings, group: group)
defer{
let promise = group.any().makePromise(of: Void.self)
channel.closeGracefully(deadline: .now(), promise: promise)
}
let subscriptionClient = try PersistentSubscriptionsClient(channel: channel, callOptions: settings.makeCallOptions())

let client = EventStoreDBClient(settings: settings)
Expand All @@ -52,6 +44,7 @@ final class EventStoreDBPersistentSubscriptionTests: XCTestCase {

func testSubscribe() async throws {
try await testCreate()


let settings = ClientSettings.localhost()
let client = EventStoreDBClient(settings: settings)
Expand All @@ -66,7 +59,7 @@ final class EventStoreDBPersistentSubscriptionTests: XCTestCase {
)) { options in
options.revision(expected: .any)
}

var lastEventResult: PersistentSubscriptionsClient.Subscription.EventResult? = nil
for try await result in subscription {
lastEventResult = result
Expand Down

0 comments on commit c861288

Please sign in to comment.