Skip to content

Commit

Permalink
next
Browse files Browse the repository at this point in the history
  • Loading branch information
arusakov committed Oct 9, 2024
1 parent 4d4d646 commit d7eb956
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ export type WSDiscoveryOptions = {
}


export type ClientWithServer = {
[SID]: number
[SRVR]: number
}


const assertTTL = (ttl?: number): void | never => {
if (ttl != null && ttl <= 0) {
Expand Down Expand Up @@ -64,7 +69,7 @@ export class WSDiscovery {
this.prefixServer = `${this.prefix}${SRVR}:`
this.prefixClient = `${this.prefix}${CLNT}:`

this.indexClntChnl = `${this.prefix}__index_${CLNT}_${CHNL}`
this.indexClntChnl = `${this.prefix}__idx_${CLNT}_${CHNL}`
}

async connect() {
Expand Down Expand Up @@ -222,24 +227,42 @@ export class WSDiscovery {
)
}

async getClientsByChannel(channel: string, size = 100) {
const clients: string[] = []
async getClientsByChannel(channel: string, size = 10): Promise<ClientWithServer[]> {
type KeyAndKey = ['__key', string]
type AggregateAndCursorResponse = [[1, ...KeyAndKey[]], number]

const result = await this.redis.call(
let [result, cursor] = await this.redis.call(
'FT.AGGREGATE',
this.indexClntChnl,
`@${CHNL}:{${channel}}`,
'LOAD', 1, '@__key',
'WITHCURSOR',
'COUNT', size,
) as [[number, string[]], number]
) as AggregateAndCursorResponse

clients.push(...result[0][1])
const keys: string[] = []

while (true) {
keys.push(...((result.slice(1) as KeyAndKey[]).map(([_key, key]) => key)))
if (!cursor) {
break
}

const res2 = await this.redis.call(
'FT.'
)
[result, cursor] = await this.redis.call(
'FT.CURSOR', 'READ', this.indexClntChnl, cursor
) as AggregateAndCursorResponse
}

const clients = await this.redis.pipeline(
keys.map((k) => ['hget', k, SRVR]),
).exec()

return keys.map((k, index) => ({
sid: clients[index][0].
}))


return clients
return res2
}

protected getClientKey(clientId: number) {
Expand Down

0 comments on commit d7eb956

Please sign in to comment.