diff --git a/package-lock.json b/package-lock.json index 28ee5dd..d9be0b8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@jackallabs/banshee", - "version": "0.0.0-rc.5", + "version": "0.0.0-rc.6", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@jackallabs/banshee", - "version": "0.0.0-rc.5", + "version": "0.0.0-rc.6", "license": "MIT", "dependencies": { "@cosmjs/amino": "^0.32.3", diff --git a/package.json b/package.json index 5f806d8..a91fb64 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@jackallabs/banshee", - "version": "0.0.0-rc.5", + "version": "0.0.0-rc.6", "description": "Modern problems require modern solutions", "keywords": [], "exports": { diff --git a/src/classes/ibcQueryClient.ts b/src/classes/ibcQueryClient.ts index 86302ce..9117144 100644 --- a/src/classes/ibcQueryClient.ts +++ b/src/classes/ibcQueryClient.ts @@ -3,7 +3,13 @@ import type { CometClient, HttpEndpoint } from '@cosmjs/tendermint-rpc' import { connectComet } from '@cosmjs/tendermint-rpc' import { processExtensions } from '@/utils/extensions' import { WebsocketCore } from '@/classes' -import type { IExtendedStargateClientOptions, IIbcBundle, IIbcQueryClient, IWebsocketCore } from '@/interfaces' +import { + IExtendedStargateClientOptions, IIbcDeafenBundle, + IIbcDisengageBundle, + IIbcEngageBundle, + IIbcQueryClient, + IWebsocketCore +} from '@/interfaces' import type { TPossibleTxEvents, TQueryLibrary } from '@/types' /** @@ -38,12 +44,16 @@ export class IbcQueryClient extends StargateClient imp } async monitor( - connections: IIbcBundle | IIbcBundle[], + connections: IIbcEngageBundle | IIbcEngageBundle[], ): Promise { await this.wsCore.monitor(connections) } - disengage(connections: string | string[]): void { + disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void { this.wsCore.disengage(connections) } + + deafen(connection: IIbcDeafenBundle): void { + this.wsCore.deafen(connection) + } } diff --git a/src/classes/ibcSigningClient.ts b/src/classes/ibcSigningClient.ts index b3ba245..58fb1b0 100644 --- a/src/classes/ibcSigningClient.ts +++ b/src/classes/ibcSigningClient.ts @@ -13,7 +13,13 @@ import type { TQueryLibrary, TTxLibrary } from '@/types' -import type {IExtendedSigningStargateClientOptions, IIbcBundle, IIbcSigningClient, IWebsocketCore} from '@/interfaces' +import { + IExtendedSigningStargateClientOptions, IIbcDeafenBundle, + IIbcDisengageBundle, + IIbcEngageBundle, + IIbcSigningClient, + IWebsocketCore +} from '@/interfaces' /** * @class {IIbcSigningClient} IbcSigningClient @@ -65,15 +71,19 @@ export class IbcSigningClient } async monitor( - connections: IIbcBundle | IIbcBundle[], + connections: IIbcEngageBundle | IIbcEngageBundle[], ): Promise { await this.wsCore.monitor(connections) } - disengage(connections: string | string[]): void { + disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void { this.wsCore.disengage(connections) } + deafen(connection: IIbcDeafenBundle): void { + this.wsCore.deafen(connection) + } + async selfSignAndBroadcast( msgs: DEncodeObject[], options: ISignAndBroadcastOptions = {}, diff --git a/src/classes/websocketCore.ts b/src/classes/websocketCore.ts index 547c5ca..fb42684 100644 --- a/src/classes/websocketCore.ts +++ b/src/classes/websocketCore.ts @@ -1,9 +1,8 @@ import type { CometClient } from '@cosmjs/tendermint-rpc' import { connectComet } from '@cosmjs/tendermint-rpc' -import { makeListener } from '@/utils/misc' import type { Stream } from 'xstream' import type { TPossibleTxEvents } from '@/types' -import type { IIbcBundle, IWebsocketCore } from '@/interfaces' +import { IIbcDeafenBundle, IIbcDisengageBundle, IIbcEngageBundle, IWebsocketCore } from '@/interfaces' export class WebsocketCore implements IWebsocketCore { protected readonly wsConnections: Record @@ -15,48 +14,48 @@ export class WebsocketCore implements IWebsocketCore { } async monitor( - connections: IIbcBundle | IIbcBundle[], + connections: IIbcEngageBundle | IIbcEngageBundle[], ): Promise { try { if (connections instanceof Array) { for (let conn of connections) { - if (!this.wsConnections[conn.chainId]) { - this.wsConnections[conn.chainId] = await connectComet(conn.endpoint) - } - if (!this.activeStreams[conn.chainId]) { - this.activeStreams[conn.chainId] = this.wsConnections[ - conn.chainId - ].subscribeTx() as Stream - } - this.activeStreams[conn.chainId].addListener(makeListener(conn)) + await this.setupMonitoring(conn) } } else { - if (!this.wsConnections[connections.chainId]) { - this.wsConnections[connections.chainId] = await connectComet( - connections.endpoint, - ) - } - if (!this.activeStreams[connections.chainId]) { - this.activeStreams[connections.chainId] = this.wsConnections[ - connections.chainId - ].subscribeTx() as Stream - } - this.activeStreams[connections.chainId].addListener( - makeListener(connections), - ) + await this.setupMonitoring(connections) } } catch (err) { throw err } } - disengage(connections: string | string[]): void { + disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void { if (connections instanceof Array) { for (let conn of connections) { - delete this.wsConnections[conn] + delete this.activeStreams[`${conn.chainId}|${conn.query || 'all'}`] } } else { - delete this.wsConnections[connections] + delete this.activeStreams[`${connections.chainId}|${connections.query || 'all'}`] + } + } + + deafen(connection: IIbcDeafenBundle): void { + this.activeStreams[`${connection.chainId}|${connection.query || 'all'}`].removeListener(connection.listener) + } + + protected async setupMonitoring(conn: IIbcEngageBundle) { + if (!this.wsConnections[conn.chainId]) { + this.wsConnections[conn.chainId] = await connectComet(conn.endpoint) + } + const client = this.wsConnections[conn.chainId] + const streamId = `${conn.chainId}|${conn.query || 'all'}` + if (!this.activeStreams[streamId]) { + if (conn.query) { + this.activeStreams[streamId] = client.subscribeTx(conn.query) as Stream + } else { + this.activeStreams[streamId] = client.subscribeTx() as Stream + } } + this.activeStreams[streamId].addListener(conn.listener) } } diff --git a/src/index.ts b/src/index.ts index 6d7fdee..e8b0045 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,5 @@ export * from '@/classes' export * from '@/interfaces' export * from '@/types' + +export { makeListener } from '@/utils/misc' diff --git a/src/interfaces/IIbcBundle.ts b/src/interfaces/IIbcBundle.ts index 9ed0c08..0607be9 100644 --- a/src/interfaces/IIbcBundle.ts +++ b/src/interfaces/IIbcBundle.ts @@ -1,13 +1,50 @@ import type { TCurrentTxEvent, TPossibleTxEvents } from '@/types' +import type { IListener } from '@/interfaces/IListener' /** - * @interface IIbcBundle + * @interface IIbcEngageBundle * @property {string} chainId * @property {string} endpoint - * @property {TCurrentTxEvent[]} feed + * @property {string} [query] + * @property {TCurrentTxEvent[]} feed + * @property {IListener} listener */ -export interface IIbcBundle { +export interface IIbcEngageBundle { chainId: string endpoint: string + query?: string + feed: TCurrentTxEvent[] + listener: IListener +} + +/** + * @interface IIbcDisengageBundle + * @property {string} chainId + * @property {string} [query] + */ +export interface IIbcDisengageBundle { + chainId: string + query?: string +} + +/** + * @interface IIbcDeafenBundle + * @property {string} chainId + * @property {string} [query] + * @property {IListener} listener + */ +export interface IIbcDeafenBundle { + chainId: string + query?: string + listener: IListener +} + +/** + * @interface IIbcMakeListenerBundle + * @property {string} chainId + * @property {TCurrentTxEvent[]} feed + */ +export interface IIbcMakeListenerBundle { + chainId: string feed: TCurrentTxEvent[] } diff --git a/src/interfaces/classes/IWebsocketCore.ts b/src/interfaces/classes/IWebsocketCore.ts index 90e6660..00901a4 100644 --- a/src/interfaces/classes/IWebsocketCore.ts +++ b/src/interfaces/classes/IWebsocketCore.ts @@ -1,4 +1,4 @@ -import type {IIbcBundle} from '@/interfaces' +import { IIbcDeafenBundle, IIbcDisengageBundle, IIbcEngageBundle } from '@/interfaces' import type {TPossibleTxEvents} from '@/types' /** @@ -9,16 +9,22 @@ import type {TPossibleTxEvents} from '@/types' export interface IWebsocketCore { /** * @function monitor - * @param {IIbcBundle | IIbcBundle[]} connections + * @param {IIbcEngageBundle | IIbcEngageBundle[]} connections * @returns {Promise} */ monitor( - connections: IIbcBundle | IIbcBundle[], + connections: IIbcEngageBundle | IIbcEngageBundle[], ): Promise /** * @function disengage - * @param {string | string[]} connections + * @param {IIbcDisengageBundle | IIbcDisengageBundle[]} connections */ - disengage(connections: string | string[]): void + disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void + + /** + * @function deafen + * @param {IIbcDeafenBundle} connection + */ + deafen(connection: IIbcDeafenBundle): void } diff --git a/src/utils/misc.ts b/src/utils/misc.ts index d027521..267e5b4 100644 --- a/src/utils/misc.ts +++ b/src/utils/misc.ts @@ -1,5 +1,5 @@ import type { TPossibleTxEvents } from '@/types' -import type { IIbcBundle, IListener } from '@/interfaces' +import { IIbcMakeListenerBundle, IListener } from '@/interfaces' const oneSecondMs = 1000 @@ -47,11 +47,11 @@ export function secondToMS(seconds: number): number { /** * Build Listener instance for attaching to websocket. - * @param {IIbcBundle} bundle + * @param {IIbcMakeListenerBundle} bundle * @returns {IListener} */ export function makeListener( - bundle: IIbcBundle, + bundle: IIbcMakeListenerBundle, ): IListener { return { next(value: T): void {