Skip to content

Commit

Permalink
rc6
Browse files Browse the repository at this point in the history
  • Loading branch information
karnthis committed Apr 23, 2024
1 parent 2f10d23 commit af24059
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 48 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@jackallabs/banshee",
"version": "0.0.0-rc.5",
"version": "0.0.0-rc.6",
"description": "Modern problems require modern solutions",
"keywords": [],
"exports": {
Expand Down
16 changes: 13 additions & 3 deletions src/classes/ibcQueryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ import type { CometClient, HttpEndpoint } from '@cosmjs/tendermint-rpc'
import { connectComet } from '@cosmjs/tendermint-rpc'
import { processExtensions } from '@/utils/extensions'
import { WebsocketCore } from '@/classes'
import type { IExtendedStargateClientOptions, IIbcBundle, IIbcQueryClient, IWebsocketCore } from '@/interfaces'
import {
IExtendedStargateClientOptions, IIbcDeafenBundle,
IIbcDisengageBundle,
IIbcEngageBundle,
IIbcQueryClient,
IWebsocketCore
} from '@/interfaces'
import type { TPossibleTxEvents, TQueryLibrary } from '@/types'

/**
Expand Down Expand Up @@ -38,12 +44,16 @@ export class IbcQueryClient<TQ extends TQueryLibrary> extends StargateClient imp
}

async monitor<T extends TPossibleTxEvents>(
connections: IIbcBundle<T> | IIbcBundle<T>[],
connections: IIbcEngageBundle<T> | IIbcEngageBundle<T>[],
): Promise<void> {
await this.wsCore.monitor(connections)
}

disengage(connections: string | string[]): void {
disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void {
this.wsCore.disengage(connections)
}

deafen(connection: IIbcDeafenBundle): void {
this.wsCore.deafen(connection)
}
}
16 changes: 13 additions & 3 deletions src/classes/ibcSigningClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ import type {
TQueryLibrary,
TTxLibrary
} from '@/types'
import type {IExtendedSigningStargateClientOptions, IIbcBundle, IIbcSigningClient, IWebsocketCore} from '@/interfaces'
import {
IExtendedSigningStargateClientOptions, IIbcDeafenBundle,
IIbcDisengageBundle,
IIbcEngageBundle,
IIbcSigningClient,
IWebsocketCore
} from '@/interfaces'

/**
* @class {IIbcSigningClient} IbcSigningClient
Expand Down Expand Up @@ -65,15 +71,19 @@ export class IbcSigningClient<TQ extends TQueryLibrary, TT extends TTxLibrary>
}

async monitor<T extends TPossibleTxEvents>(
connections: IIbcBundle<T> | IIbcBundle<T>[],
connections: IIbcEngageBundle<T> | IIbcEngageBundle<T>[],
): Promise<void> {
await this.wsCore.monitor(connections)
}

disengage(connections: string | string[]): void {
disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void {
this.wsCore.disengage(connections)
}

deafen(connection: IIbcDeafenBundle): void {
this.wsCore.deafen(connection)
}

async selfSignAndBroadcast(
msgs: DEncodeObject[],
options: ISignAndBroadcastOptions = {},
Expand Down
55 changes: 27 additions & 28 deletions src/classes/websocketCore.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type { CometClient } from '@cosmjs/tendermint-rpc'
import { connectComet } from '@cosmjs/tendermint-rpc'
import { makeListener } from '@/utils/misc'
import type { Stream } from 'xstream'
import type { TPossibleTxEvents } from '@/types'
import type { IIbcBundle, IWebsocketCore } from '@/interfaces'
import { IIbcDeafenBundle, IIbcDisengageBundle, IIbcEngageBundle, IWebsocketCore } from '@/interfaces'

export class WebsocketCore implements IWebsocketCore {
protected readonly wsConnections: Record<string, CometClient>
Expand All @@ -15,48 +14,48 @@ export class WebsocketCore implements IWebsocketCore {
}

async monitor<T extends TPossibleTxEvents>(
connections: IIbcBundle<T> | IIbcBundle<T>[],
connections: IIbcEngageBundle<T> | IIbcEngageBundle<T>[],
): Promise<void> {
try {
if (connections instanceof Array) {
for (let conn of connections) {
if (!this.wsConnections[conn.chainId]) {
this.wsConnections[conn.chainId] = await connectComet(conn.endpoint)
}
if (!this.activeStreams[conn.chainId]) {
this.activeStreams[conn.chainId] = this.wsConnections[
conn.chainId
].subscribeTx() as Stream<TPossibleTxEvents>
}
this.activeStreams[conn.chainId].addListener(makeListener<T>(conn))
await this.setupMonitoring<T>(conn)
}
} else {
if (!this.wsConnections[connections.chainId]) {
this.wsConnections[connections.chainId] = await connectComet(
connections.endpoint,
)
}
if (!this.activeStreams[connections.chainId]) {
this.activeStreams[connections.chainId] = this.wsConnections[
connections.chainId
].subscribeTx() as Stream<TPossibleTxEvents>
}
this.activeStreams[connections.chainId].addListener(
makeListener<T>(connections),
)
await this.setupMonitoring<T>(connections)
}
} catch (err) {
throw err
}
}

disengage(connections: string | string[]): void {
disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void {
if (connections instanceof Array) {
for (let conn of connections) {
delete this.wsConnections[conn]
delete this.activeStreams[`${conn.chainId}|${conn.query || 'all'}`]
}
} else {
delete this.wsConnections[connections]
delete this.activeStreams[`${connections.chainId}|${connections.query || 'all'}`]
}
}

deafen(connection: IIbcDeafenBundle): void {
this.activeStreams[`${connection.chainId}|${connection.query || 'all'}`].removeListener(connection.listener)
}

protected async setupMonitoring<T extends TPossibleTxEvents>(conn: IIbcEngageBundle<T>) {
if (!this.wsConnections[conn.chainId]) {
this.wsConnections[conn.chainId] = await connectComet(conn.endpoint)
}
const client = this.wsConnections[conn.chainId]
const streamId = `${conn.chainId}|${conn.query || 'all'}`
if (!this.activeStreams[streamId]) {
if (conn.query) {
this.activeStreams[streamId] = client.subscribeTx(conn.query) as Stream<TPossibleTxEvents>
} else {
this.activeStreams[streamId] = client.subscribeTx() as Stream<TPossibleTxEvents>
}
}
this.activeStreams[streamId].addListener(conn.listener)
}
}
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from '@/classes'
export * from '@/interfaces'
export * from '@/types'

export { makeListener } from '@/utils/misc'
43 changes: 40 additions & 3 deletions src/interfaces/IIbcBundle.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,50 @@
import type { TCurrentTxEvent, TPossibleTxEvents } from '@/types'
import type { IListener } from '@/interfaces/IListener'

/**
* @interface IIbcBundle
* @interface IIbcEngageBundle
* @property {string} chainId
* @property {string} endpoint
* @property {TCurrentTxEvent[]} feed
* @property {string} [query]
* @property {TCurrentTxEvent<T>[]} feed
* @property {IListener<TPossibleTxEvents>} listener
*/
export interface IIbcBundle<T extends TPossibleTxEvents> {
export interface IIbcEngageBundle<T extends TPossibleTxEvents> {
chainId: string
endpoint: string
query?: string
feed: TCurrentTxEvent<T>[]
listener: IListener<TPossibleTxEvents>
}

/**
* @interface IIbcDisengageBundle
* @property {string} chainId
* @property {string} [query]
*/
export interface IIbcDisengageBundle {
chainId: string
query?: string
}

/**
* @interface IIbcDeafenBundle
* @property {string} chainId
* @property {string} [query]
* @property {IListener<TPossibleTxEvents>} listener
*/
export interface IIbcDeafenBundle {
chainId: string
query?: string
listener: IListener<TPossibleTxEvents>
}

/**
* @interface IIbcMakeListenerBundle
* @property {string} chainId
* @property {TCurrentTxEvent<T>[]} feed
*/
export interface IIbcMakeListenerBundle<T extends TPossibleTxEvents> {
chainId: string
feed: TCurrentTxEvent<T>[]
}
16 changes: 11 additions & 5 deletions src/interfaces/classes/IWebsocketCore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type {IIbcBundle} from '@/interfaces'
import { IIbcDeafenBundle, IIbcDisengageBundle, IIbcEngageBundle } from '@/interfaces'
import type {TPossibleTxEvents} from '@/types'

/**
Expand All @@ -9,16 +9,22 @@ import type {TPossibleTxEvents} from '@/types'
export interface IWebsocketCore {
/**
* @function monitor
* @param {IIbcBundle<T> | IIbcBundle<T>[]} connections
* @param {IIbcEngageBundle<T> | IIbcEngageBundle<T>[]} connections
* @returns {Promise<void>}
*/
monitor<T extends TPossibleTxEvents>(
connections: IIbcBundle<T> | IIbcBundle<T>[],
connections: IIbcEngageBundle<T> | IIbcEngageBundle<T>[],
): Promise<void>

/**
* @function disengage
* @param {string | string[]} connections
* @param {IIbcDisengageBundle | IIbcDisengageBundle[]} connections
*/
disengage(connections: string | string[]): void
disengage(connections: IIbcDisengageBundle | IIbcDisengageBundle[]): void

/**
* @function deafen
* @param {IIbcDeafenBundle} connection
*/
deafen(connection: IIbcDeafenBundle): void
}
6 changes: 3 additions & 3 deletions src/utils/misc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { TPossibleTxEvents } from '@/types'
import type { IIbcBundle, IListener } from '@/interfaces'
import { IIbcMakeListenerBundle, IListener } from '@/interfaces'

const oneSecondMs = 1000

Expand Down Expand Up @@ -47,11 +47,11 @@ export function secondToMS(seconds: number): number {

/**
* Build Listener instance for attaching to websocket.
* @param {IIbcBundle<T>} bundle
* @param {IIbcMakeListenerBundle<T>} bundle
* @returns {IListener<TPossibleTxEvents>}
*/
export function makeListener<T extends TPossibleTxEvents>(
bundle: IIbcBundle<T>,
bundle: IIbcMakeListenerBundle<T>,
): IListener<TPossibleTxEvents> {
return {
next(value: T): void {
Expand Down

0 comments on commit af24059

Please sign in to comment.