Skip to content

Commit

Permalink
fix: disconnect called before connection has completed #107 (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
Julusian authored Aug 2, 2023
1 parent 6350a5b commit 005f0d5
Showing 1 changed file with 29 additions and 16 deletions.
45 changes: 29 additions & 16 deletions src/lib/atemSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { CommandParser } from './atemCommandParser'
import exitHook = require('exit-hook')
import { VersionCommand, ISerializableCommand, IDeserializedCommand } from '../commands'
import { DEFAULT_PORT } from '../atem'
import { threadedClass, ThreadedClass, ThreadedClassManager, Promisify } from 'threadedclass'
import { threadedClass, ThreadedClass, ThreadedClassManager } from 'threadedclass'
import type { AtemSocketChild } from './atemSocketChild'

export interface AtemSocketOptions {
Expand All @@ -30,9 +30,11 @@ export class AtemSocket extends EventEmitter<AtemSocketEvents> {
private readonly _commandParser: CommandParser = new CommandParser()

private _nextCommandTrackingId = 0
private _isDisconnecting = false
private _address: string
private _port: number = DEFAULT_PORT
private _socketProcess: ThreadedClass<AtemSocketChild> | undefined
private _creatingSocket: Promise<void> | undefined
private _exitUnsubscribe?: () => void

constructor(options: AtemSocketOptions) {
Expand All @@ -45,6 +47,8 @@ export class AtemSocket extends EventEmitter<AtemSocketEvents> {
}

public async connect(address?: string, port?: number): Promise<void> {
this._isDisconnecting = false

if (address) {
this._address = address
}
Expand All @@ -53,17 +57,24 @@ export class AtemSocket extends EventEmitter<AtemSocketEvents> {
}

if (!this._socketProcess) {
this._socketProcess = await this._createSocketProcess()
this._exitUnsubscribe = exitHook(() => {
this.destroy().catch(() => null)
})
} else {
await this._socketProcess.connect(this._address, this._port)
// cache the creation promise, in case `destroy` is called before it completes
this._creatingSocket = this._createSocketProcess()
await this._creatingSocket

if (this._isDisconnecting || !this._socketProcess) {
throw new Error('Disconnecting')
}
}

await this._socketProcess.connect(this._address, this._port)
}

public async destroy(): Promise<void> {
await this.disconnect()

// Ensure thread creation has finished if it was started
if (this._creatingSocket) await this._creatingSocket.catch(() => null)

if (this._socketProcess) {
await ThreadedClassManager.destroy(this._socketProcess)
this._socketProcess = undefined
Expand All @@ -75,6 +86,8 @@ export class AtemSocket extends EventEmitter<AtemSocketEvents> {
}

public async disconnect(): Promise<void> {
this._isDisconnecting = true

if (this._socketProcess) {
await this._socketProcess.disconnect()
}
Expand All @@ -91,7 +104,7 @@ export class AtemSocket extends EventEmitter<AtemSocketEvents> {
commands: Array<{ rawCommand: ISerializableCommand; trackingId: number }>
): Promise<void> {
if (this._socketProcess) {
const commands2 = commands.map((cmd) => {
const wrappedCommands = commands.map((cmd) => {
if (typeof cmd.rawCommand.serialize !== 'function') {
throw new Error(`Command ${cmd.rawCommand.constructor.name} is not serializable`)
}
Expand All @@ -107,14 +120,14 @@ export class AtemSocket extends EventEmitter<AtemSocketEvents> {
}
})

await this._socketProcess.sendCommands(commands2)
await this._socketProcess.sendCommands(wrappedCommands)
} else {
throw new Error('Socket process is not open')
}
}

private async _createSocketProcess(): Promise<Promisify<AtemSocketChild>> {
const socketProcess = await threadedClass<AtemSocketChild, typeof AtemSocketChild>(
private async _createSocketProcess(): Promise<void> {
this._socketProcess = await threadedClass<AtemSocketChild, typeof AtemSocketChild>(
'./atemSocketChild',
'AtemSocketChild',
[
Expand Down Expand Up @@ -147,19 +160,19 @@ export class AtemSocket extends EventEmitter<AtemSocketEvents> {
}
)

ThreadedClassManager.onEvent(socketProcess, 'restarted', () => {
ThreadedClassManager.onEvent(this._socketProcess, 'restarted', () => {
this.connect().catch((error) => {
const errorMsg = `Failed to reconnect after respawning socket process: ${error}`
this.emit('error', errorMsg)
})
})
ThreadedClassManager.onEvent(socketProcess, 'thread_closed', () => {
ThreadedClassManager.onEvent(this._socketProcess, 'thread_closed', () => {
this.emit('disconnect')
})

await socketProcess.connect(this._address, this._port)

return socketProcess
this._exitUnsubscribe = exitHook(() => {
this.destroy().catch(() => null)
})
}

private _parseCommands(buffer: Buffer): IDeserializedCommand[] {
Expand Down

0 comments on commit 005f0d5

Please sign in to comment.