Skip to content

Commit

Permalink
feat: add initial EventSub implementation (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
LosFarmosCTL authored Jul 11, 2024
1 parent c73ce48 commit d49aee6
Show file tree
Hide file tree
Showing 69 changed files with 879 additions and 360 deletions.
106 changes: 106 additions & 0 deletions Sources/Twitch/EventSub/EventSubClient.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import Foundation

#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

private typealias EventID = String
private typealias SocketID = String

internal class EventSubClient {
private static let eventSubURL = URL(string: "wss://eventsub.wss.twitch.tv/ws")!
private static let maxSubscriptionsPerConnection = 300

private let credentials: TwitchCredentials
private let urlSession: URLSession
private let decoder: JSONDecoder

private var connections = [SocketID: EventSubConnection]()
private var connectionEvents = [SocketID: [EventID]]()
private var eventHandlers = [EventID: EventSubHandler]()

internal init(
credentials: TwitchCredentials, urlSession: URLSession, decoder: JSONDecoder
) {
self.credentials = credentials
self.urlSession = urlSession
self.decoder = decoder
}

internal func addHandler(
_ handler: EventSubHandler, for eventID: String, on socketID: String
) {
eventHandlers[eventID] = handler

connectionEvents[socketID, default: []].append(eventID)
}

internal func getFreeWebsocketID() async throws -> String {
for (socketID, events) in connectionEvents
where events.count < Self.maxSubscriptionsPerConnection {
return socketID
}

return try await createConnection()
}

private func createConnection(url: URL = eventSubURL) async throws -> SocketID {
let connection = EventSubConnection(
credentials: credentials, urlSession: urlSession, decoder: decoder,
eventSubURL: url, onMessage: receiveMessage(_:))

let socketID = try await connection.resume()

connections[socketID] = connection
return socketID
}

private func receiveMessage(
_ result: Result<EventSubNotification, EventSubConnectionError>
) {
switch result {
case .success(let notification):
eventHandlers[notification.subscription.id]?.yield(notification.event)

case .failure(let error):
switch error {
case .revocation(let revocation):
eventHandlers[revocation.subscriptionID]?.finish(
throwing: .revocation(revocation))
case .reconnectRequested(let reconnectURL, let socketID):
self.reconnect(socketID, reconnectURL: reconnectURL)
case .disconnected(let error, let socketID):
finishConnection(socketID, throwing: .disconnected(with: error))
case .timedOut(let socketID):
finishConnection(socketID, throwing: .timedOut)
}
}
}

private func reconnect(_ socketID: SocketID, reconnectURL: URL) {
Task {
do {
let newSocketID = try await self.createConnection()

// move all events to the new connection
connectionEvents[newSocketID] = connectionEvents[socketID]

connections.removeValue(forKey: socketID)
connectionEvents.removeValue(forKey: socketID)
} catch {
finishConnection(socketID, throwing: .disconnected(with: error))
}
}
}

private func finishConnection(_ socketID: SocketID, throwing error: EventSubError) {
for (socket, events) in connectionEvents where socket == socketID {
events.forEach { event in
eventHandlers[event]?.finish(throwing: error)
}
}

connectionEvents.removeValue(forKey: socketID)
connections.removeValue(forKey: socketID)
}
}
137 changes: 137 additions & 0 deletions Sources/Twitch/EventSub/EventSubConnection.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import Foundation

#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

internal class EventSubConnection {
private let eventSubURL: URL

private let credentials: TwitchCredentials
private let urlSession: URLSession
private let decoder: JSONDecoder

private var keepaliveTimer: KeepaliveTimer?
private var websocket: URLSessionWebSocketTask?
private var socketID: String?

private var onMessage: ((Result<EventSubNotification, EventSubConnectionError>) -> Void)
private var welcomeContinuation: CheckedContinuation<EventSubWelcome, any Error>?

private var receivedMessageIDs = [String]()

// TODO: look into deinitilizations
deinit {
self.websocket?.cancel(with: .goingAway, reason: nil)
}

init(
credentials: TwitchCredentials, urlSession: URLSession, decoder: JSONDecoder,
eventSubURL: URL,
onMessage: @escaping (Result<EventSubNotification, EventSubConnectionError>) -> Void
) {
self.credentials = credentials
self.urlSession = urlSession
self.decoder = decoder

self.eventSubURL = eventSubURL

self.onMessage = onMessage
}

internal func resume() async throws -> String {
if let socketID = self.socketID { return socketID }

self.websocket = urlSession.webSocketTask(with: eventSubURL)
self.websocket?.receive(completionHandler: receiveMessage(_:))
self.websocket?.resume()

// Twitch sends keepalive messages in a specified time interval,
// if we don't receive a message within that interval, we should
// consider the connection to be timed out
self.keepaliveTimer = KeepaliveTimer(duration: .seconds(10)) {
self.onMessage(
.failure(EventSubConnectionError.timedOut(socketID: self.socketID ?? "")))
self.websocket?.cancel()
}

// wait for the welcome message to be received
let welcomeMessage = try await withCheckedThrowingContinuation { continuation in
self.welcomeContinuation = continuation
}

// use a slightly longer keepalive timeout to account for network latency
let timeout = welcomeMessage.keepaliveTimeout + .seconds(1)
await self.keepaliveTimer?.reset(duration: timeout)

self.socketID = welcomeMessage.sessionID
return welcomeMessage.sessionID
}

private func receiveMessage(
_ result: Result<URLSessionWebSocketTask.Message, any Error>
) {
switch result {
case .success(let message):
// reset the keepalive timer on every message
Task { await self.keepaliveTimer?.reset() }

if let message = parseMessage(message) {

// ignore duplicate messages
if !receivedMessageIDs.contains(message.id) {
handleMessage(message)
receivedMessageIDs.append(message.id)

// only keep the last 100 message IDs
if receivedMessageIDs.count > 100 {
receivedMessageIDs.removeFirst()
}
}
}

// recursively receive the next message
self.websocket?.receive(completionHandler: receiveMessage)
case .failure(let error):
let disconnectedError = EventSubConnectionError.disconnected(
with: error, socketID: socketID ?? "")

if let welcomeContinuation {
welcomeContinuation.resume(throwing: disconnectedError)
self.welcomeContinuation = nil
}

onMessage(.failure(disconnectedError))
}
}

private func parseMessage(_ message: URLSessionWebSocketTask.Message)
-> EventSubMessage?
{
switch message {
case .string(let string):
return try? decoder.decode(EventSubMessage.self, from: Data(string.utf8))
// ignore binary messages, Twitch only sends JSON
case .data: return nil
@unknown default: return nil
}
}

private func handleMessage(_ message: EventSubMessage) {
switch message.payload {
case .keepalive: break // nothing to do for keepalive messages
case .welcome(let welcome):
welcomeContinuation?.resume(returning: welcome)
welcomeContinuation = nil
case .notification(let notification):
onMessage(.success(notification))
case .revocation(let revocation):
onMessage(.failure(EventSubConnectionError.revocation(revocation)))
case .reconnect(let reconnect):
onMessage(
.failure(
EventSubConnectionError.reconnectRequested(
reconnectURL: reconnect.reconnectURL, socketID: socketID ?? "")))
}
}
}
14 changes: 14 additions & 0 deletions Sources/Twitch/EventSub/EventSubError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import Foundation

public enum EventSubError: Error {
case revocation(EventSubRevocation)
case disconnected(with: Error)
case timedOut
}

internal enum EventSubConnectionError: Error {
case revocation(EventSubRevocation)
case reconnectRequested(reconnectURL: URL, socketID: String)
case timedOut(socketID: String)
case disconnected(with: Error, socketID: String)
}
50 changes: 50 additions & 0 deletions Sources/Twitch/EventSub/EventSubHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
internal protocol EventSubHandler {
func yield(_ event: Event)
func finish(throwing error: EventSubError)
}

internal struct EventSubCallbackHandler<T>: EventSubHandler {
var callback: (Result<T, EventSubError>) -> Void

func yield(_ event: Event) {
if let event = event as? T {
callback(.success(event))
}
}

func finish(throwing error: EventSubError) {
callback(.failure(error))
}
}

internal struct EventSubContinuationHandler<T>: EventSubHandler {
var continuation: AsyncThrowingStream<T, any Error>.Continuation

func yield(_ event: Event) {
if let event = event as? T {
continuation.yield(event)
}
}

func finish(throwing error: EventSubError) {
continuation.finish(throwing: error)
}
}

#if canImport(Combine)
import Combine

internal struct EventSubSubjectHandler<T>: EventSubHandler {
var subject: PassthroughSubject<T, EventSubError>

func yield(_ event: Event) {
if let event = event as? T {
subject.send(event)
}
}

func finish(throwing error: EventSubError) {
subject.send(completion: .failure(error))
}
}
#endif
14 changes: 0 additions & 14 deletions Sources/Twitch/EventSub/EventSubSubscriptionType.swift

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
public struct ChannelFollowEvent: Event {
public let userID: String
public let userName: String
public let userDisplayName: String
public let broadcasterID: String
public let broadcasterName: String
public let broadcasterDisplayName: String
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
public struct ChannelUpdateEvent: Event {

}
11 changes: 11 additions & 0 deletions Sources/Twitch/EventSub/Events/Chat/ChatClearEvent.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
public struct ChatClearEvent: Event {
public let broadcasterID: String
public let broadcasterLogin: String
public let broadcasterName: String

enum CodingKeys: String, CodingKey {
case broadcasterID = "broadcasterUserId"
case broadcasterLogin = "broadcasterUserLogin"
case broadcasterName = "broadcasterUserName"
}
}
3 changes: 3 additions & 0 deletions Sources/Twitch/EventSub/Events/Chat/ChatMessageEvent.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
public struct ChatMessageEvent: Event {
let chatterUserLogin: String
}
17 changes: 17 additions & 0 deletions Sources/Twitch/EventSub/Events/Event.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
public protocol Event: Decodable {}

internal enum EventType: String, Decodable {
case channelFollow = "channel.follow"
case chatMessage = "channel.chat.message"
case channelUpdate = "channel.update"
case chatClear = "channel.chat.clear"

var event: Event.Type {
switch self {
case .channelFollow: return ChannelFollowEvent.self
case .chatMessage: return ChatMessageEvent.self
case .channelUpdate: return ChannelUpdateEvent.self
case .chatClear: return ChatClearEvent.self
}
}
}
Loading

0 comments on commit d49aee6

Please sign in to comment.