Skip to content

Commit

Permalink
Remove ReportAlive functionality
Browse files Browse the repository at this point in the history
Now that the queue polls workers over REST for their aliveness, it no longer makes sense for workers to keep pinging the queue with their aliveness.
Let's keep a single way of updating worker aliveness statuses.
  • Loading branch information
Vladislav Alekseev committed Mar 25, 2020
1 parent 107e38c commit b14948c
Show file tree
Hide file tree
Showing 24 changed files with 21 additions and 429 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file.

## 2020-03-25

Workers no longer issue `reportAlive` requests to the queue. The queue now polls its workers instead and tracks their aliveness over REST.

## 2020-03-20

- New Graphite metric allows you to track durations of simulator operations like creating, booting, shutting down, and deleting.
Expand Down
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,7 @@ let package = Package(
"ResourceLocationResolver",
"Swifter",
"TemporaryStuff",
"TestHelpers",
"URLResource",
]
),
Expand Down
38 changes: 1 addition & 37 deletions Sources/DistWorker/DistWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ import Timer

public final class DistWorker: SchedulerDelegate {
private let bucketResultSender: BucketResultSender
private let callbackQueue = DispatchQueue(label: "DistWorker.callbackQueue", qos: .default, attributes: .concurrent, autoreleaseFrequency: .inherit, target: nil)
private let callbackQueue = DispatchQueue(label: "DistWorker.callbackQueue", qos: .default, attributes: .concurrent)
private let currentlyBeingProcessedBucketsTracker = DefaultCurrentlyBeingProcessedBucketsTracker()
private let developerDirLocator: DeveloperDirLocator
private let onDemandSimulatorPool: OnDemandSimulatorPool
private let pluginEventBusProvider: PluginEventBusProvider
private let queueClient: SynchronousQueueClient
private let reportAliveSender: ReportAliveSender
private let resourceLocationResolver: ResourceLocationResolver
private let syncQueue = DispatchQueue(label: "DistWorker.syncQueue")
private let temporaryFolder: TemporaryFolder
Expand All @@ -40,7 +39,6 @@ public final class DistWorker: SchedulerDelegate {
private let workerRESTServer: WorkerRESTServer
private let workerRegisterer: WorkerRegisterer
private var payloadSignature = Either<PayloadSignature, DistWorkerError>.error(DistWorkerError.missingPayloadSignature)
private var reportingAliveTimer: DispatchBasedTimer?
private var requestIdForBucketId = [BucketId: RequestId]()

private enum BucketFetchResult: Equatable {
Expand All @@ -54,7 +52,6 @@ public final class DistWorker: SchedulerDelegate {
onDemandSimulatorPool: OnDemandSimulatorPool,
pluginEventBusProvider: PluginEventBusProvider,
queueClient: SynchronousQueueClient,
reportAliveSender: ReportAliveSender,
resourceLocationResolver: ResourceLocationResolver,
temporaryFolder: TemporaryFolder,
testRunnerProvider: TestRunnerProvider,
Expand All @@ -66,7 +63,6 @@ public final class DistWorker: SchedulerDelegate {
self.onDemandSimulatorPool = onDemandSimulatorPool
self.pluginEventBusProvider = pluginEventBusProvider
self.queueClient = queueClient
self.reportAliveSender = reportAliveSender
self.resourceLocationResolver = resourceLocationResolver
self.temporaryFolder = temporaryFolder
self.testRunnerProvider = testRunnerProvider
Expand Down Expand Up @@ -114,8 +110,6 @@ public final class DistWorker: SchedulerDelegate {

try didFetchAnalyticsConfiguration(workerConfiguration.analyticsConfiguration)

strongSelf.startReportingWorkerIsAlive(interval: workerConfiguration.reportAliveInterval)

_ = try strongSelf.runTests(
workerConfiguration: workerConfiguration,
onDemandSimulatorPool: strongSelf.onDemandSimulatorPool
Expand All @@ -129,35 +123,6 @@ public final class DistWorker: SchedulerDelegate {
completion()
}
}

}

private func startReportingWorkerIsAlive(interval: TimeInterval) {
reportingAliveTimer = DispatchBasedTimer.startedTimer(
repeating: .milliseconds(Int(interval * 1000.0)),
leeway: .seconds(1)) { [weak self] _ in
guard let strongSelf = self else { return }
do {
try strongSelf.reportAliveness()
} catch {
Logger.error("Failed to report aliveness: \(error)")
}
}
}

private func reportAliveness() throws {
reportAliveSender.reportAlive(
bucketIdsBeingProcessedProvider: currentlyBeingProcessedBucketsTracker.bucketIdsBeingProcessed,
workerId: workerId,
payloadSignature: try payloadSignature.dematerialize(),
callbackQueue: callbackQueue
) { (result: Either<ReportAliveResponse, Error>) in
do {
_ = try result.dematerialize()
} catch {
Logger.error("Report aliveness error: \(error)")
}
}
}

// MARK: - Private Stuff
Expand Down Expand Up @@ -188,7 +153,6 @@ public final class DistWorker: SchedulerDelegate {

public func cleanUpAndStop() {
queueClient.close()
reportingAliveTimer?.stop()
}

// MARK: - Callbacks
Expand Down
5 changes: 1 addition & 4 deletions Sources/DistWorkerModels/WorkerConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ public struct WorkerConfiguration: Codable, Equatable {
public let analyticsConfiguration: AnalyticsConfiguration
public let numberOfSimulators: UInt
public let payloadSignature: PayloadSignature
public let reportAliveInterval: TimeInterval

public init(
analyticsConfiguration: AnalyticsConfiguration,
numberOfSimulators: UInt,
payloadSignature: PayloadSignature,
reportAliveInterval: TimeInterval
payloadSignature: PayloadSignature
) {
self.analyticsConfiguration = analyticsConfiguration
self.numberOfSimulators = numberOfSimulators
self.payloadSignature = payloadSignature
self.reportAliveInterval = reportAliveInterval
}
}
2 changes: 0 additions & 2 deletions Sources/EmceeLib/Commands/DistWorkCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public final class DistWorkCommand: Command {
) -> DistWorker {
let requestSender = requestSenderProvider.requestSender(socketAddress: queueServerAddress)

let reportAliveSender = ReportAliveSenderImpl(requestSender: requestSender)
let workerRegisterer = WorkerRegistererImpl(requestSender: requestSender)
let bucketResultSender = BucketResultSenderImpl(requestSender: requestSender)

Expand All @@ -102,7 +101,6 @@ public final class DistWorkCommand: Command {
onDemandSimulatorPool: onDemandSimulatorPool,
pluginEventBusProvider: pluginEventBusProvider,
queueClient: SynchronousQueueClient(queueServerAddress: queueServerAddress),
reportAliveSender: reportAliveSender,
resourceLocationResolver: resourceLocationResolver,
temporaryFolder: temporaryFolder,
testRunnerProvider: DefaultTestRunnerProvider(
Expand Down
1 change: 0 additions & 1 deletion Sources/EmceeLib/Commands/StartQueueServerCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public final class StartQueueServerCommand: Command {
queueServerLock: AutomaticTerminationControllerAwareQueueServerLock(
automaticTerminationController: automaticTerminationController
),
reportAliveInterval: queueServerRunConfiguration.reportAliveInterval,
requestSenderProvider: requestSenderProvider,
uniqueIdentifierGenerator: uniqueIdentifierGenerator,
workerAlivenessPolicy: .workersStayAliveWhenQueueIsDepleted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,19 @@ public struct QueueServerRunConfiguration: Decodable {
/// Defines when queue server will terminate itself.
public let queueServerTerminationPolicy: AutomaticTerminationPolicy

/// Period of time when workers should report their aliveness
public let reportAliveInterval: TimeInterval

public let workerDeploymentDestinations: [DeploymentDestination]

public init(
analyticsConfiguration: AnalyticsConfiguration,
checkAgainTimeInterval: TimeInterval,
deploymentDestinationConfigurations: [DestinationConfiguration],
queueServerTerminationPolicy: AutomaticTerminationPolicy,
reportAliveInterval: TimeInterval,
workerDeploymentDestinations: [DeploymentDestination]
) {
self.analyticsConfiguration = analyticsConfiguration
self.checkAgainTimeInterval = checkAgainTimeInterval
self.deploymentDestinationConfigurations = deploymentDestinationConfigurations
self.queueServerTerminationPolicy = queueServerTerminationPolicy
self.reportAliveInterval = reportAliveInterval
self.workerDeploymentDestinations = workerDeploymentDestinations
}

Expand All @@ -45,8 +40,7 @@ public struct QueueServerRunConfiguration: Decodable {
return WorkerConfiguration(
analyticsConfiguration: analyticsConfiguration,
numberOfSimulators: deploymentDestinationConfiguration.numberOfSimulators,
payloadSignature: payloadSignature,
reportAliveInterval: reportAliveInterval
payloadSignature: payloadSignature
)
}
}

This file was deleted.

This file was deleted.

30 changes: 0 additions & 30 deletions Sources/QueueServer/Endpoints/WorkerAlivenessEndpoint.swift

This file was deleted.

8 changes: 1 addition & 7 deletions Sources/QueueServer/QueueHTTPRESTServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ public final class QueueHTTPRESTServer {
self.httpRestServer = httpRestServer
}

public func setHandler<A1, A2, B1, B2, C1, C2, D1, D2, E1, E2, F1, F2, G1, G2, H1, H2, I1, I2>(
public func setHandler<A1, A2, B1, B2, C1, C2, E1, E2, F1, F2, G1, G2, H1, H2, I1, I2>(
bucketResultHandler: RESTEndpointOf<C1, C2>,
dequeueBucketRequestHandler: RESTEndpointOf<B1, B2>,
jobDeleteHandler: RESTEndpointOf<I1, I2>,
jobResultsHandler: RESTEndpointOf<H1, H2>,
jobStateHandler: RESTEndpointOf<G1, G2>,
registerWorkerHandler: RESTEndpointOf<A1, A2>,
reportAliveHandler: RESTEndpointOf<D1, D2>,
scheduleTestsHandler: RESTEndpointOf<E1, E2>,
versionHandler: RESTEndpointOf<F1, F2>
) {
Expand Down Expand Up @@ -58,11 +57,6 @@ public final class QueueHTTPRESTServer {
handler: registerWorkerHandler,
requestIndicatesActivity: true
)
httpRestServer.setHandler(
pathWithSlash: RESTMethod.reportAlive.withLeadingSlash,
handler: reportAliveHandler,
requestIndicatesActivity: false
)
httpRestServer.setHandler(
pathWithSlash: RESTMethod.scheduleTests.withLeadingSlash,
handler: scheduleTestsHandler,
Expand Down
13 changes: 3 additions & 10 deletions Sources/QueueServer/QueueServerImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public final class QueueServerImpl: QueueServer {
private let scheduleTestsHandler: ScheduleTestsEndpoint
private let stuckBucketsPoller: StuckBucketsPoller
private let testsEnqueuer: TestsEnqueuer
private let workerAlivenessEndpoint: WorkerAlivenessEndpoint
private let workerAlivenessMatricCapturer: WorkerAlivenessMatricCapturer
private let workerAlivenessPoller: WorkerAlivenessPoller
private let workerAlivenessProvider: WorkerAlivenessProvider
Expand All @@ -45,22 +44,21 @@ public final class QueueServerImpl: QueueServer {
localPortDeterminer: LocalPortDeterminer,
payloadSignature: PayloadSignature,
queueServerLock: QueueServerLock,
reportAliveInterval: TimeInterval,
requestSenderProvider: RequestSenderProvider,
uniqueIdentifierGenerator: UniqueIdentifierGenerator,
workerAlivenessPolicy: WorkerAlivenessPolicy,
workerConfigurations: WorkerConfigurations
) {
let alivenessPollingInterval: TimeInterval = 20
let workerDetailsHolder = WorkerDetailsHolderImpl()

self.workerAlivenessProvider = WorkerAlivenessProviderImpl(
dateProvider: dateProvider,
knownWorkerIds: workerConfigurations.workerIds,
reportAliveInterval: reportAliveInterval,
additionalTimeToPerformWorkerIsAliveReport: 30.0
maximumNotReportingDuration: alivenessPollingInterval * 2 + 10
)
self.workerAlivenessPoller = WorkerAlivenessPoller(
pollInterval: 20,
pollInterval: alivenessPollingInterval,
requestSenderProvider: requestSenderProvider,
workerAlivenessProvider: workerAlivenessProvider,
workerDetailsHolder: workerDetailsHolder
Expand Down Expand Up @@ -95,10 +93,6 @@ public final class QueueServerImpl: QueueServer {
testsEnqueuer: testsEnqueuer,
uniqueIdentifierGenerator: uniqueIdentifierGenerator
)
self.workerAlivenessEndpoint = WorkerAlivenessEndpoint(
workerAlivenessProvider: workerAlivenessProvider,
expectedPayloadSignature: payloadSignature
)
self.workerRegistrar = WorkerRegistrar(
workerAlivenessProvider: workerAlivenessProvider,
workerConfigurations: workerConfigurations,
Expand Down Expand Up @@ -151,7 +145,6 @@ public final class QueueServerImpl: QueueServer {
jobResultsHandler: RESTEndpointOf(actualHandler: jobResultsEndpoint),
jobStateHandler: RESTEndpointOf(actualHandler: jobStateEndpoint),
registerWorkerHandler: RESTEndpointOf(actualHandler: workerRegistrar),
reportAliveHandler: RESTEndpointOf(actualHandler: workerAlivenessEndpoint),
scheduleTestsHandler: RESTEndpointOf(actualHandler: scheduleTestsHandler),
versionHandler: RESTEndpointOf(actualHandler: queueServerVersionHandler)
)
Expand Down
2 changes: 1 addition & 1 deletion Sources/QueueServer/WorkerAlivenessPoller.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public final class WorkerAlivenessPoller {
Logger.debug("Polling \(workerId) for currently processing buckets")
sender.sendRequestWithCallback(
request: CurrentlyProcessingBucketsNetworkRequest(
timeout: pollInterval / 2.0
timeout: pollInterval
),
callbackQueue: queue,
callback: { [weak self] (response: Either<CurrentlyProcessingBucketsResponse, RequestSenderError>) in
Expand Down
18 changes: 0 additions & 18 deletions Sources/RESTMethods/ReportAlive/ReportAlivePayload.swift

This file was deleted.

Loading

0 comments on commit b14948c

Please sign in to comment.