From 7dd89a08c27c769089e6ede5fd8c13b92bac238d Mon Sep 17 00:00:00 2001 From: Harminder Virk Date: Fri, 28 Jul 2023 12:18:01 +0530 Subject: [PATCH] feat: add ability to run commands on the RedisManager class --- src/connections/abstract_connection.ts | 8 + src/connections/io_methods.ts | 679 ++++++++++++++++++-- src/connections/redis_cluster_connection.ts | 12 +- src/connections/redis_connection.ts | 23 +- src/redis_manager.ts | 14 +- src/types/main.ts | 26 +- tests/redis_manager.spec.ts | 17 + 7 files changed, 710 insertions(+), 69 deletions(-) diff --git a/src/connections/abstract_connection.ts b/src/connections/abstract_connection.ts index 96b53a7..4eafa73 100644 --- a/src/connections/abstract_connection.ts +++ b/src/connections/abstract_connection.ts @@ -73,6 +73,14 @@ export abstract class AbstractConnection extends Even return this.ioSubscriberConnection?.status } + /** + * Get the number of commands queued in automatic pipelines. + * This is not available (and returns 0) until the cluster is connected and slots information have been received. + */ + get autoPipelineQueueSize() { + return this.ioConnection.autoPipelineQueueSize + } + /** * Parent class must implement makeSubscriberConnection */ diff --git a/src/connections/io_methods.ts b/src/connections/io_methods.ts index b22cbf4..7000a44 100644 --- a/src/connections/io_methods.ts +++ b/src/connections/io_methods.ts @@ -7,50 +7,647 @@ * file that was distributed with this source code. */ -import { Redis } from 'ioredis' +import type { Redis, Cluster } from 'ioredis' /** - * Returns all method names for a given class + * Base methods are shared by a regular Redis connection and + * the cluster connection. + * https://redis.github.io/ioredis/classes/Redis.html */ -function getAllMethodNames(obj: any) { - let methods = new Set() - while ((obj = Reflect.getPrototypeOf(obj))) { - let keys = Reflect.ownKeys(obj) - keys.forEach((k) => methods.add(k)) - } - return [...methods] as string[] -} - -const ignoredMethods = [ - 'constructor', - 'status', - 'connect', - 'disconnect', - 'duplicate', - 'quit', - '__defineGetter__', - '__defineSetter__', - 'hasOwnProperty', - '__lookupGetter__', - '__lookupSetter__', - 'isPrototypeOf', - 'propertyIsEnumerable', - 'toString', - 'valueOf', - '__proto__', - 'defineCommand', - 'toLocaleString', - // PubSub methods - 'subscribe', - 'unsubscribe', - 'psubscribe', - 'punsubscribe', - 'publish', -] +export const baseMethods = [ + 'acl', + 'aclBuffer', + 'addBuiltinCommand', + 'append', + 'asking', + 'auth', + 'bgrewriteaof', + 'bgrewriteaofBuffer', + 'bgsave', + 'bitcount', + 'bitfield', + 'bitfield_ro', + 'bitop', + 'bitpos', + 'blmove', + 'blmoveBuffer', + 'blmpop', + 'blmpopBuffer', + 'blpop', + 'blpopBuffer', + 'brpop', + 'brpopBuffer', + 'brpoplpush', + 'brpoplpushBuffer', + 'bzmpop', + 'bzpopmax', + 'bzpopmaxBuffer', + 'bzpopmin', + 'bzpopminBuffer', + 'call', + 'callBuffer', + 'client', + 'clientBuffer', + 'cluster', + 'command', + 'config', + 'copy', + 'createBuiltinCommand', + 'dbsize', + 'debug', + 'decr', + 'decrby', + 'del', + 'discard', + 'dump', + 'dumpBuffer', + 'echo', + 'echoBuffer', + 'eval', + 'eval_ro', + 'evalsha', + 'evalsha_ro', + 'exec', + 'exists', + 'expire', + 'expireat', + 'expiretime', + 'failover', + 'fcall', + 'fcall_ro', + 'flushall', + 'flushdb', + 'function', + 'functionBuffer', + 'geoadd', + 'geodist', + 'geodistBuffer', + 'geohash', + 'geohashBuffer', + 'geopos', + 'georadius', + 'georadius_ro', + 'georadiusbymember', + 'georadiusbymember_ro', + 'geosearch', + 'geosearchstore', + 'get', + 'getBuffer', + 'getBuiltinCommands', + 'getbit', + 'getdel', + 'getdelBuffer', + 'getex', + 'getexBuffer', + 'getrange', + 'getrangeBuffer', + 'getset', + 'getsetBuffer', + 'hdel', + 'hello', + 'hexists', + 'hget', + 'hgetBuffer', + 'hgetall', + 'hgetallBuffer', + 'hincrby', + 'hincrbyfloat', + 'hincrbyfloatBuffer', + 'hkeys', + 'hkeysBuffer', + 'hlen', + 'hmget', + 'hmgetBuffer', + 'hmset', + 'hrandfield', + 'hrandfieldBuffer', + 'hscan', + 'hscanBuffer', + 'hscanBufferStream', + 'hscanStream', + 'hset', + 'hsetnx', + 'hstrlen', + 'hvals', + 'hvalsBuffer', + 'incr', + 'incrby', + 'incrbyfloat', + 'info', + 'keys', + 'keysBuffer', + 'lastsave', + 'latency', + 'lcs', + 'lindex', + 'lindexBuffer', + 'linsert', + 'llen', + 'lmove', + 'lmoveBuffer', + 'lmpop', + 'lmpopBuffer', + 'lolwut', + 'lpop', + 'lpopBuffer', + 'lpos', + 'lpush', + 'lpushx', + 'lrange', + 'lrangeBuffer', + 'lrem', + 'lset', + 'ltrim', + 'memory', + 'mget', + 'mgetBuffer', + 'migrate', + 'module', + 'move', + 'mset', + 'msetnx', + 'multi', + 'object', + 'persist', + 'pexpire', + 'pexpireat', + 'pexpiretime', + 'pfadd', + 'pfcount', + 'pfdebug', + 'pfmerge', + 'pfselftest', + 'ping', + 'pingBuffer', + 'pipeline', + 'psetex', + 'psync', + 'pttl', + 'pubsub', + 'randomkey', + 'randomkeyBuffer', + 'readonly', + 'readwrite', + 'rename', + 'renamenx', + 'replconf', + 'replicaof', + 'reset', + 'restore', + 'restore-asking', + 'role', + 'rpop', + 'rpopBuffer', + 'rpoplpush', + 'rpoplpushBuffer', + 'rpush', + 'rpushx', + 'sadd', + 'save', + 'scan', + 'scanBuffer', + 'scard', + 'script', + 'sdiff', + 'sdiffBuffer', + 'sdiffstore', + 'select', + 'set', + 'setBuffer', + 'setbit', + 'setex', + 'setnx', + 'setrange', + 'shutdown', + 'sinter', + 'sinterBuffer', + 'sintercard', + 'sinterstore', + 'sismember', + 'slaveof', + 'slowlog', + 'smembers', + 'smembersBuffer', + 'smismember', + 'smove', + 'sort', + 'sort_ro', + 'spop', + 'spopBuffer', + 'spublish', + 'srandmember', + 'srandmemberBuffer', + 'srem', + 'sscan', + 'sscanBuffer', + 'sscanBufferStream', + 'sscanStream', + 'strlen', + 'substr', + 'sunion', + 'sunionBuffer', + 'sunionstore', + 'swapdb', + 'sync', + 'time', + 'touch', + 'ttl', + 'type', + 'unlink', + 'unwatch', + 'wait', + 'watch', + 'xack', + 'xadd', + 'xaddBuffer', + 'xautoclaim', + 'xclaim', + 'xdel', + 'xgroup', + 'xinfo', + 'xlen', + 'xpending', + 'xrange', + 'xrangeBuffer', + 'xread', + 'xreadBuffer', + 'xreadgroup', + 'xrevrange', + 'xrevrangeBuffer', + 'xsetid', + 'xtrim', + 'zadd', + 'zaddBuffer', + 'zcard', + 'zcount', + 'zdiff', + 'zdiffBuffer', + 'zdiffstore', + 'zincrby', + 'zincrbyBuffer', + 'zinter', + 'zinterBuffer', + 'zintercard', + 'zinterstore', + 'zlexcount', + 'zmpop', + 'zmscore', + 'zmscoreBuffer', + 'zpopmax', + 'zpopmaxBuffer', + 'zpopmin', + 'zpopminBuffer', + 'zrandmember', + 'zrandmemberBuffer', + 'zrange', + 'zrangeBuffer', + 'zrangebylex', + 'zrangebylexBuffer', + 'zrangebyscore', + 'zrangebyscoreBuffer', + 'zrangestore', + 'zrank', + 'zrem', + 'zremrangebylex', + 'zremrangebyrank', + 'zremrangebyscore', + 'zrevrange', + 'zrevrangeBuffer', + 'zrevrangebylex', + 'zrevrangebylexBuffer', + 'zrevrangebyscore', + 'zrevrangebyscoreBuffer', + 'zrevrank', + 'zscan', + 'zscanBuffer', + 'zscanBufferStream', + 'zscanStream', + 'zscore', + 'zscoreBuffer', + 'zunion', + 'zunionBuffer', + 'zunionstore', +] satisfies (keyof Cluster)[] /** - * List of methods on Redis class + * Methods available on a non-cluster Redis + * connection */ -export const ioMethods = getAllMethodNames(Redis.prototype).filter( - (method) => !ignoredMethods.includes(method) -) as string[] +export const redisMethods = [ + 'acl', + 'aclBuffer', + 'addBuiltinCommand', + 'append', + 'asking', + 'auth', + 'bgrewriteaof', + 'bgrewriteaofBuffer', + 'bgsave', + 'bitcount', + 'bitfield', + 'bitfield_ro', + 'bitop', + 'bitpos', + 'blmove', + 'blmoveBuffer', + 'blmpop', + 'blmpopBuffer', + 'blpop', + 'blpopBuffer', + 'brpop', + 'brpopBuffer', + 'brpoplpush', + 'brpoplpushBuffer', + 'bzmpop', + 'bzpopmax', + 'bzpopmaxBuffer', + 'bzpopmin', + 'bzpopminBuffer', + 'call', + 'callBuffer', + 'client', + 'clientBuffer', + 'cluster', + 'command', + 'config', + 'copy', + 'createBuiltinCommand', + 'dbsize', + 'debug', + 'decr', + 'decrby', + 'del', + 'discard', + 'dump', + 'dumpBuffer', + 'echo', + 'echoBuffer', + 'eval', + 'eval_ro', + 'evalsha', + 'evalsha_ro', + 'exec', + 'exists', + 'expire', + 'expireat', + 'expiretime', + 'failover', + 'fcall', + 'fcall_ro', + 'flushall', + 'flushdb', + 'function', + 'functionBuffer', + 'geoadd', + 'geodist', + 'geodistBuffer', + 'geohash', + 'geohashBuffer', + 'geopos', + 'georadius', + 'georadius_ro', + 'georadiusbymember', + 'georadiusbymember_ro', + 'geosearch', + 'geosearchstore', + 'get', + 'getBuffer', + 'getBuiltinCommands', + 'getbit', + 'getdel', + 'getdelBuffer', + 'getex', + 'getexBuffer', + 'getrange', + 'getrangeBuffer', + 'getset', + 'getsetBuffer', + 'hdel', + 'hello', + 'hexists', + 'hget', + 'hgetBuffer', + 'hgetall', + 'hgetallBuffer', + 'hincrby', + 'hincrbyfloat', + 'hincrbyfloatBuffer', + 'hkeys', + 'hkeysBuffer', + 'hlen', + 'hmget', + 'hmgetBuffer', + 'hmset', + 'hrandfield', + 'hrandfieldBuffer', + 'hscan', + 'hscanBuffer', + 'hscanBufferStream', + 'hscanStream', + 'hset', + 'hsetnx', + 'hstrlen', + 'hvals', + 'hvalsBuffer', + 'incr', + 'incrby', + 'incrbyfloat', + 'info', + 'keys', + 'keysBuffer', + 'lastsave', + 'latency', + 'lcs', + 'lindex', + 'lindexBuffer', + 'linsert', + 'llen', + 'lmove', + 'lmoveBuffer', + 'lmpop', + 'lmpopBuffer', + 'lolwut', + 'lpop', + 'lpopBuffer', + 'lpos', + 'lpush', + 'lpushx', + 'lrange', + 'lrangeBuffer', + 'lrem', + 'lset', + 'ltrim', + 'memory', + 'mget', + 'mgetBuffer', + 'migrate', + 'module', + 'move', + 'mset', + 'msetnx', + 'multi', + 'object', + 'persist', + 'pexpire', + 'pexpireat', + 'pexpiretime', + 'pfadd', + 'pfcount', + 'pfdebug', + 'pfmerge', + 'pfselftest', + 'ping', + 'pingBuffer', + 'pipeline', + 'psetex', + 'psync', + 'pttl', + 'pubsub', + 'randomkey', + 'randomkeyBuffer', + 'readonly', + 'readwrite', + 'rename', + 'renamenx', + 'replconf', + 'replicaof', + 'reset', + 'restore', + 'restore-asking', + 'role', + 'rpop', + 'rpopBuffer', + 'rpoplpush', + 'rpoplpushBuffer', + 'rpush', + 'rpushx', + 'sadd', + 'save', + 'scan', + 'scanBuffer', + 'scard', + 'script', + 'sdiff', + 'sdiffBuffer', + 'sdiffstore', + 'select', + 'set', + 'setBuffer', + 'setbit', + 'setex', + 'setnx', + 'setrange', + 'shutdown', + 'sinter', + 'sinterBuffer', + 'sintercard', + 'sinterstore', + 'sismember', + 'slaveof', + 'slowlog', + 'smembers', + 'smembersBuffer', + 'smismember', + 'smove', + 'sort', + 'sort_ro', + 'spop', + 'spopBuffer', + 'spublish', + 'srandmember', + 'srandmemberBuffer', + 'srem', + 'sscan', + 'sscanBuffer', + 'sscanBufferStream', + 'sscanStream', + 'strlen', + 'substr', + 'sunion', + 'sunionBuffer', + 'sunionstore', + 'swapdb', + 'sync', + 'time', + 'touch', + 'ttl', + 'type', + 'unlink', + 'unwatch', + 'wait', + 'watch', + 'xack', + 'xadd', + 'xaddBuffer', + 'xautoclaim', + 'xclaim', + 'xdel', + 'xgroup', + 'xinfo', + 'xlen', + 'xpending', + 'xrange', + 'xrangeBuffer', + 'xread', + 'xreadBuffer', + 'xreadgroup', + 'xrevrange', + 'xrevrangeBuffer', + 'xsetid', + 'xtrim', + 'zadd', + 'zaddBuffer', + 'zcard', + 'zcount', + 'zdiff', + 'zdiffBuffer', + 'zdiffstore', + 'zincrby', + 'zincrbyBuffer', + 'zinter', + 'zinterBuffer', + 'zintercard', + 'zinterstore', + 'zlexcount', + 'zmpop', + 'zmscore', + 'zmscoreBuffer', + 'zpopmax', + 'zpopmaxBuffer', + 'zpopmin', + 'zpopminBuffer', + 'zrandmember', + 'zrandmemberBuffer', + 'zrange', + 'zrangeBuffer', + 'zrangebylex', + 'zrangebylexBuffer', + 'zrangebyscore', + 'zrangebyscoreBuffer', + 'zrangestore', + 'zrank', + 'zrem', + 'zremrangebylex', + 'zremrangebyrank', + 'zremrangebyscore', + 'zrevrange', + 'zrevrangeBuffer', + 'zrevrangebylex', + 'zrevrangebylexBuffer', + 'zrevrangebyscore', + 'zrevrangebyscoreBuffer', + 'zrevrank', + 'zscan', + 'zscanBuffer', + 'zscanBufferStream', + 'zscanStream', + 'zscore', + 'zscoreBuffer', + 'zunion', + 'zunionBuffer', + 'zunionstore', + 'end', + 'monitor', + 'scanBufferStream', + 'scanStream', +] satisfies (keyof Redis)[] diff --git a/src/connections/redis_cluster_connection.ts b/src/connections/redis_cluster_connection.ts index 60c2699..7187240 100644 --- a/src/connections/redis_cluster_connection.ts +++ b/src/connections/redis_cluster_connection.ts @@ -10,9 +10,9 @@ import Redis, { type Cluster, type NodeRole } from 'ioredis' import debug from '../debug.js' -import { ioMethods } from './io_methods.js' +import { baseMethods } from './io_methods.js' import { AbstractConnection } from './abstract_connection.js' -import type { IORedisCommands, RedisClusterConnectionConfig } from '../types/main.js' +import type { IORedisBaseCommands, RedisClusterConnectionConfig } from '../types/main.js' /** * Redis cluster connection exposes the API to run Redis commands using `ioredis` as the @@ -22,6 +22,10 @@ import type { IORedisCommands, RedisClusterConnectionConfig } from '../types/mai export class RedisClusterConnection extends AbstractConnection { #config: RedisClusterConnectionConfig + get slots() { + return this.ioConnection.slots + } + constructor(connectionName: string, config: RedisClusterConnectionConfig) { debug('creating cluster connection %s: %O', connectionName, config) super(connectionName) @@ -59,8 +63,8 @@ export class RedisClusterConnection extends AbstractConnection { * Adding IORedis methods dynamically on the RedisClusterConnection * class and also extending its TypeScript types */ -export interface RedisClusterConnection extends IORedisCommands {} -ioMethods.forEach((method) => { +export interface RedisClusterConnection extends IORedisBaseCommands {} +baseMethods.forEach((method) => { ;(RedisClusterConnection.prototype as any)[method] = function redisConnectionProxyFn( ...args: any[] ) { diff --git a/src/connections/redis_connection.ts b/src/connections/redis_connection.ts index 3bf534f..8c3337f 100644 --- a/src/connections/redis_connection.ts +++ b/src/connections/redis_connection.ts @@ -10,9 +10,9 @@ import { Redis, RedisOptions } from 'ioredis' import debug from '../debug.js' -import { ioMethods } from './io_methods.js' +import { redisMethods } from './io_methods.js' import { AbstractConnection } from './abstract_connection.js' -import { IORedisCommands, RedisConnectionConfig } from '../types/main.js' +import { IORedisConnectionCommands, RedisConnectionConfig } from '../types/main.js' /** * Redis connection exposes the API to run Redis commands using `ioredis` as the @@ -23,6 +23,21 @@ import { IORedisCommands, RedisConnectionConfig } from '../types/main.js' export class RedisConnection extends AbstractConnection { #config: RedisOptions + /** + * Returns the connection mode + */ + get mode() { + return this.ioConnection.mode + } + + /** + * Returns the connection mode for the subscriber + * connection + */ + get subscribeMode() { + return this.ioSubscriberConnection?.mode + } + constructor(connectionName: string, config: RedisConnectionConfig) { debug('creating connection %s: %O', connectionName, config) super(connectionName) @@ -57,8 +72,8 @@ export class RedisConnection extends AbstractConnection { * Adding IORedis methods dynamically on the RedisConnection * class and also extending its TypeScript types */ -export interface RedisConnection extends IORedisCommands {} -ioMethods.forEach((method) => { +export interface RedisConnection extends IORedisConnectionCommands {} +redisMethods.forEach((method) => { ;(RedisConnection.prototype as any)[method] = function redisConnectionProxyFn(...args: any[]) { return this.ioConnection[method](...args) } diff --git a/src/redis_manager.ts b/src/redis_manager.ts index 6f0306d..a6b7a0d 100644 --- a/src/redis_manager.ts +++ b/src/redis_manager.ts @@ -10,9 +10,10 @@ import { RuntimeException } from '@poppinss/utils' import debug from './debug.js' +import { baseMethods } from './connections/io_methods.js' import RedisConnection from './connections/redis_connection.js' import RedisClusterConnection from './connections/redis_cluster_connection.js' -import type { GetConnectionType, RedisConnectionsList } from './types/main.js' +import type { GetConnectionType, IORedisBaseCommands, RedisConnectionsList } from './types/main.js' /** * Redis Manager exposes the API to manage multiple redis connections @@ -20,7 +21,7 @@ import type { GetConnectionType, RedisConnectionsList } from './types/main.js' * * All connections are long-lived until they are closed explictly */ -export default class RedisManager { +class RedisManager { /** * User provided config */ @@ -137,3 +138,12 @@ export default class RedisManager await Promise.all(Object.keys(this.activeConnections).map((name) => this.disconnect(name))) } } + +interface RedisManager extends IORedisBaseCommands {} +baseMethods.forEach((method) => { + ;(RedisManager.prototype as any)[method] = function redisConnectionProxyFn(...args: any[]) { + return this.connection()[method](...args) + } +}) + +export default RedisManager diff --git a/src/types/main.ts b/src/types/main.ts index 8bef6be..17b5bd5 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -7,10 +7,10 @@ * file that was distributed with this source code. */ -import type { EventEmitter } from 'node:events' -import type { Redis as IoRedis, RedisOptions, ClusterOptions } from 'ioredis' +import type { Redis, Cluster, RedisOptions, ClusterOptions } from 'ioredis' import type RedisManager from '../redis_manager.js' +import type { baseMethods, redisMethods } from '../connections/io_methods.js' import type RedisConnection from '../connections/redis_connection.js' import type RedisClusterConnection from '../connections/redis_cluster_connection.js' @@ -37,22 +37,12 @@ export type HealthReportNode = { * List of commands on the IORedis. We omit their internal events and pub/sub * handlers, since we have our own. */ -export type IORedisCommands = Omit< - IoRedis, - | 'Promise' - | 'status' - | 'connect' - | 'disconnect' - | 'duplicate' - | 'subscribe' - | 'unsubscribe' - | 'psubscribe' - | 'punsubscribe' - | 'quit' - | 'publish' - | 'defineCommand' - | keyof EventEmitter -> +export type IORedisBaseCommands = { + [K in (typeof baseMethods)[number]]: Cluster[K] +} +export type IORedisConnectionCommands = { + [K in (typeof redisMethods)[number]]: Redis[K] +} /** * Configuration accepted by the redis connection. It is same diff --git a/tests/redis_manager.spec.ts b/tests/redis_manager.spec.ts index 2c8d411..f0480c8 100644 --- a/tests/redis_manager.spec.ts +++ b/tests/redis_manager.spec.ts @@ -45,6 +45,23 @@ test.group('Redis Manager', () => { redis.connection('foo') }).throws('Redis connection "foo" is not defined') + test('run redis commands from the manager', async ({ assert }) => { + const redis = new RedisManagerFactory({ + connection: 'primary', + connections: { + primary: { host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }, + }, + }).create() + + await redis.set('greeting', 'hello-world') + const greeting = await redis.get('greeting') + + assert.equal(greeting, 'hello-world') + + await redis.del('greeting') + await redis.quit('primary') + }) + test('run redis commands using default connection', async ({ assert }) => { const redis = new RedisManagerFactory({ connection: 'primary',