Skip to content

Commit

Permalink
next
Browse files Browse the repository at this point in the history
  • Loading branch information
arusakov committed Oct 13, 2024
1 parent d7eb956 commit f1df349
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 48 deletions.
59 changes: 44 additions & 15 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ export type WSDiscoveryOptions = {


export type ClientWithServer = {
[SID]: number
[CLNT]: number
[SRVR]: number
}

export type ClientFields =
| typeof SID
| typeof CHNL
| typeof SRVR


const assertTTL = (ttl?: number): void | never => {
if (ttl != null && ttl <= 0) {
Expand Down Expand Up @@ -132,6 +137,13 @@ export class WSDiscovery {
return clientId
}

async getClient(clientId: number, fields: ClientFields[] = [CHNL, SID, SRVR]) {
return this.redis.hmget(
this.getClientKey(clientId),
...fields,
)
}

/**
*
* @returns
Expand Down Expand Up @@ -227,42 +239,56 @@ export class WSDiscovery {
)
}

async getClientsByChannel(channel: string, size = 10): Promise<ClientWithServer[]> {
async getClientsByChannel(channel: string, batch = 100): Promise<ClientWithServer[]> {
type KeyAndKey = ['__key', string]
type AggregateAndCursorResponse = [[1, ...KeyAndKey[]], number]

let [result, cursor] = await this.redis.call(
let [aggregateResult, cursor] = await this.redis.call(
'FT.AGGREGATE',
this.indexClntChnl,
`@${CHNL}:{${channel}}`,
'LOAD', 1, '@__key',
'WITHCURSOR',
'COUNT', size,
'COUNT', batch,
) as AggregateAndCursorResponse

const keys: string[] = []

while (true) {
keys.push(...((result.slice(1) as KeyAndKey[]).map(([_key, key]) => key)))
keys.push(...((aggregateResult.slice(1) as KeyAndKey[]).map(([_key, key]) => key)))
if (!cursor) {
break
}

[result, cursor] = await this.redis.call(
[aggregateResult, cursor] = await this.redis.call(
'FT.CURSOR', 'READ', this.indexClntChnl, cursor
) as AggregateAndCursorResponse
}

const clients = await this.redis.pipeline(
const hgetResults = await this.redis.pipeline(
keys.map((k) => ['hget', k, SRVR]),
).exec()

return keys.map((k, index) => ({
sid: clients[index][0].
}))

if (!hgetResults) {
throw new Error('multiple hget error')
}

const results: ClientWithServer[] = []

for (const [index, key] of keys.entries()) {
const [err, serverId] = hgetResults[index]

if (err) {
continue
}

results.push({
[CLNT]: Number(key.substring(this.prefixClient.length)),
[SRVR]: Number(serverId),
})
}

return res2
return results
}

protected getClientKey(clientId: number) {
Expand All @@ -277,7 +303,6 @@ export class WSDiscovery {
return
}

console.log(result)
await sleep(500)
}
throw new Error(`can not take redis lock on key=${key}`)
Expand Down Expand Up @@ -313,8 +338,12 @@ export class WSDiscovery {
}

const migration = migrations[i]
await this.redis.call(migration[0], migration[1])
await this.redis.sadd(migrationsKey, migrationId)
// TODO there can be logical errors inside transaction!!
await this.redis
.multi()
.call(migration[0], migration[1])
.sadd(migrationsKey, migrationId)
.exec()
}
await this.unlock(lockKey, token)
}
Expand Down
95 changes: 89 additions & 6 deletions test/channel.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { deepEqual, equal, rejects } from 'assert/strict'
import { describe, it, before, after } from 'node:test'
import { deepEqual, rejects } from 'assert/strict'
import { describe, it, before, after, beforeEach, afterEach } from 'node:test'

import { CLNT, SRVR } from '../src/constants'

import { clearRedis, createRedis, WSDiscoveryForTests } from './utils'

Expand All @@ -21,14 +23,22 @@ describe('Channels', () => {
before(async () => {
await wsd.connect()

serverId1 = await wsd.registerServer(ip1)
serverId2 = await wsd.registerServer(ip2)
serverId1 = await wsd.registerServer(ip1, 300)
serverId2 = await wsd.registerServer(ip2, 300)
})

beforeEach(async () => {
clientId1 = await wsd.registerClient(serverId1, 1)
clientId2 = await wsd.registerClient(serverId1, 2)
clientId3 = await wsd.registerClient(serverId2, 1)
})

afterEach(async () => {
await wsd.deleteClient(clientId1)
await wsd.deleteClient(clientId2)
await wsd.deleteClient(clientId3)
})

after(async () => {
await clearRedis(redis, wsd.prefix)
await redis.quit()
Expand Down Expand Up @@ -87,14 +97,87 @@ describe('Channels', () => {
)
})

it('getClientsByChannel() return one', async () => {
await wsd.addChannel(clientId1, 'abc')

deepEqual(
await wsd.getClientsByChannel('xyz'),
[],
)
})

it('getClientsByChannel() one', async () => {
await wsd.addChannel(clientId1, 'abc')
await wsd.addChannel(clientId2, 'xyz')

deepEqual(
await wsd.getClientsByChannel('xyz'),
[{
[CLNT]: clientId2,
[SRVR]: serverId1,
}],
)
})

it('getClientsByChannel() return two', async () => {
await wsd.addChannel(clientId1, 'xyz')
await wsd.addChannel(clientId2, 'abc')
await wsd.addChannel(clientId3, 'xyz')

deepEqual(
await wsd.getClientsByChannel('xyz'),
[],
[
{
[CLNT]: clientId1,
[SRVR]: serverId1,
},
{
[CLNT]: clientId3,
[SRVR]: serverId2,
},
],
)
})

it('getClientsByChannel() with batch=1', async () => {
await wsd.addChannel(clientId1, 'xyz')
await wsd.addChannel(clientId3, 'xyz')

deepEqual(
await wsd.getClientsByChannel('xyz'),
[
{
[CLNT]: clientId1,
[SRVR]: serverId1,
},
{
[CLNT]: clientId3,
[SRVR]: serverId2,
},
],
)
})


it('getClientsByChannel() multiple channels', async () => {
await wsd.addChannel(clientId1, 'xyz')
await wsd.addChannel(clientId1, 'abc')
await wsd.addChannel(clientId1, '123')

await wsd.addChannel(clientId3, 'qwerty')
await wsd.addChannel(clientId3, 'xyz')

deepEqual(
await wsd.getClientsByChannel('xyz'),
[
{
[CLNT]: clientId1,
[SRVR]: serverId1,
},
{
[CLNT]: clientId3,
[SRVR]: serverId2,
},
],
)
})
})
31 changes: 16 additions & 15 deletions test/client.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { strictEqual, rejects } from 'assert'
import { equal, rejects } from 'assert/strict'
import { describe, it, before, after } from 'node:test'

import { clearRedis, createRedis, sleep, WSDiscoveryForTests } from './utils'
import { MAX_INT_ID } from '../src/constants'

import { clearRedis, createRedis, sleep, WSDiscoveryForTests } from './utils'

describe('Client', () => {
const redis = createRedis()
const wsd = new WSDiscoveryForTests({
Expand All @@ -30,17 +31,17 @@ describe('Client', () => {

it('registerClient() OK', async () => {
const cid = await wsd.registerClient(serverId1, 1)
strictEqual(typeof cid, 'number')
equal(typeof cid, 'number')

const serverId = await wsd.getServerIdByClientId(cid)
strictEqual(serverId, serverId1)
equal(serverId, serverId1)
})

it('registerClient() twice', async () => {
const cid1 = await wsd.registerClient(serverId1, 1)
const cid2 = await wsd.registerClient(serverId2, 2)

strictEqual(cid1 + 1, cid2)
equal(cid1 + 1, cid2)
})


Expand All @@ -55,27 +56,27 @@ describe('Client', () => {
await wsd.registerClient(serverId1, 1)

const newId = await wsd.registerClient(serverId2, 2)
strictEqual(newId, 1)
equal(newId, 1)
})

it('updateClientTTL()', async () => {
const cid = await wsd.registerClient(serverId1, 11)

const result = await wsd.updateClientTTL(cid, 1000)
strictEqual(result, true)
equal(result, true)

const ttl = await wsd.getClientTTL(cid)

strictEqual(ttl > 1000 * 0.99, true)
strictEqual(ttl <= 1000, true)
equal(ttl > 1000 * 0.99, true)
equal(ttl <= 1000, true)
})

it('updateClientTTL() expired', async () => {
const cid = 999

const result = await wsd.updateClientTTL(cid, 10)

strictEqual(result, false)
equal(result, false)
})

it('updateClientTTL() bad ttl', async () => {
Expand All @@ -90,17 +91,17 @@ describe('Client', () => {

it('client ttl expires', async () => {
const cid = await wsd.registerClient(serverId1, 1, 1)
strictEqual(await wsd.getServerIdByClientId(cid), serverId1)
equal(await wsd.getServerIdByClientId(cid), serverId1)

await sleep(1000)
strictEqual(await wsd.getServerIdByClientId(cid), 0)
equal(await wsd.getServerIdByClientId(cid), 0)
})

it('delete client', async () => {
const cid = await wsd.registerClient(serverId2, 2, 2)

strictEqual(await wsd.getServerIdByClientId(cid), serverId2)
strictEqual(await wsd.deleteClient(cid), true)
strictEqual(await wsd.getServerIdByClientId(cid), 0)
equal(await wsd.getServerIdByClientId(cid), serverId2)
equal(await wsd.deleteClient(cid), true)
equal(await wsd.getServerIdByClientId(cid), 0)
})
})
Loading

0 comments on commit f1df349

Please sign in to comment.