Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Work around open-telemetry/opentelemetry-swift#615 #667

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
21 changes: 3 additions & 18 deletions Package@swift-5.9.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ let package = Package(
.package(url: "https://github.com/apple/swift-protobuf.git", from: "1.20.2"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"),
.package(url: "https://github.com/apple/swift-metrics.git", from: "2.1.1"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0")
],
targets: [
.target(name: "OpenTelemetryApi",
dependencies: []),
.target(name: "OpenTelemetrySdk",
dependencies: ["OpenTelemetryApi"].withAtomicsIfNeeded()),
dependencies: ["OpenTelemetryApi",
.product(name: "Atomics", package: "swift-atomics", condition: .when(platforms: [.linux]))]),
.target(name: "OpenTelemetryConcurrency",
dependencies: ["OpenTelemetryApi"]),
.target(name: "OpenTelemetryTestUtils",
Expand Down Expand Up @@ -134,25 +136,8 @@ let package = Package(
]
).addPlatformSpecific()

extension [Target.Dependency] {
func withAtomicsIfNeeded() -> [Target.Dependency] {
#if canImport(Darwin)
return self
#else
var dependencies = self
dependencies.append(.product(name: "Atomics", package: "swift-atomics"))
return dependencies
#endif
}
}

extension Package {
func addPlatformSpecific() -> Self {
#if !canImport(Darwin)
dependencies.append(
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0"))
)
#endif
#if canImport(ObjectiveC)
dependencies.append(
.package(url: "https://github.com/undefinedlabs/opentracing-objc", from: "0.5.2")
Expand Down
6 changes: 5 additions & 1 deletion Sources/Exporters/OpenTelemetryProtocolHttp/Lock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
6 changes: 5 additions & 1 deletion Sources/Importers/OpenTracingShim/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
import Glibc
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
6 changes: 5 additions & 1 deletion Sources/Importers/SwiftMetricsShim/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
31 changes: 31 additions & 0 deletions Sources/OpenTelemetrySdk/Common/WorkerThread.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import Foundation

#if (swift(>=6) && os(Linux)) || OPENTELEMETRY_SWIFT_LINUX_COMPAT
// Workaround for https://github.com/open-telemetry/opentelemetry-swift/issues/615:
// a regression in Swift 6 on Linux breaks subclassing Thread, so instead of
// subclassing we wrap a block-based Thread and forward the lifecycle calls.
// The OPENTELEMETRY_SWIFT_LINUX_COMPAT flag lets other builds opt in for testing.
class WorkerThread {
    // Implicitly-unwrapped because the Thread's block must capture `self`,
    // which is only possible after all stored properties are initialized.
    var thread: Thread!

    // Mirrors Thread.isCancelled so subclasses can poll it from main(),
    // matching the API shape of the Thread-subclass path below.
    var isCancelled: Bool {
        self.thread.isCancelled
    }

    init() {
        // `[weak self]` is deliberate: a strong capture would create a retain
        // cycle (self -> thread -> block -> self) and keep the worker — and
        // everything it retains, e.g. exporters — alive after shutdown.
        self.thread = Thread(block: { [weak self] in
            self?.main()
        })
    }

    // Override point; intentionally empty, like Thread.main().
    func main() {}

    func start() {
        self.thread.start()
    }

    func cancel() {
        self.thread.cancel()
    }
}
#else
// Builds using Swift older than 6, or on a non-Linux OS, are unaffected by the
// regression and can use a plain Thread subclass.
class WorkerThread: Thread {}
#endif
6 changes: 5 additions & 1 deletion Sources/OpenTelemetrySdk/Internal/Locks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@

#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
#else
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#else
#error("Unsupported platform")
#endif

/// A threading lock based on `libpthread` instead of `libdispatch`.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//
//

import Foundation
import OpenTelemetryApi
Expand Down Expand Up @@ -36,7 +36,7 @@ public class BatchLogRecordProcessor: LogRecordProcessor {
}
}

private class BatchWorker: Thread {
private class BatchWorker: WorkerThread {
let logRecordExporter: LogRecordExporter
let scheduleDelay: TimeInterval
let maxQueueSize: Int
Expand Down Expand Up @@ -69,7 +69,7 @@ private class BatchWorker: Thread {

func emit(logRecord: ReadableLogRecord) {
cond.lock()
defer { cond.unlock()}
defer { cond.unlock() }
if logRecordList.count == maxQueueSize {
// TODO: record a counter for dropped logs
return
Expand All @@ -84,18 +84,18 @@ private class BatchWorker: Thread {

override func main() {
repeat {
autoreleasepool {
var logRecordsCopy: [ReadableLogRecord]
cond.lock()
if logRecordList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while logRecordList.isEmpty && !self.isCancelled
}
logRecordsCopy = logRecordList
logRecordList.removeAll()
cond.unlock()
self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout)
autoreleasepool {
var logRecordsCopy: [ReadableLogRecord]
cond.lock()
if logRecordList.count < maxExportBatchSize {
repeat {
cond.wait(until: Date().addingTimeInterval(scheduleDelay))
} while logRecordList.isEmpty && !self.isCancelled
}
logRecordsCopy = logRecordList
logRecordList.removeAll()
cond.unlock()
self.exportBatch(logRecordList: logRecordsCopy, explicitTimeout: exportTimeout)
}
} while !self.isCancelled
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public struct BatchSpanProcessor: SpanProcessor {
/// BatchWorker is a thread that batches multiple spans and calls the registered SpanExporter to export
/// the data.
/// The list of batched data is protected by a NSCondition which ensures full concurrency.
private class BatchWorker: Thread {
private class BatchWorker: WorkerThread {
let spanExporter: SpanExporter
let meterProvider: StableMeterProvider?
let scheduleDelay: TimeInterval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,31 @@ class BatchLogRecordProcessorTests : XCTestCase {
let exported = waitingExporter.waitForExport()
XCTAssertEqual(exported?.count, 9)
}

// Regression test: the BatchWorker's Thread block must not retain the worker
// (and through it the exporter) after shutdown. See opentelemetry-swift#615.
func testShutdownNoMemoryCycle() {
    // A weak reference to the exporter that will be retained by the BatchWorker
    weak var exporter: WaitingLogRecordExporter?
    do {
        // Everything that strongly retains the exporter is scoped to this `do`
        // block, so after it exits only a retain cycle could keep it alive.
        let waitingExporter = WaitingLogRecordExporter(numberToWaitFor: 2)
        exporter = waitingExporter
        let processors = [BatchLogRecordProcessor(logRecordExporter: waitingExporter,scheduleDelay: maxScheduleDelay)]
        let loggerProvider = LoggerProviderBuilder().with(processors: processors).build()
        let logger = loggerProvider.get(instrumentationScopeName: "BatchLogRecordProcessorTest")
        logger.logRecordBuilder().emit()
        logger.logRecordBuilder().emit()
        // Sanity check that the pipeline actually ran before we shut it down.
        let exported = waitingExporter.waitForExport()
        XCTAssertEqual(exported?.count, 2)

        for processor in processors {
            _ = processor.shutdown()
        }
    }

    // After the BatchWorker is shutdown, it will continue waiting for the condition variable to be signaled up to the maxScheduleDelay. Until that point the exporter won't be deallocated.
    sleep(UInt32(ceil(maxScheduleDelay + 1)))
    // Interestingly, this will always succeed on macOS even if you intentionally create a strong reference cycle between the BatchWorker and the Thread's closure. I assume either calling cancel or the thread exiting releases the closure which breaks the cycle. This is not the case on Linux where the test will fail as expected.
    XCTAssertNil(exporter)
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class BatchSpansProcessorTests: XCTestCase {
let spanName1 = "MySpanName/1"
let spanName2 = "MySpanName/2"
let maxScheduleDelay = 0.5
let tracerSdkFactory = TracerProviderSdk()
var tracerSdkFactory = TracerProviderSdk()
var tracer: Tracer!
let blockingSpanExporter = BlockingSpanExporter()
var mockServiceHandler = SpanExporterMock()
Expand Down Expand Up @@ -227,6 +227,38 @@ class BatchSpansProcessorTests: XCTestCase {
XCTAssertEqual(exported, [span2.toSpanData()])
XCTAssertTrue(waitingSpanExporter.shutdownCalled)
}

// Regression test: the span BatchWorker's Thread block must not retain the
// worker (and through it the exporter) after shutdown. See opentelemetry-swift#615.
func testShutdownNoMemoryCycle() {
    // A weak reference to the exporter that will be retained by the BatchWorker
    weak var exporter: WaitingSpanExporter?
    do {
        // Strong references to the exporter are confined to this `do` block so
        // that, after it exits, only a retain cycle could keep it alive.
        let waitingSpanExporter = WaitingSpanExporter(numberToWaitFor: 2)
        exporter = waitingSpanExporter

        let batch = BatchSpanProcessor(
            spanExporter: waitingSpanExporter,
            meterProvider: DefaultStableMeterProvider.instance,
            scheduleDelay: maxScheduleDelay
        )

        tracerSdkFactory.addSpanProcessor(batch)
        let span1 = createSampledEndedSpan(spanName: spanName1)
        let span2 = createSampledEndedSpan(spanName: spanName2)
        // Sanity check that the pipeline actually ran before we shut it down.
        let exported = waitingSpanExporter.waitForExport()

        XCTAssertEqual(exported, [span1.toSpanData(), span2.toSpanData()])

        tracerSdkFactory.shutdown()
        // Both the provider and the tracer retain the exporter, so we need to clear those out in order for the exporter to be deallocated.
        tracerSdkFactory = TracerProviderSdk()
        tracer = nil
    }

    // After the BatchWorker is shutdown, it will continue waiting for the condition variable to be signaled up to the maxScheduleDelay. Until that point the exporter won't be deallocated.
    sleep(UInt32(ceil(maxScheduleDelay + 1)))
    // Interestingly, this will always succeed on macOS even if you intentionally create a strong reference cycle between the BatchWorker and the Thread's closure. I assume either calling cancel or the thread exiting releases the closure which breaks the cycle. This is not the case on Linux where the test will fail as expected.
    XCTAssertNil(exporter)
}
}

class BlockingSpanExporter: SpanExporter {
Expand Down
Loading