diff --git a/src/constants.ts b/src/constants.ts index b4ec85e..22feb26 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -13,4 +13,5 @@ export const LUA = 'lua' export const MAX_INT_ID = 2 ** 30 -export const __MIGRATIONS = '__migrations' \ No newline at end of file +export const __MIGRATIONS = '__migrations' +export const _LOCK = '_lock' \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 23cdf1b..93a7b34 100644 --- a/src/index.ts +++ b/src/index.ts @@ -94,30 +94,7 @@ export class WSDiscovery { } async connect() { - type ScriptData = { - keys: number - readOnly: boolean - } - const scripts: Record = { - [CustomScripts.CHANNEL_ADD]: { - keys: 1, - readOnly: false, - }, - [CustomScripts.CHANNEL_REMOVE]: { - keys: 1, - readOnly: false, - } - } - - for (const [scriptName, scriptData] of Object.entries(scripts)) { - this.redis.defineCommand(scriptName, { - lua: await readFile(resolve(__dirname, '..', LUA, `${scriptName}.${LUA}`), 'utf8'), - numberOfKeys: scriptData.keys, - readOnly: scriptData.readOnly, - }) - } - - await this.redis.ping() + await this.defineCommands() await this.migrate() } @@ -273,15 +250,23 @@ export class WSDiscovery { return this.prefixClient + clientId } - protected async lock(key: string, token: string) { + protected getMigrationsKey() { + return this.prefix + '__migrations' + } - for (let i = 0; i < 1000; ++i) { - const result = await this.redis.set(key, token, 'EX', 30, 'NX') + protected getMigrationsLockKey() { + return this.getMigrationsKey() + '_lock' + } + + protected async lock(key: string, token: string, ttl: number) { + + for (let i = 0; i < 60; ++i) { + const result = await this.redis.set(key, token, 'EX', ttl, 'NX') if (result) { return } - await sleep(500) + await sleep(1000) } throw new Error(`can not take redis lock on key=${key}`) } @@ -296,17 +281,42 @@ export class WSDiscovery { `, 1, key, token) } + protected async defineCommands() { + type ScriptData = { + keys: number + readOnly: boolean + } + const scripts: Record = { + [CustomScripts.CHANNEL_ADD]: { + keys: 1, + readOnly: false, + }, + [CustomScripts.CHANNEL_REMOVE]: { + keys: 1, + readOnly: false, + } + } + + for (const [scriptName, scriptData] of Object.entries(scripts)) { + this.redis.defineCommand(scriptName, { + lua: await readFile(resolve(__dirname, '..', LUA, `${scriptName}.${LUA}`), 'utf8'), + numberOfKeys: scriptData.keys, + readOnly: scriptData.readOnly, + }) + } + } + protected async migrate() { const migrations: Array<[string, string[]]> = [ ['FT.CREATE', `${this.indexClntChnl} PREFIX 1 ${this.prefixClient} SCHEMA ${CHNL} TAG`.split(' ')], ] const token = `${Date.now()}_${Math.floor(Math.random() * 99999999)}` - const migrationsKey = this.prefix + __MIGRATIONS - const lockKey = migrationsKey + '_lock' - - await this.lock(lockKey, token) - + const lockKey = this.getMigrationsLockKey() + + await this.lock(lockKey, token, 30) + + const migrationsKey = this.getMigrationsKey() for (let i = 0; i < migrations.length; ++i) { const migrationId = 'm' + i const applied = await this.redis.sismember(migrationsKey, migrationId) diff --git a/test/connect.test.ts b/test/connect.test.ts new file mode 100644 index 0000000..78ec595 --- /dev/null +++ b/test/connect.test.ts @@ -0,0 +1,25 @@ +import { equal, rejects } from 'assert/strict' +import { describe, it, before, after } from 'node:test' + + +import { clearRedis, createRedis, sleep, WSDiscoveryForTests } from './utils' +import { __MIGRATIONS } from '../src/constants' + +describe('connect', () => { + const redis = createRedis() + const wsd = new WSDiscoveryForTests({ + redis, + }) + + + after(async () => { + await clearRedis(redis, wsd.prefix) + await redis.quit() + }) + + it('lock is already taken', async () => { + await wsd.lockForTests(1) + await wsd.connect() + }) + +}) diff --git a/test/utils.ts b/test/utils.ts index a3f363d..3dc1133 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -1,7 +1,7 @@ import { Redis } from "ioredis" import { WSDiscovery } from "../src" -import { ID } from "../src/constants" +import { __MIGRATIONS, ID } from "../src/constants" export const createRedis = () => { return new Redis({ @@ -20,8 +20,9 @@ export const clearRedis = async (redis: Redis, prefix: string) => { do { const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', prefix + '*') - if (keys.length) { - await redis.del(...(keys.filter((k) => !k.startsWith(prefix + '__')))) + const keysForDelete = keys.filter((k) => !k.startsWith(prefix + '__')) + if (keysForDelete.length) { + await redis.del(...keysForDelete) } cursor = nextCursor } while (cursor !== '0') @@ -43,4 +44,8 @@ export class WSDiscoveryForTests extends WSDiscovery { getClientTTL(id: number) { return this.redis.ttl(this.getClientKey(id)) } + + lockForTests(ttl: number) { + return this.lock(this.getMigrationsLockKey(), 'test', ttl) + } } \ No newline at end of file