Skip to content

Commit

Permalink
Remove worker blocking functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladislav Alekseev committed Mar 25, 2020
1 parent b14948c commit 6abc4ed
Show file tree
Hide file tree
Showing 23 changed files with 11 additions and 160 deletions.
2 changes: 0 additions & 2 deletions Sources/BalancingBucketQueue/BalancingBucketQueueImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ final class BalancingBucketQueueImpl: BalancingBucketQueue {
switch dequeueResult {
case .dequeuedBucket:
return dequeueResult
case .workerIsBlocked:
return .workerIsBlocked
case .workerIsNotAlive:
return .workerIsNotAlive
case .queueIsEmpty, .checkAgainLater:
Expand Down
4 changes: 0 additions & 4 deletions Sources/BucketQueue/BucketQueueImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ final class BucketQueueImpl: BucketQueue {

public func dequeueBucket(requestId: RequestId, workerId: WorkerId) -> DequeueResult {
switch workerAlivenessProvider.alivenessForWorker(workerId: workerId).status {
case .blocked:
return .workerIsBlocked
case .silent, .notRegistered:
return .workerIsNotAlive
case .alive:
Expand Down Expand Up @@ -179,8 +177,6 @@ final class BucketQueueImpl: BucketQueue {
return nil
}
stuckReason = .bucketLost
case .blocked:
stuckReason = .workerIsBlocked
case .silent(let lastAlivenessResponseTimestamp):
stuckReason = .workerIsSilent(since: lastAlivenessResponseTimestamp)
}
Expand Down
1 change: 0 additions & 1 deletion Sources/BucketQueue/DequeuedBucket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ public enum DequeueResult: Hashable {
case queueIsEmpty
case checkAgainLater(checkAfter: TimeInterval)
case dequeuedBucket(DequeuedBucket)
case workerIsBlocked
case workerIsNotAlive
}

Expand Down
3 changes: 0 additions & 3 deletions Sources/BucketQueue/StuckBucket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import QueueModels

public struct StuckBucket: Equatable {
public enum Reason: Equatable, CustomStringConvertible {
case workerIsBlocked
case workerIsSilent(since: Date)
case bucketLost

Expand All @@ -14,8 +13,6 @@ public struct StuckBucket: Equatable {
case .workerIsSilent(let since):
let formatted = NSLogLikeLogEntryTextFormatter.logDateFormatter.string(from: since)
return "worker is silent since \(formatted)"
case .workerIsBlocked:
return "worker is blocked"
case .bucketLost:
return "worker has been processing bucket but then switched to another bucket"
}
Expand Down
3 changes: 0 additions & 3 deletions Sources/DistWorker/DistWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,6 @@ public final class DistWorker: SchedulerDelegate {
case .queueIsEmpty:
Logger.debug("Server returned that queue is empty")
return .result(nil)
case .workerHasBeenBlocked:
Logger.error("Server has blocked this worker")
return .result(nil)
case .workerConsideredNotAlive:
Logger.error("Server considers this worker as not alive")
return .result(nil)
Expand Down
2 changes: 0 additions & 2 deletions Sources/QueueClient/QueueClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ public final class QueueClient {
delegate?.queueClientQueueIsEmpty(self)
case .workerIsNotAlive:
delegate?.queueClientWorkerConsideredNotAlive(self)
case .workerIsBlocked:
delegate?.queueClientWorkerHasBeenBlocked(self)
}
}

Expand Down
1 change: 0 additions & 1 deletion Sources/QueueClient/QueueClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ public protocol QueueClientDelegate: class {
func queueClient(_ sender: QueueClient, didFailWithError error: QueueClientError)
func queueClientQueueIsEmpty(_ sender: QueueClient)
func queueClientWorkerConsideredNotAlive(_ sender: QueueClient)
func queueClientWorkerHasBeenBlocked(_ sender: QueueClient)
func queueClient(_ sender: QueueClient, fetchBucketLaterAfter after: TimeInterval)
func queueClient(_ sender: QueueClient, didFetchBucket bucket: Bucket)
func queueClientDidScheduleTests(_ sender: QueueClient, requestId: RequestId)
Expand Down
5 changes: 0 additions & 5 deletions Sources/QueueClient/SynchronousQueueClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public final class SynchronousQueueClient: QueueClientDelegate {
case bucket(Bucket)
case queueIsEmpty
case checkLater(TimeInterval)
case workerHasBeenBlocked
case workerConsideredNotAlive
}

Expand Down Expand Up @@ -160,10 +159,6 @@ public final class SynchronousQueueClient: QueueClientDelegate {
bucketFetchResult = Either.success(.workerConsideredNotAlive)
}

public func queueClientWorkerHasBeenBlocked(_ sender: QueueClient) {
bucketFetchResult = Either.success(.workerHasBeenBlocked)
}

public func queueClient(_ sender: QueueClient, fetchBucketLaterAfter after: TimeInterval) {
bucketFetchResult = Either.success(.checkLater(after))
}
Expand Down
2 changes: 0 additions & 2 deletions Sources/QueueServer/Endpoints/BucketProviderEndpoint.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public final class BucketProviderEndpoint: PayloadSignatureVerifyingRESTEndpoint
return .bucketDequeued(bucket: dequeuedBucket.enqueuedBucket.bucket)
case .workerIsNotAlive:
return .workerIsNotAlive
case .workerIsBlocked:
return .workerIsBlocked
}
}
}
17 changes: 6 additions & 11 deletions Sources/QueueServer/Endpoints/BucketResultRegistrar.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,11 @@ public final class BucketResultRegistrar: PayloadSignatureVerifyingRESTEndpoint
}

public func handle(verifiedPayload: BucketResultPayload) throws -> BucketResultAcceptResponse {
do {
let acceptResult = try bucketResultAccepter.accept(
testingResult: verifiedPayload.testingResult,
requestId: verifiedPayload.requestId,
workerId: verifiedPayload.workerId
)
return .bucketResultAccepted(bucketId: acceptResult.dequeuedBucket.enqueuedBucket.bucket.bucketId)
} catch {
workerAlivenessProvider.blockWorker(workerId: verifiedPayload.workerId)
throw error
}
let acceptResult = try bucketResultAccepter.accept(
testingResult: verifiedPayload.testingResult,
requestId: verifiedPayload.requestId,
workerId: verifiedPayload.workerId
)
return .bucketResultAccepted(bucketId: acceptResult.dequeuedBucket.enqueuedBucket.bucket.bucketId)
}
}
5 changes: 0 additions & 5 deletions Sources/QueueServer/Endpoints/WorkerRegistrar.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@ public final class WorkerRegistrar: RESTEndpoint {

public enum WorkerRegistrarError: Swift.Error, CustomStringConvertible {
case missingWorkerConfiguration(workerId: WorkerId)
case workerIsBlocked(workerId: WorkerId)
case workerIsAlreadyRegistered(workerId: WorkerId)

public var description: String {
switch self {
case .missingWorkerConfiguration(let workerId):
return "Missing worker configuration for \(workerId)"
case .workerIsBlocked(let workerId):
return "Can't register \(workerId) because it has been blocked"
case .workerIsAlreadyRegistered(let workerId):
return "Can't register \(workerId) because it is already registered"
}
Expand Down Expand Up @@ -57,8 +54,6 @@ public final class WorkerRegistrar: RESTEndpoint {
return .workerRegisterSuccess(workerConfiguration: workerConfiguration)
case .alive:
throw WorkerRegistrarError.workerIsAlreadyRegistered(workerId: decodedPayload.workerId)
case .blocked:
throw WorkerRegistrarError.workerIsBlocked(workerId: decodedPayload.workerId)
}
}
}
1 change: 0 additions & 1 deletion Sources/QueueServer/StuckBucketsPoller.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ private extension StuckBucket.Reason {
var metricParameterName: String {
switch self {
case .workerIsSilent: return "workerIsSilent"
case .workerIsBlocked: return "workerIsBlocked"
case .bucketLost: return "bucketLost"
}
}
Expand Down
1 change: 0 additions & 1 deletion Sources/QueueServer/WorkerAlivenessMatricCapturer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ private extension WorkerAliveness.Status {
var metricComponentName: String {
switch self {
case .alive: return "alive"
case .blocked: return "blocked"
case .notRegistered: return "notRegistered"
case .silent: return "silent"
}
Expand Down
6 changes: 0 additions & 6 deletions Sources/RESTMethods/DequeueBucket/DequeueBucketResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ public enum DequeueBucketResponse: Codable, Equatable {
case bucketDequeued(bucket: Bucket)
case queueIsEmpty
case workerIsNotAlive
case workerIsBlocked
case checkAgainLater(checkAfter: TimeInterval)

private enum CodingKeys: CodingKey {
Expand All @@ -18,7 +17,6 @@ public enum DequeueBucketResponse: Codable, Equatable {
case bucketDequeued
case queueIsEmpty
case workerIsNotAlive
case workerIsBlocked
case checkAgainLater
}

Expand All @@ -34,8 +32,6 @@ public enum DequeueBucketResponse: Codable, Equatable {
self = .workerIsNotAlive
case .checkAgainLater:
self = .checkAgainLater(checkAfter: try container.decode(TimeInterval.self, forKey: .checkAfter))
case .workerIsBlocked:
self = .workerIsBlocked
}
}

Expand All @@ -52,8 +48,6 @@ public enum DequeueBucketResponse: Codable, Equatable {
try container.encode(CaseId.queueIsEmpty, forKey: .caseId)
case .workerIsNotAlive:
try container.encode(CaseId.workerIsNotAlive, forKey: .caseId)
case .workerIsBlocked:
try container.encode(CaseId.workerIsBlocked, forKey: .caseId)
}
}
}
3 changes: 0 additions & 3 deletions Sources/WorkerAlivenessProvider/WorkerAliveness.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ public struct WorkerAliveness: Equatable, CustomStringConvertible {
public enum Status: Equatable, CustomStringConvertible {
case alive
case silent(lastAlivenessResponseTimestamp: Date)
case blocked
case notRegistered

public var description: String {
Expand All @@ -14,8 +13,6 @@ public struct WorkerAliveness: Equatable, CustomStringConvertible {
return "alive"
case .silent(let lastAlivenessResponseTimestamp):
return "silent since \(lastAlivenessResponseTimestamp)"
case .blocked:
return "blocked"
case .notRegistered:
return "not registered"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ public protocol WorkerAlivenessProvider: class {
func didDequeueBucket(bucketId: BucketId, workerId: WorkerId)

func didRegisterWorker(workerId: WorkerId)
func blockWorker(workerId: WorkerId)
}

public extension WorkerAlivenessProvider {
Expand Down
25 changes: 4 additions & 21 deletions Sources/WorkerAlivenessProvider/WorkerAlivenessProviderImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider {
private let knownWorkerIds: Set<WorkerId>
private var workerAliveReportTimestamps = [WorkerId: Date]()
private let workerBucketIdsBeingProcessed = WorkerCurrentlyProcessingBucketsTracker()
private var blockedWorkers = Set<WorkerId>()
/// allow worker some additinal time to perform a "i'm alive" report, e.g. to compensate a network latency
private let maximumNotReportingDuration: TimeInterval

Expand All @@ -34,10 +33,8 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider {

public func set(bucketIdsBeingProcessed: Set<BucketId>, workerId: WorkerId) {
syncQueue.sync {
if !blockedWorkers.contains(workerId) {
onSyncQueue_markWorkerAsAlive(workerId: workerId)
workerBucketIdsBeingProcessed.set(bucketIdsBeingProcessed: bucketIdsBeingProcessed, byWorkerId: workerId)
}
onSyncQueue_markWorkerAsAlive(workerId: workerId)
workerBucketIdsBeingProcessed.set(bucketIdsBeingProcessed: bucketIdsBeingProcessed, byWorkerId: workerId)
}
}

Expand All @@ -46,18 +43,7 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider {
onSyncQueue_markWorkerAsAlive(workerId: workerId)
}
}

public func blockWorker(workerId: WorkerId) {
syncQueue.sync {
_ = blockedWorkers.insert(workerId)
workerBucketIdsBeingProcessed.resetBucketIdsBeingProcessedBy(workerId: workerId)
Logger.warning("Blocked worker: \(workerId)")

let workerAliveness = onSyncQueue_workerAliveness()
Logger.debug("Alive workers: \(workerAliveness.filter { $0.value.status == .alive }), blocked workers: \(workerAliveness.filter { $0.value.status == .blocked })")
}
}


public var workerAliveness: [WorkerId: WorkerAliveness] {
return syncQueue.sync {
onSyncQueue_workerAliveness()
Expand All @@ -71,7 +57,7 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider {
}

private func onSyncQueue_workerAliveness() -> [WorkerId: WorkerAliveness] {
let uniqueWorkerIds = Set<WorkerId>(workerAliveReportTimestamps.keys).union(blockedWorkers).union(knownWorkerIds)
let uniqueWorkerIds = Set<WorkerId>(workerAliveReportTimestamps.keys).union(knownWorkerIds)

var workerAliveness = [WorkerId: WorkerAliveness]()
let currentDate = Date()
Expand All @@ -85,9 +71,6 @@ public final class WorkerAlivenessProviderImpl: WorkerAlivenessProvider {
guard let latestAliveDate = workerAliveReportTimestamps[workerId] else {
return WorkerAliveness(status: .notRegistered, bucketIdsBeingProcessed: [])
}
if blockedWorkers.contains(workerId) {
return WorkerAliveness(status: .blocked, bucketIdsBeingProcessed: [])
}

let bucketIdsBeingProcessed = workerBucketIdsBeingProcessed.bucketIdsBeingProcessedBy(workerId: workerId)
let silenceDuration = currentDate.timeIntervalSince(latestAliveDate)
Expand Down
15 changes: 0 additions & 15 deletions Tests/BalancingBucketQueueTests/BalancingBucketQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -392,21 +392,6 @@ final class BalancingBucketQueueTests: XCTestCase {
)
}

func test___dequeueing_by_blocked_worker___when_buckets_enqueued___provides_blocked_response() throws {
workerAlivenessProvider.workerAliveness[workerId] = WorkerAliveness(
status: .blocked,
bucketIdsBeingProcessed: []
)

let bucket = BucketFixtures.createBucket(testEntries: [TestEntryFixtures.testEntry()])
balancingQueue.enqueue(buckets: [bucket], prioritizedJob: prioritizedJob)

XCTAssertEqual(
balancingQueue.dequeueBucket(requestId: requestId, workerId: workerId),
DequeueResult.workerIsBlocked
)
}

let dateProvider = DateProviderFixture()
let uniqueIdentifierGenerator = FixedValueUniqueIdentifierGenerator()
let workerAlivenessProvider = MutableWorkerAlivenessProvider()
Expand Down
26 changes: 0 additions & 26 deletions Tests/BucketQueueTests/BucketQueueTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,7 @@ final class BucketQueueTests: XCTestCase {
XCTFail("Expected dequeueResult == .checkAgainLater, got: \(dequeueResult)")
}
}

func test__reponse_workerBlocked__when_worker_is_blocked() {
alivenessTrackerWithAlwaysAliveResults.blockWorker(workerId: workerId)
let bucketQueue = BucketQueueFixtures.bucketQueue(workerAlivenessProvider: alivenessTrackerWithAlwaysAliveResults)

let bucket = BucketFixtures.createBucket(testEntries: [])
bucketQueue.enqueue(buckets: [bucket])

let dequeueResult = bucketQueue.dequeueBucket(requestId: requestId, workerId: workerId)
XCTAssertEqual(dequeueResult, .workerIsBlocked)
}

func test__reponse_workerIsNotAlive__when_worker_is_not_alive() {
let bucketQueue = BucketQueueFixtures.bucketQueue(workerAlivenessProvider: alivenessTrackerWithAlwaysAliveResults)
let dequeueResult = bucketQueue.dequeueBucket(
Expand Down Expand Up @@ -230,22 +219,7 @@ final class BucketQueueTests: XCTestCase {
[StuckBucket(reason: .workerIsSilent(since: silentSince), bucket: bucket, workerId: workerId, requestId: requestId)]
)
}

func test__when_worker_is_blocked__its_dequeued_buckets_removed() {
let bucket = BucketFixtures.createBucket(testEntries: [])

let bucketQueue = BucketQueueFixtures.bucketQueue(workerAlivenessProvider: alivenessTrackerWithAlwaysAliveResults)
bucketQueue.enqueue(buckets: [bucket])
_ = bucketQueue.dequeueBucket(requestId: requestId, workerId: workerId)

alivenessTrackerWithAlwaysAliveResults.blockWorker(workerId: workerId)
let stuckBuckets = bucketQueue.reenqueueStuckBuckets()
XCTAssertEqual(
stuckBuckets,
[StuckBucket(reason: .workerIsBlocked, bucket: bucket, workerId: workerId, requestId: requestId)]
)
}

func test___when_worker_loses_bucket___it_is_removed_as_stuck() {
let bucket = BucketFixtures.createBucket(testEntries: [])

Expand Down
5 changes: 0 additions & 5 deletions Tests/QueueClientTests/FakeQueueClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ class FakeQueueClientDelegate: QueueClientDelegate {
case queueIsEmpty
case checkAfter(TimeInterval)
case bucket(Bucket)
case workerHasBeenBlocked
case workerConsideredNotAlive
case didScheduleTests(RequestId)
case fetchedJobState(JobState)
Expand All @@ -32,10 +31,6 @@ class FakeQueueClientDelegate: QueueClientDelegate {
responses.append(ServerResponse.workerConsideredNotAlive)
}

func queueClientWorkerHasBeenBlocked(_ sender: QueueClient) {
responses.append(ServerResponse.workerHasBeenBlocked)
}

func queueClient(_ sender: QueueClient, fetchBucketLaterAfter after: TimeInterval) {
responses.append(ServerResponse.checkAfter(after))
}
Expand Down
15 changes: 0 additions & 15 deletions Tests/QueueServerTests/WorkerRegistrarTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,6 @@ final class WorkerRegistrarTests: XCTestCase {
XCTAssertEqual(alivenessTracker.alivenessForWorker(workerId: workerId).status, .alive)
}

func test___registration_for_blocked_worker__throws() throws {
let registrar = createRegistrar()
alivenessTracker.didRegisterWorker(workerId: workerId)
alivenessTracker.blockWorker(workerId: workerId)

assertThrows {
_ = try registrar.handle(
decodedPayload: RegisterWorkerPayload(
workerId: workerId,
workerRestAddress: SocketAddress(host: "host", port: 0)
)
)
}
}

func test_successful_registration() throws {
let registrar = createRegistrar()

Expand Down
Loading

0 comments on commit 6abc4ed

Please sign in to comment.