Skip to content

Commit

Permalink
Use dedicated type for queue port
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Ignatov committed Jun 16, 2020
1 parent a8fc4e1 commit 1cd7a73
Show file tree
Hide file tree
Showing 36 changed files with 131 additions and 73 deletions.
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ let package = Package(
name: "PortDeterminer",
dependencies: [
"Logging",
"Models",
"Swifter",
],
path: "Sources/PortDeterminer"
Expand All @@ -988,6 +989,7 @@ let package = Package(
// MARK: PortDeterminerTests
name: "PortDeterminerTests",
dependencies: [
"Models",
"PortDeterminer",
"Swifter",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public final class RunTestsOnRemoteQueueCommand: Command {
return try queueClient.jobResults(jobId: jobId)
}

private func selectPort(ports: Set<Int>) throws -> Int {
private func selectPort(ports: Set<Models.Port>) throws -> Models.Port {
enum PortScanningError: Error, CustomStringConvertible {
case noQueuePortDetected

Expand Down
4 changes: 2 additions & 2 deletions Sources/EmceeLib/Utils/Ports.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Foundation
import Models

public final class Ports {
/// TCP ports that are expecred to be taken by the queue server.
/// This port range appears to be unassigned according to IANA
/// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml?&page=131
public static let defaultQueuePortRange = 41000...41010
public static let defaultQueuePortRange: ClosedRange<Port> = 41000...41010
}
3 changes: 1 addition & 2 deletions Sources/LocalQueueServerRunner/LocalQueueServerError.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import Foundation
import Models

public enum LocalQueueServerError: Error, CustomStringConvertible {
case sameVersionQueueIsAlreadyRunning(port: Int, version: Version)
case sameVersionQueueIsAlreadyRunning(port: Port, version: Version)

public var description: String {
switch self {
Expand Down
6 changes: 3 additions & 3 deletions Sources/LocalQueueServerRunner/LocalQueueServerRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public final class LocalQueueServerRunner {
)
}

private func startQueueServer(emceeVersion: Version) throws -> Int {
private func startQueueServer(emceeVersion: Version) throws -> Models.Port {
let lockToStartQueueServer = try FileLock.named("emcee_starting_queue_server_\(emceeVersion.value)")
return try lockToStartQueueServer.whileLocked {
try ensureQueueWithMatchingVersionIsNotRunning(version: emceeVersion)
Expand All @@ -93,14 +93,14 @@ public final class LocalQueueServerRunner {
private func ensureQueueWithMatchingVersionIsNotRunning(version: Version) throws {
let portToQueueServerVersion = remotePortDeterminer.queryPortAndQueueServerVersion(timeout: 10)

try portToQueueServerVersion.forEach { (item: (key: Int, value: Version)) in
try portToQueueServerVersion.forEach { (item: (key: Models.Port, value: Version)) in
if item.value == version {
throw LocalQueueServerError.sameVersionQueueIsAlreadyRunning(port: item.key, version: version)
}
}
}

private func startWorkers(emceeVersion: Version, port: Int) throws {
private func startWorkers(emceeVersion: Version, port: Models.Port) throws {
Logger.info("Deploying and starting workers in background")

let remoteWorkersStarter = RemoteWorkersStarter(
Expand Down
2 changes: 1 addition & 1 deletion Sources/LoggingSetup/GraphiteMetricHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public final class GraphiteMetricHandler: MetricHandler {
outputStream = EasyOutputStream(
outputStreamProvider: NetworkSocketOutputStreamProvider(
host: graphiteSocketAddress.host,
port: graphiteSocketAddress.port
port: graphiteSocketAddress.port.value
),
errorHandler: { stream, error in
Logger.error("Graphite stream error: \(error)")
Expand Down
41 changes: 41 additions & 0 deletions Sources/Models/NewIntType.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Foundation

open class NewIntType: ExpressibleByIntegerLiteral, Codable, Hashable, CustomStringConvertible, Comparable {
public typealias IntegerLiteralType = Int

public let value: Int

public init(value: Int) {
self.value = value
}

public required init(integerLiteral value: Int) {
self.value = value
}

public var description: String {
return "\(value)"
}

public func hash(into hasher: inout Hasher) {
hasher.combine(value)
}

public static func ==(left: NewIntType, right: NewIntType) -> Bool {
return left.value == right.value
}

public required init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
value = try container.decode(Int.self)
}

public func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
try container.encode(value)
}

public static func < (left: NewIntType, right: NewIntType) -> Bool {
return left.value < right.value
}
}
11 changes: 11 additions & 0 deletions Sources/Models/Port.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
public final class Port: NewIntType, Strideable {
public typealias Stride = Int

public func distance(to other: Port) -> Int {
other.value - self.value
}

public func advanced(by n: Int) -> Self {
return type(of: self).init(value: value + n)
}
}
8 changes: 4 additions & 4 deletions Sources/Models/SocketAddress.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Foundation

public struct SocketAddress: Codable, CustomStringConvertible, Hashable {
public let host: String
public let port: Int
public let port: Port

public enum ParseError: Error, CustomStringConvertible {
case unsupportedFormat(String)
Expand All @@ -15,7 +15,7 @@ public struct SocketAddress: Codable, CustomStringConvertible, Hashable {
}
}

public init(host: String, port: Int) {
public init(host: String, port: Port) {
self.host = host
self.port = port
}
Expand All @@ -27,7 +27,7 @@ public struct SocketAddress: Codable, CustomStringConvertible, Hashable {
}
return SocketAddress(
host: String(components[0]),
port: port
port: Port(value: port)
)
}

Expand All @@ -45,7 +45,7 @@ public struct SocketAddress: Codable, CustomStringConvertible, Hashable {
}

public var asString: String {
return "\(host):\(port)"
return "\(host):\(port.value)"
}

public var description: String {
Expand Down
11 changes: 6 additions & 5 deletions Sources/PortDeterminer/LocalPortDeterminer.swift
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import Darwin
import Foundation
import Logging
import Models
import Swifter

public final class LocalPortDeterminer {
private let portRange: ClosedRange<Int>
private let portRange: ClosedRange<Models.Port>

public init(portRange: ClosedRange<Int>) {
public init(portRange: ClosedRange<Models.Port>) {
self.portRange = portRange
}

public enum LocalPortDeterminerError: Error, CustomStringConvertible {
case noAvailablePorts(portRange: ClosedRange<Int>)
case noAvailablePorts(portRange: ClosedRange<Models.Port>)

public var description: String {
switch self {
Expand All @@ -21,10 +22,10 @@ public final class LocalPortDeterminer {
}
}

public func availableLocalPort() throws -> Int {
public func availableLocalPort() throws -> Models.Port {
for port in portRange {
Logger.debug("Checking availability of local port \(port)")
if isPortAvailable(port: UInt16(port)) {
if isPortAvailable(port: UInt16(port.value)) {
Logger.debug("Port \(port) appears to be available")
return port
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/QueueClient/QueueClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public final class QueueClient {
var components = URLComponents()
components.scheme = "http"
components.host = queueServerAddress.host
components.port = queueServerAddress.port
components.port = queueServerAddress.port.value
components.path = restMethod.pathWithLeadingSlash
guard let url = components.url else {
Logger.fatal("Unable to convert components to url: \(components)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class DefaultQueueCommunicationService: QueueCommunicationService {
}

public func deploymentDestinations(
port: Int,
port: Models.Port,
completion: @escaping (Either<[DeploymentDestination], Error>) -> ()
) {
let requestSender = requestSenderProvider.requestSender(
Expand Down
4 changes: 2 additions & 2 deletions Sources/QueueCommunication/DefaultRemoteQueueDetector.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ public final class DefaultRemoteQueueDetector: RemoteQueueDetector {
self.remotePortDeterminer = remotePortDeterminer
}

public func findSuitableRemoteRunningQueuePorts(timeout: TimeInterval) throws -> Set<Int> {
public func findSuitableRemoteRunningQueuePorts(timeout: TimeInterval) throws -> Set<Models.Port> {
let availableQueues = remotePortDeterminer.queryPortAndQueueServerVersion(timeout: timeout)
let ports = availableQueues
.filter { keyValue -> Bool in keyValue.value == emceeVersion }
.map { $0.key }
return Set(ports)
}

public func findMasterQueuePort(timeout: TimeInterval) throws -> Int {
public func findMasterQueuePort(timeout: TimeInterval) throws -> Models.Port {
let availableQueues = remotePortDeterminer.queryPortAndQueueServerVersion(timeout: timeout)

let sortedQueues = availableQueues
Expand Down
2 changes: 1 addition & 1 deletion Sources/QueueCommunication/QueueCommunicationService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public protocol QueueCommunicationService {
)

func deploymentDestinations(
port: Int,
port: Port,
completion: @escaping (Either<[DeploymentDestination], Error>) -> ()
)
}
5 changes: 3 additions & 2 deletions Sources/QueueCommunication/RemoteQueueDetector.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Foundation
import Models

public protocol RemoteQueueDetector {
func findSuitableRemoteRunningQueuePorts(timeout: TimeInterval) throws -> Set<Int>
func findMasterQueuePort(timeout: TimeInterval) throws -> Int
func findSuitableRemoteRunningQueuePorts(timeout: TimeInterval) throws -> Set<Models.Port>
func findMasterQueuePort(timeout: TimeInterval) throws -> Models.Port
}
4 changes: 2 additions & 2 deletions Sources/QueueServer/LocalPortDeterminer+PortProvider.swift
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Foundation
import Models
import PortDeterminer
import RESTServer

extension LocalPortDeterminer: PortProvider {
public func localPort() throws -> Int {
public func localPort() throws -> Port {
return try availableLocalPort()
}
}
2 changes: 1 addition & 1 deletion Sources/QueueServer/QueueServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Models
import QueueModels

public protocol QueueServer {
func start() throws -> Int
func start() throws -> Port
func schedule(
bucketSplitter: BucketSplitter,
testEntryConfigurations: [TestEntryConfiguration],
Expand Down
2 changes: 1 addition & 1 deletion Sources/QueueServer/QueueServerImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public final class QueueServerImpl: QueueServer {
self.toggleWorkersSharingEndpoint = ToggleWorkersSharingEndpoint(poller: workerUtilizationStatusPoller)
}

public func start() throws -> Int {
public func start() throws -> Models.Port {
httpRestServer.add(handler: RESTEndpointOf(bucketProvider))
httpRestServer.add(handler: RESTEndpointOf(bucketResultRegistrar))
httpRestServer.add(handler: RESTEndpointOf(deploymentDestinationsHandler))
Expand Down
7 changes: 4 additions & 3 deletions Sources/RESTServer/HTTPRESTServer.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import AutomaticTermination
import Foundation
import Logging
import Models
import RESTMethods
import Swifter

Expand Down Expand Up @@ -46,12 +47,12 @@ public final class HTTPRESTServer {
}
}

public func start() throws -> Int {
public func start() throws -> Models.Port {
let port = try portProvider.localPort()
try server.start(in_port_t(port), forceIPv4: false, priority: .default)
try server.start(in_port_t(port.value), forceIPv4: false, priority: .default)

let actualPort = try server.port()
Logger.debug("Started REST server on \(actualPort) port")
return actualPort
return Port(value: actualPort)
}
}
4 changes: 2 additions & 2 deletions Sources/RESTServer/PortProvider.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Foundation
import Models

public protocol PortProvider {
func localPort() throws -> Int
func localPort() throws -> Port
}

8 changes: 4 additions & 4 deletions Sources/RESTServer/PortProviderWrapper.swift
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import Foundation
import Models

public final class PortProviderWrapper: PortProvider {
private let provider: () throws -> Int
private let provider: () throws -> Port

public init(provider: @escaping () throws -> Int) {
public init(provider: @escaping () throws -> Port) {
self.provider = provider
}

public func localPort() throws -> Int {
public func localPort() throws -> Port {
return try provider()
}
}
2 changes: 1 addition & 1 deletion Sources/RemotePortDeterminer/RemotePortDeterminer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ import Foundation
import Models

public protocol RemotePortDeterminer {
func queryPortAndQueueServerVersion(timeout: TimeInterval) -> [Int: Version]
func queryPortAndQueueServerVersion(timeout: TimeInterval) -> [Models.Port: Version]
}
8 changes: 4 additions & 4 deletions Sources/RemotePortDeterminer/RemoteQueuePortScanner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@ import RequestSender

public final class RemoteQueuePortScanner: RemotePortDeterminer {
private let host: String
private let portRange: ClosedRange<Int>
private let portRange: ClosedRange<Models.Port>
private let requestSenderProvider: RequestSenderProvider
private let workQueue = DispatchQueue(label: "RemoteQueuePortScanner.workQueue")

public init(
host: String,
portRange: ClosedRange<Int>,
portRange: ClosedRange<Models.Port>,
requestSenderProvider: RequestSenderProvider
) {
self.host = host
self.portRange = portRange
self.requestSenderProvider = requestSenderProvider
}

public func queryPortAndQueueServerVersion(timeout: TimeInterval) -> [Int: Version] {
public func queryPortAndQueueServerVersion(timeout: TimeInterval) -> [Models.Port: Version] {
let group = DispatchGroup()

let portToVersion = AtomicValue<[Int: Version]>([:])
let portToVersion = AtomicValue<[Models.Port: Version]>([:])

for port in portRange {
group.enter()
Expand Down
2 changes: 1 addition & 1 deletion Sources/RequestSender/RequestSenderImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public final class RequestSenderImpl: RequestSender {
var components = URLComponents()
components.scheme = "http"
components.host = queueServerAddress.host
components.port = queueServerAddress.port
components.port = queueServerAddress.port.value
components.path = pathWithSlash
guard let url = components.url else {
throw RequestSenderError.unableToCreateUrl(components)
Expand Down
Loading

0 comments on commit 1cd7a73

Please sign in to comment.