From ab98953b69544107d840d0490ce7d2dd6434aad7 Mon Sep 17 00:00:00 2001 From: Simon Chan <1330321+yume-chan@users.noreply.github.com> Date: Mon, 30 Dec 2024 16:40:23 +0800 Subject: [PATCH] feat(adb): add partial support for ADB server version 40 --- .changeset/cuddly-dolls-wonder.md | 5 + apps/cli/src/index.ts | 6 +- libraries/adb/src/server/client.ts | 258 +++++++++----------------- libraries/adb/src/server/index.ts | 2 + libraries/adb/src/server/observer.ts | 144 ++++++++++++++ libraries/adb/src/server/stream.ts | 100 ++++++++++ libraries/adb/src/server/transport.ts | 30 +-- libraries/adb/src/utils/index.ts | 1 + libraries/adb/src/utils/ref.ts | 37 ++++ 9 files changed, 396 insertions(+), 187 deletions(-) create mode 100644 .changeset/cuddly-dolls-wonder.md create mode 100644 libraries/adb/src/server/observer.ts create mode 100644 libraries/adb/src/server/stream.ts create mode 100644 libraries/adb/src/utils/ref.ts diff --git a/.changeset/cuddly-dolls-wonder.md b/.changeset/cuddly-dolls-wonder.md new file mode 100644 index 000000000..ab6e0b8cb --- /dev/null +++ b/.changeset/cuddly-dolls-wonder.md @@ -0,0 +1,5 @@ +--- +"@yume-chan/adb": minor +--- + +Add partial support for ADB server version 40 diff --git a/apps/cli/src/index.ts b/apps/cli/src/index.ts index 2fb34a25c..3fa18eca7 100644 --- a/apps/cli/src/index.ts +++ b/apps/cli/src/index.ts @@ -4,7 +4,7 @@ import "source-map-support/register.js"; -import { Adb, AdbServerClient } from "@yume-chan/adb"; +import { Adb, AdbServerClient, Ref } from "@yume-chan/adb"; import { AdbServerNodeTcpConnector } from "@yume-chan/adb-server-node-tcp"; import { WritableStream } from "@yume-chan/stream-extra"; import { program } from "commander"; @@ -132,6 +132,8 @@ createDeviceCommand("shell [args...]") ) .configureHelp({ showGlobalOptions: true }) .action(async (args: string[], options: DeviceCommandOptions) => { + const ref = new Ref(); + const adb = await createAdb(options); const shell = await adb.subprocess.shell(args); @@ -169,6 +171,8 @@ createDeviceCommand("shell [args...]") process.exit(1); }, ); + + ref.unref(); }); createDeviceCommand("logcat [args...]") diff --git a/libraries/adb/src/server/client.ts b/libraries/adb/src/server/client.ts index a2819bdbf..670ab9829 100644 --- a/libraries/adb/src/server/client.ts +++ b/libraries/adb/src/server/client.ts @@ -3,123 +3,28 @@ import type { MaybePromiseLike } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async"; import type { Event } from "@yume-chan/event"; -import { EventEmitter } from "@yume-chan/event"; import { getUint64LittleEndian } from "@yume-chan/no-data-view"; import type { AbortSignal, MaybeConsumable, ReadableWritablePair, - WritableStreamDefaultWriter, } from "@yume-chan/stream-extra"; -import { - BufferedReadableStream, - tryCancel, - tryClose, -} from "@yume-chan/stream-extra"; -import { - bipedal, - decodeUtf8, - encodeUtf8, - TextDecoder, -} from "@yume-chan/struct"; +import { AbortController } from "@yume-chan/stream-extra"; import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js"; import { AdbBanner } from "../banner.js"; import type { DeviceObserver as DeviceObserverBase } from "../device-observer.js"; import type { AdbFeature } from "../features.js"; -import { hexToNumber, sequenceEqual, write4HexDigits } from "../utils/index.js"; +import { hexToNumber, sequenceEqual } from "../utils/index.js"; +import { AdbServerDeviceObserverOwner } from "./observer.js"; +import { AdbServerStream, FAIL } from "./stream.js"; import { AdbServerTransport } from "./transport.js"; -const OKAY = encodeUtf8("OKAY"); -const FAIL = encodeUtf8("FAIL"); - -class AdbServerStream { - #connection: AdbServerClient.ServerConnection; - #buffered: BufferedReadableStream; - #writer: WritableStreamDefaultWriter; - - constructor(connection: AdbServerClient.ServerConnection) { - this.#connection = connection; - this.#buffered = new BufferedReadableStream(connection.readable); - this.#writer = connection.writable.getWriter(); - } - - readExactly(length: number): MaybePromiseLike { - return this.#buffered.readExactly(length); - } - - readString = bipedal(function* (this: AdbServerStream, then) { - const data = yield* then(this.readExactly(4)); - const length = hexToNumber(data); - if (length === 0) { - return ""; - } else { - const decoder = new TextDecoder(); - let result = ""; - const iterator = this.#buffered.iterateExactly(length); - while (true) { - const { done, value } = iterator.next(); - if (done) { - break; - } - result += decoder.decode(yield* then(value), { stream: true }); - } - result += decoder.decode(); - return result; - } - }); - - async writeString(value: string): Promise { - // TODO: investigate using `encodeUtf8("0000" + value)` then modifying the length - // That way allocates a new string (hopefully only a rope) instead of a new buffer - const encoded = encodeUtf8(value); - const buffer = new Uint8Array(4 + encoded.length); - write4HexDigits(buffer, 0, encoded.length); - buffer.set(encoded, 4); - await this.#writer.write(buffer); - } - - async readOkay(): Promise { - const response = await this.readExactly(4); - if (sequenceEqual(response, OKAY)) { - // `OKAY` is followed by data length and data - // But different services want to parse the data differently - // So don't read the data here - return; - } - - if (sequenceEqual(response, FAIL)) { - const reason = await this.readString(); - throw new Error(reason); - } - - throw new Error(`Unexpected response: ${decodeUtf8(response)}`); - } - - release() { - this.#writer.releaseLock(); - return { - readable: this.#buffered.release(), - writable: this.#connection.writable, - closed: this.#connection.closed, - close: () => this.#connection.close(), - }; - } - - async dispose() { - void tryCancel(this.#buffered); - void tryClose(this.#writer); - await this.#connection.close(); - } -} - /** * Client for the ADB Server. */ export class AdbServerClient { - static readonly VERSION = 41; - static parseDeviceList(value: string): AdbServerClient.Device[] { const devices: AdbServerClient.Device[] = []; for (const line of value.split("\n")) { @@ -196,6 +101,7 @@ export class AdbServerClient { readonly wireless = new AdbServerClient.WirelessCommands(this); readonly mDns = new AdbServerClient.MDnsCommands(this); + #observerOwner = new AdbServerDeviceObserverOwner(this); constructor(connector: AdbServerClient.ServerConnector) { this.connector = connector; @@ -240,11 +146,11 @@ export class AdbServerClient { } } - async validateVersion() { + async validateVersion(minimalVersion: number) { const version = await this.getVersion(); - if (version !== AdbServerClient.VERSION) { + if (version < minimalVersion) { throw new Error( - `adb server version (${version}) doesn't match this client (${AdbServerClient.VERSION})`, + `adb server version (${version}) doesn't match this client (${minimalVersion})`, ); } } @@ -288,61 +194,10 @@ export class AdbServerClient { /** * Monitors device list changes. */ - async trackDevices(): Promise { - const connection = await this.createConnection("host:track-devices-l"); - - let current: AdbServerClient.Device[] = []; - const onError = new EventEmitter(); - const onDeviceAdd = new EventEmitter(); - const onDeviceRemove = new EventEmitter(); - const onListChange = new EventEmitter(); - - void (async () => { - try { - while (true) { - const response = await connection.readString(); - const next = AdbServerClient.parseDeviceList(response); - - const added: AdbServerClient.Device[] = []; - for (const nextDevice of next) { - const index = current.findIndex( - (device) => - device.transportId === nextDevice.transportId, - ); - if (index === -1) { - added.push(nextDevice); - continue; - } - - current[index] = current[current.length - 1]!; - current.length -= 1; - } - - if (added.length) { - onDeviceAdd.fire(added); - } - if (current.length) { - onDeviceRemove.fire(current); - } - - current = next; - onListChange.fire(current); - } - } catch (e) { - onError.fire(e as Error); - } - })(); - - return { - onError: onError.event, - onDeviceAdd: onDeviceAdd.event, - onDeviceRemove: onDeviceRemove.event, - onListChange: onListChange.event, - get current() { - return current; - }, - stop: () => connection.dispose(), - }; + async trackDevices( + options?: AdbServerClient.ServerConnectionOptions, + ): Promise { + return this.#observerOwner.createObserver(options); } /** @@ -412,20 +267,22 @@ export class AdbServerClient { device: AdbServerClient.DeviceSelector, service: string, ): Promise { - await this.validateVersion(); - let switchService: string; let transportId: bigint | undefined; if (!device) { + await this.validateVersion(41); switchService = `host:tport:any`; } else if ("transportId" in device) { switchService = `host:transport-id:${device.transportId}`; transportId = device.transportId; } else if ("serial" in device) { + await this.validateVersion(41); switchService = `host:tport:serial:${device.serial}`; } else if ("usb" in device) { + await this.validateVersion(41); switchService = `host:tport:usb`; } else if ("tcp" in device) { + await this.validateVersion(41); switchService = `host:tport:local`; } else { throw new TypeError("Invalid device selector"); @@ -467,18 +324,7 @@ export class AdbServerClient { throw e; } } - - /** - * Wait for a device to be connected or disconnected. - * - * `adb wait-for-` - * - * @param device The device selector - * @param state The state to wait for - * @param options The options - * @returns A promise that resolves when the condition is met. - */ - async waitFor( + async #waitForUnchecked( device: AdbServerClient.DeviceSelector, state: "device" | "disconnect", options?: AdbServerClient.ServerConnectionOptions, @@ -513,6 +359,60 @@ export class AdbServerClient { } } + /** + * Wait for a device to be connected or disconnected. + * + * `adb wait-for-` + * + * @param device The device selector + * @param state The state to wait for + * @param options The options + * @returns A promise that resolves when the condition is met. + */ + async waitFor( + device: AdbServerClient.DeviceSelector, + state: "device" | "disconnect", + options?: AdbServerClient.ServerConnectionOptions, + ): Promise { + if (state === "disconnect") { + await this.validateVersion(41); + } + + return this.#waitForUnchecked(device, state, options); + } + + async waitForDisconnect( + transportId: bigint, + options?: AdbServerClient.ServerConnectionOptions, + ): Promise { + const serverVersion = await this.getVersion(); + if (serverVersion >= 41) { + return this.#waitForUnchecked( + { transportId }, + "disconnect", + options, + ); + } else { + const observer = await this.trackDevices(options); + return new Promise((resolve, reject) => { + observer.onDeviceRemove((devices) => { + if ( + devices.some( + (device) => device.transportId === transportId, + ) + ) { + observer.stop(); + resolve(); + } + }); + observer.onError((e) => { + observer.stop(); + reject(e); + }); + }); + } + } + /** * Creates an ADB Transport for the specified device. */ @@ -533,12 +433,26 @@ export class AdbServerClient { features, ); - return new AdbServerTransport( + const waitAbortController = new AbortController(); + const disconnected = this.waitForDisconnect(transportId, { + unref: true, + signal: waitAbortController.signal, + }); + + const transport = new AdbServerTransport( this, info?.serial ?? "", banner, transportId, + disconnected, ); + + transport.disconnected.then( + () => waitAbortController.abort(), + () => waitAbortController.abort(), + ); + + return transport; } } diff --git a/libraries/adb/src/server/index.ts b/libraries/adb/src/server/index.ts index b6e31c575..92c194f0b 100644 --- a/libraries/adb/src/server/index.ts +++ b/libraries/adb/src/server/index.ts @@ -1,2 +1,4 @@ export * from "./client.js"; +export * from "./observer.js"; +export * from "./stream.js"; export * from "./transport.js"; diff --git a/libraries/adb/src/server/observer.ts b/libraries/adb/src/server/observer.ts new file mode 100644 index 000000000..85dbc33b8 --- /dev/null +++ b/libraries/adb/src/server/observer.ts @@ -0,0 +1,144 @@ +import { EventEmitter } from "@yume-chan/event"; + +import { Ref } from "../utils/index.js"; + +import { AdbServerClient } from "./client.js"; +import type { AdbServerStream } from "./stream.js"; + +function unorderedRemove(array: T[], index: number) { + array[index] = array[array.length - 1]!; + array.length -= 1; +} + +export class AdbServerDeviceObserverOwner { + current: AdbServerClient.Device[] = []; + + #client: AdbServerClient; + #stream: Promise | undefined; + #observers: { + onDeviceAdd: EventEmitter; + onDeviceRemove: EventEmitter; + onListChange: EventEmitter; + onError: EventEmitter; + }[] = []; + + constructor(client: AdbServerClient) { + this.#client = client; + } + + async #receive(stream: AdbServerStream) { + try { + while (true) { + const response = await stream.readString(); + const next = AdbServerClient.parseDeviceList(response); + + const added: AdbServerClient.Device[] = []; + for (const nextDevice of next) { + const index = this.current.findIndex( + (device) => + device.transportId === nextDevice.transportId, + ); + if (index === -1) { + added.push(nextDevice); + continue; + } + + unorderedRemove(this.current, index); + } + + if (added.length) { + for (const observer of this.#observers) { + observer.onDeviceAdd.fire(added); + } + } + if (this.current.length) { + for (const observer of this.#observers) { + observer.onDeviceRemove.fire(this.current); + } + } + + this.current = next; + for (const observer of this.#observers) { + observer.onListChange.fire(this.current); + } + } + } catch (e) { + for (const observer of this.#observers) { + observer.onError.fire(e as Error); + } + } + } + + async #connect() { + const stream = await this.#client.createConnection( + "host:track-devices-l", + // Each individual observer will ref depending on their options + { unref: true }, + ); + + void this.#receive(stream); + + return stream; + } + + async #handleObserverStop(stream: AdbServerStream) { + if (this.#observers.length === 0) { + this.#stream = undefined; + await stream.dispose(); + } + } + + async createObserver( + options?: AdbServerClient.ServerConnectionOptions, + ): Promise { + if (options?.signal?.aborted) { + throw options.signal.reason; + } + + this.#stream ??= this.#connect(); + const stream = await this.#stream; + + if (options?.signal?.aborted) { + await this.#handleObserverStop(stream); + throw options.signal.reason; + } + + const onDeviceAdd = new EventEmitter(); + const onDeviceRemove = new EventEmitter(); + const onListChange = new EventEmitter(); + const onError = new EventEmitter(); + + const observer = { onDeviceAdd, onDeviceRemove, onListChange, onError }; + this.#observers.push(observer); + + const ref = new Ref(options); + + const stop = async () => { + const index = self.#observers.indexOf(observer); + if (index === -1) { + return; + } + + unorderedRemove(this.#observers, index); + + await this.#handleObserverStop(stream); + + ref.unref(); + }; + + options?.signal?.addEventListener("abort", () => void stop()); + + // eslint-disable-next-line @typescript-eslint/no-this-alias + const self = this; + return { + onDeviceAdd: onDeviceAdd.event, + onDeviceRemove: onDeviceRemove.event, + onListChange: onListChange.event, + onError: onError.event, + get current() { + return self.current; + }, + stop, + }; + } +} diff --git a/libraries/adb/src/server/stream.ts b/libraries/adb/src/server/stream.ts new file mode 100644 index 000000000..dd7877c22 --- /dev/null +++ b/libraries/adb/src/server/stream.ts @@ -0,0 +1,100 @@ +import type { MaybePromiseLike } from "@yume-chan/async"; +import type { WritableStreamDefaultWriter } from "@yume-chan/stream-extra"; +import { + BufferedReadableStream, + tryCancel, + tryClose, +} from "@yume-chan/stream-extra"; +import { + bipedal, + decodeUtf8, + encodeUtf8, + TextDecoder, +} from "@yume-chan/struct"; + +import { hexToNumber, sequenceEqual, write4HexDigits } from "../utils/index.js"; + +import type { AdbServerClient } from "./client.js"; + +const OKAY = encodeUtf8("OKAY"); +export const FAIL = encodeUtf8("FAIL"); + +export class AdbServerStream { + #connection: AdbServerClient.ServerConnection; + #buffered: BufferedReadableStream; + #writer: WritableStreamDefaultWriter; + + constructor(connection: AdbServerClient.ServerConnection) { + this.#connection = connection; + this.#buffered = new BufferedReadableStream(connection.readable); + this.#writer = connection.writable.getWriter(); + } + + readExactly(length: number): MaybePromiseLike { + return this.#buffered.readExactly(length); + } + + readString = bipedal(function* (this: AdbServerStream, then) { + const data = yield* then(this.readExactly(4)); + const length = hexToNumber(data); + if (length === 0) { + return ""; + } else { + const decoder = new TextDecoder(); + let result = ""; + const iterator = this.#buffered.iterateExactly(length); + while (true) { + const { done, value } = iterator.next(); + if (done) { + break; + } + result += decoder.decode(yield* then(value), { stream: true }); + } + result += decoder.decode(); + return result; + } + }); + + async readOkay(): Promise { + const response = await this.readExactly(4); + if (sequenceEqual(response, OKAY)) { + // `OKAY` is followed by data length and data + // But different services want to parse the data differently + // So don't read the data here + return; + } + + if (sequenceEqual(response, FAIL)) { + const reason = await this.readString(); + throw new Error(reason); + } + + throw new Error(`Unexpected response: ${decodeUtf8(response)}`); + } + + async writeString(value: string): Promise { + // TODO: investigate using `encodeUtf8("0000" + value)` then modifying the length + // That way allocates a new string (hopefully only a rope) instead of a new buffer + const encoded = encodeUtf8(value); + const buffer = new Uint8Array(4 + encoded.length); + write4HexDigits(buffer, 0, encoded.length); + buffer.set(encoded, 4); + await this.#writer.write(buffer); + } + + release() { + this.#writer.releaseLock(); + return { + readable: this.#buffered.release(), + writable: this.#connection.writable, + closed: this.#connection.closed, + close: () => this.#connection.close(), + }; + } + + async dispose() { + void tryCancel(this.#buffered); + void tryClose(this.#writer); + await this.#connection.close(); + } +} diff --git a/libraries/adb/src/server/transport.ts b/libraries/adb/src/server/transport.ts index c1125f85e..44ce1b511 100644 --- a/libraries/adb/src/server/transport.ts +++ b/libraries/adb/src/server/transport.ts @@ -1,6 +1,4 @@ -import type { MaybePromiseLike } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async"; -import { AbortController } from "@yume-chan/stream-extra"; import type { AdbIncomingSocketHandler, @@ -44,9 +42,13 @@ export class AdbServerTransport implements AdbTransport { readonly banner: AdbBanner; + #sockets: AdbSocket[] = []; + #closed = new PromiseResolver(); - #waitAbortController = new AbortController(); - readonly disconnected: Promise; + #disconnected: Promise; + get disconnected() { + return this.#disconnected; + } get clientFeatures() { // No need to get host features (features supported by ADB server) @@ -54,31 +56,29 @@ export class AdbServerTransport implements AdbTransport { return ADB_SERVER_DEFAULT_FEATURES; } + // eslint-disable-next-line @typescript-eslint/max-params constructor( client: AdbServerClient, serial: string, banner: AdbBanner, transportId: bigint, + disconnected: Promise, ) { this.#client = client; this.serial = serial; this.banner = banner; this.transportId = transportId; - this.disconnected = Promise.race([ - this.#closed.promise, - client.waitFor({ transportId }, "disconnect", { - signal: this.#waitAbortController.signal, - unref: true, - }), - ]); + this.#disconnected = Promise.race([this.#closed.promise, disconnected]); } async connect(service: string): Promise { - return await this.#client.createDeviceConnection( + const socket = await this.#client.createDeviceConnection( { transportId: this.transportId }, service, ); + this.#sockets.push(socket); + return socket; } async addReverseTunnel( @@ -96,8 +96,10 @@ export class AdbServerTransport implements AdbTransport { await this.#client.connector.clearReverseTunnels(); } - close(): MaybePromiseLike { + async close(): Promise { + for (const socket of this.#sockets) { + await socket.close(); + } this.#closed.resolve(); - this.#waitAbortController.abort(); } } diff --git a/libraries/adb/src/utils/index.ts b/libraries/adb/src/utils/index.ts index 45f66e797..ab289812a 100644 --- a/libraries/adb/src/utils/index.ts +++ b/libraries/adb/src/utils/index.ts @@ -3,4 +3,5 @@ export * from "./auto-reset-event.js"; export * from "./base64.js"; export * from "./hex.js"; export * from "./no-op.js"; +export * from "./ref.js"; export * from "./sequence-equal.js"; diff --git a/libraries/adb/src/utils/ref.ts b/libraries/adb/src/utils/ref.ts new file mode 100644 index 000000000..9860094f9 --- /dev/null +++ b/libraries/adb/src/utils/ref.ts @@ -0,0 +1,37 @@ +interface GlobalExtension { + setInterval: (callback: () => void, delay: number) => number; + clearInterval: (id: number) => void; +} + +const { setInterval, clearInterval } = globalThis as unknown as GlobalExtension; + +/** + * An object to keep current Node.js process alive even when no code is running. + * + * Does nothing in Web environments. + * + * Note that it does't have reference counting. Calling `unref` will + * remove the ref no matter how many times `ref` has been previously called, and vice versa. + * This is the same as how Node.js works. + */ +export class Ref { + #intervalId: number | undefined; + + constructor(options?: { unref?: boolean | undefined }) { + if (!options?.unref) { + this.ref(); + } + } + + ref() { + // `setInterval` can keep current Node.js alive, the delay value doesn't matter + this.#intervalId = setInterval(() => {}, 60 * 1000); + } + + unref() { + if (this.#intervalId) { + clearInterval(this.#intervalId); + this.#intervalId = undefined; + } + } +}