Skip to content

Commit

Permalink
feat: add index for channels (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
arusakov authored Oct 13, 2024
1 parent 61f24ad commit cdd33f5
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 57 deletions.
3 changes: 1 addition & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

services:
redis:
# image: redis/redis-stack:7.2.0-v12
image: redis/redis-stack:7.2.0-v12
image: redis/redis-stack:7.2.0-v13
ports:
- "127.0.0.1:6379:6379"
environment:
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
},
"scripts": {
"build": "tsc -p .",
"compile": "yarn build --noEmit",
"compile": "tsc --noEmit -p ./test",
"test:all": "yarn test ./test/*.test.ts",
"test:coverage": "c8 --reporter=lcovonly --reporter=text yarn test:all",
"test": "node --test --test-concurrency=1 --require=ts-node/register"
Expand Down
4 changes: 3 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ export const SID = 'sid'
export const ID = 'id'
export const CHNL = 'chnl'

export const MAX_INT_ID = 2 ** 30
export const MAX_INT_ID = 2 ** 30

export const __MIGRATIONS = '__migrations'
167 changes: 150 additions & 17 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Redis } from 'ioredis'
import { CHNL, CLNT, ID, IP, MAX_INT_ID, SID, SRVR, TTL_DEFAULT } from './constants'
import type { Callback, Redis, Result } from 'ioredis'
import { __MIGRATIONS, CHNL, CLNT, ID, IP, MAX_INT_ID, SID, SRVR, TTL_DEFAULT } from './constants'
import { sleep } from './utils'

export type WSDiscoveryOptions = {
redis: Redis
Expand All @@ -11,6 +12,16 @@ export type WSDiscoveryOptions = {
}


export type ClientWithServer = {
[CLNT]: number
[SRVR]: number
}

export type ClientFields =
| typeof SID
| typeof CHNL
| typeof SRVR


const assertTTL = (ttl?: number): void | never => {
if (ttl != null && ttl <= 0) {
Expand All @@ -24,37 +35,51 @@ const assertChannel = (channel: string): void | never => {
}
}

export class WSDiscovery {
// Add declarations
declare module "ioredis" {
interface RedisCommander<Context> {
myecho(
key: string,
argv: string,
callback?: Callback<string>
): Result<string, Context>;
}
}

export class WSDiscovery {
readonly prefix: string
readonly prefixServer: string
readonly prefixClient: string
readonly ttlServer: number
readonly ttlClient: number

readonly indexClntChnl: string

protected readonly redis: Redis

protected readonly ttlServer: number
protected readonly ttlClient: number

protected readonly prefixServer: string
protected readonly prefixClient: string

constructor({
redis,
ttl = TTL_DEFAULT,
prefix = 'wsd',
}: WSDiscoveryOptions) {
this.redis = redis

assertTTL(ttl.server)
this.ttlServer = ttl.server || TTL_DEFAULT.server
assertTTL(ttl.client)
this.ttlClient = ttl.client || TTL_DEFAULT.client

this.prefixServer = `${prefix}:${SRVR}:`
this.prefixClient = `${prefix}:${CLNT}:`
this.prefix = `${prefix}:`
this.prefixServer = `${this.prefix}${SRVR}:`
this.prefixClient = `${this.prefix}${CLNT}:`

this.indexClntChnl = `${this.prefix}__idx_${CLNT}_${CHNL}`
}

async connect() {
await this.redis.ping()

const list = await this.listIndexes()
// TODO load lua to redis
await this.migrate()
}

async registerServer(serverIp: string, ttl?: number) {
Expand Down Expand Up @@ -112,6 +137,13 @@ export class WSDiscovery {
return clientId
}

async getClient(clientId: number, fields: ClientFields[] = [CHNL, SID, SRVR]) {
return this.redis.hmget(
this.getClientKey(clientId),
...fields,
)
}

/**
*
* @returns
Expand Down Expand Up @@ -174,7 +206,7 @@ export class WSDiscovery {
script,
1,
[this.getClientKey(clientId), CHNL, channel],
)
) as Promise<string[]>
}

async removeChannel(clientId: number, channel: string) {
Expand Down Expand Up @@ -207,12 +239,113 @@ export class WSDiscovery {
)
}

async getClientsByChannel(channel: string, batch = 100): Promise<ClientWithServer[]> {
type KeyAndKey = ['__key', string]
type AggregateAndCursorResponse = [[1, ...KeyAndKey[]], number]

let [aggregateResult, cursor] = await this.redis.call(
'FT.AGGREGATE',
this.indexClntChnl,
`@${CHNL}:{${channel}}`,
'LOAD', 1, '@__key',
'WITHCURSOR',
'COUNT', batch,
) as AggregateAndCursorResponse

const keys: string[] = []

while (true) {
keys.push(...((aggregateResult.slice(1) as KeyAndKey[]).map(([_key, key]) => key)))
if (!cursor) {
break
}

[aggregateResult, cursor] = await this.redis.call(
'FT.CURSOR', 'READ', this.indexClntChnl, cursor
) as AggregateAndCursorResponse
}

const hgetResults = await this.redis.pipeline(
keys.map((k) => ['hget', k, SRVR]),
).exec()

if (!hgetResults) {
throw new Error('multiple hget error')
}

const results: ClientWithServer[] = []

for (const [index, key] of keys.entries()) {
const [err, serverId] = hgetResults[index]

if (err) {
continue
}

results.push({
[CLNT]: Number(key.substring(this.prefixClient.length)),
[SRVR]: Number(serverId),
})
}

return results
}

protected getClientKey(clientId: number) {
return this.prefixClient + clientId
}

protected listIndexes() {
return this.redis.call('FT._LIST')
protected async lock(key: string, token: string) {

for (let i = 0; i < 1000; ++i) {
const result = await this.redis.set(key, token, 'EX', 30, 'NX')
if (result) {
return
}

await sleep(500)
}
throw new Error(`can not take redis lock on key=${key}`)
}

protected async unlock(key: string, token: string) {
await this.redis.eval(`
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
`, 1, key, token)
}

protected async migrate() {
const migrations: Array<[string, string[]]> = [
['FT.CREATE', `${this.indexClntChnl} PREFIX 1 ${this.prefixClient} SCHEMA ${CHNL} TAG`.split(' ')],
]

const token = `${Date.now()}_${Math.floor(Math.random() * 99999999)}`
const migrationsKey = this.prefix + __MIGRATIONS
const lockKey = migrationsKey + '_lock'

await this.lock(lockKey, token)

const applyedMigrations = new Set(await this.redis.smembers(migrationsKey))

for (let i = 0; i < migrations.length; ++i) {
const migrationId = 'm' + i
if (applyedMigrations.has(migrationId)) {
continue
}

const migration = migrations[i]
// TODO there can be logical errors inside transaction!!
await this.redis
.multi()
.call(migration[0], migration[1])
.sadd(migrationsKey, migrationId)
.exec()
}
await this.unlock(lockKey, token)
}

}
3 changes: 3 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export const sleep = (ms: number) => new Promise((resolve) => {
setTimeout(resolve, ms)
})
Loading

0 comments on commit cdd33f5

Please sign in to comment.