diff --git a/src/index.ts b/src/index.ts index a727ec6..3eb80b9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,6 +12,11 @@ export type WSDiscoveryOptions = { } +export type ClientWithServer = { + [SID]: number + [SRVR]: number +} + const assertTTL = (ttl?: number): void | never => { if (ttl != null && ttl <= 0) { @@ -64,7 +69,7 @@ export class WSDiscovery { this.prefixServer = `${this.prefix}${SRVR}:` this.prefixClient = `${this.prefix}${CLNT}:` - this.indexClntChnl = `${this.prefix}__index_${CLNT}_${CHNL}` + this.indexClntChnl = `${this.prefix}__idx_${CLNT}_${CHNL}` } async connect() { @@ -222,24 +227,42 @@ export class WSDiscovery { ) } - async getClientsByChannel(channel: string, size = 100) { - const clients: string[] = [] + async getClientsByChannel(channel: string, size = 10): Promise { + type KeyAndKey = ['__key', string] + type AggregateAndCursorResponse = [[1, ...KeyAndKey[]], number] - const result = await this.redis.call( + let [result, cursor] = await this.redis.call( 'FT.AGGREGATE', this.indexClntChnl, `@${CHNL}:{${channel}}`, + 'LOAD', 1, '@__key', 'WITHCURSOR', 'COUNT', size, - ) as [[number, string[]], number] + ) as AggregateAndCursorResponse - clients.push(...result[0][1]) + const keys: string[] = [] + + while (true) { + keys.push(...((result.slice(1) as KeyAndKey[]).map(([_key, key]) => key))) + if (!cursor) { + break + } - const res2 = await this.redis.call( - 'FT.' - ) + [result, cursor] = await this.redis.call( + 'FT.CURSOR', 'READ', this.indexClntChnl, cursor + ) as AggregateAndCursorResponse + } + + const clients = await this.redis.pipeline( + keys.map((k) => ['hget', k, SRVR]), + ).exec() + + return keys.map((k, index) => ({ + sid: clients[index][0]. + })) + - return clients + return res2 } protected getClientKey(clientId: number) {