Skip to content

Commit

Permalink
feat: update locking code
Browse files Browse the repository at this point in the history
  • Loading branch information
arusakov committed Oct 13, 2024
1 parent 589f251 commit b43093d
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 37 deletions.
3 changes: 2 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ export const LUA = 'lua'

export const MAX_INT_ID = 2 ** 30

export const __MIGRATIONS = '__migrations'
export const __MIGRATIONS = '__migrations'
export const _LOCK = '_lock'
76 changes: 43 additions & 33 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,30 +94,7 @@ export class WSDiscovery {
}

async connect() {
type ScriptData = {
keys: number
readOnly: boolean
}
const scripts: Record<CustomScripts, ScriptData> = {
[CustomScripts.CHANNEL_ADD]: {
keys: 1,
readOnly: false,
},
[CustomScripts.CHANNEL_REMOVE]: {
keys: 1,
readOnly: false,
}
}

for (const [scriptName, scriptData] of Object.entries(scripts)) {
this.redis.defineCommand(scriptName, {
lua: await readFile(resolve(__dirname, '..', LUA, `${scriptName}.${LUA}`), 'utf8'),
numberOfKeys: scriptData.keys,
readOnly: scriptData.readOnly,
})
}

await this.redis.ping()
await this.defineCommands()
await this.migrate()
}

Expand Down Expand Up @@ -273,15 +250,23 @@ export class WSDiscovery {
return this.prefixClient + clientId
}

protected async lock(key: string, token: string) {
protected getMigrationsKey() {
return this.prefix + '__migrations'
}

for (let i = 0; i < 1000; ++i) {
const result = await this.redis.set(key, token, 'EX', 30, 'NX')
protected getMigrationsLockKey() {
return this.getMigrationsKey() + '_lock'
}

protected async lock(key: string, token: string, ttl: number) {

for (let i = 0; i < 60; ++i) {
const result = await this.redis.set(key, token, 'EX', ttl, 'NX')
if (result) {
return
}

await sleep(500)
await sleep(1000)
}
throw new Error(`can not take redis lock on key=${key}`)
}
Expand All @@ -296,17 +281,42 @@ export class WSDiscovery {
`, 1, key, token)
}

protected async defineCommands() {
type ScriptData = {
keys: number
readOnly: boolean
}
const scripts: Record<CustomScripts, ScriptData> = {
[CustomScripts.CHANNEL_ADD]: {
keys: 1,
readOnly: false,
},
[CustomScripts.CHANNEL_REMOVE]: {
keys: 1,
readOnly: false,
}
}

for (const [scriptName, scriptData] of Object.entries(scripts)) {
this.redis.defineCommand(scriptName, {
lua: await readFile(resolve(__dirname, '..', LUA, `${scriptName}.${LUA}`), 'utf8'),
numberOfKeys: scriptData.keys,
readOnly: scriptData.readOnly,
})
}
}

protected async migrate() {
const migrations: Array<[string, string[]]> = [
['FT.CREATE', `${this.indexClntChnl} PREFIX 1 ${this.prefixClient} SCHEMA ${CHNL} TAG`.split(' ')],
]

const token = `${Date.now()}_${Math.floor(Math.random() * 99999999)}`
const migrationsKey = this.prefix + __MIGRATIONS
const lockKey = migrationsKey + '_lock'

await this.lock(lockKey, token)

const lockKey = this.getMigrationsLockKey()

await this.lock(lockKey, token, 30)

const migrationsKey = this.getMigrationsKey()
for (let i = 0; i < migrations.length; ++i) {
const migrationId = 'm' + i
const applied = await this.redis.sismember(migrationsKey, migrationId)
Expand Down
25 changes: 25 additions & 0 deletions test/connect.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { equal, rejects } from 'assert/strict'
import { describe, it, before, after } from 'node:test'


import { clearRedis, createRedis, sleep, WSDiscoveryForTests } from './utils'
import { __MIGRATIONS } from '../src/constants'

describe('connect', () => {
const redis = createRedis()
const wsd = new WSDiscoveryForTests({
redis,
})


after(async () => {
await clearRedis(redis, wsd.prefix)
await redis.quit()
})

it('lock is already taken', async () => {
await wsd.lockForTests(1)
await wsd.connect()
})

})
11 changes: 8 additions & 3 deletions test/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Redis } from "ioredis"

import { WSDiscovery } from "../src"
import { ID } from "../src/constants"
import { __MIGRATIONS, ID } from "../src/constants"

export const createRedis = () => {
return new Redis({
Expand All @@ -20,8 +20,9 @@ export const clearRedis = async (redis: Redis, prefix: string) => {
do {
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', prefix + '*')

if (keys.length) {
await redis.del(...(keys.filter((k) => !k.startsWith(prefix + '__'))))
const keysForDelete = keys.filter((k) => !k.startsWith(prefix + '__'))
if (keysForDelete.length) {
await redis.del(...keysForDelete)
}
cursor = nextCursor
} while (cursor !== '0')
Expand All @@ -43,4 +44,8 @@ export class WSDiscoveryForTests extends WSDiscovery {
getClientTTL(id: number) {
return this.redis.ttl(this.getClientKey(id))
}

lockForTests(ttl: number) {
return this.lock(this.getMigrationsLockKey(), 'test', ttl)
}
}

0 comments on commit b43093d

Please sign in to comment.