Skip to content

Commit

Permalink
refactor(rosenet-node): extract RoseNet direct protocol handler
Browse files Browse the repository at this point in the history
  • Loading branch information
mkermani144 committed Sep 11, 2024
1 parent 8883026 commit 7addf22
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 46 deletions.
53 changes: 7 additions & 46 deletions packages/rosenet-node/lib/createRoseNetNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { PeerId } from '@libp2p/interface';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { tcp } from '@libp2p/tcp';

import { pipe } from 'it-pipe';
import { createLibp2p } from 'libp2p';

import {
Expand All @@ -17,28 +16,26 @@ import {
privateKeyToPeerId,
} from '@rosen-bridge/rosenet-utils';

import {
handleIncomingMessageFactory,
sendMessageFactory,
} from './rosenet-direct';

import RoseNetNodeContext from './context/RoseNetNodeContext';

import restartRelayDiscovery from './libp2p/restart-relay-discovery';

import addressService from './address/address-service';

import { decode } from './utils/codec';
import sample from './utils/sample';

import RoseNetNodeError from './errors/RoseNetNodeError';

import {
ACK_BYTE,
DEFAULT_NODE_PORT,
RELAYS_COUNT_TO_CONNECT,
ROSENET_DIRECT_PROTOCOL_V1,
} from './constants';
import { DEFAULT_NODE_PORT, RELAYS_COUNT_TO_CONNECT } from './constants';

import packageJson from '../package.json' with { type: 'json' };

import { RoseNetNodeConfig } from './types';
import sendMessageFactory from './rosenet-direct/sendMessage';

const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();
Expand Down Expand Up @@ -145,43 +142,7 @@ const createRoseNetNode = async ({
return {
start: async () => node.start(),
sendMessage: sendMessageFactory(node),
handleIncomingMessage: (
handler: (from: string, message?: string) => void,
) => {
node.handle(
ROSENET_DIRECT_PROTOCOL_V1,
async ({ connection, stream }) => {
RoseNetNodeContext.logger.debug(
`incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`,
{
remoteAddress: connection.remoteAddr.toString(),
transient: connection.transient,
},
);
pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
);
},
{ runOnTransientConnection: true },
);
RoseNetNodeContext.logger.debug(
`handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`,
);
},
handleIncomingMessage: handleIncomingMessageFactory(node),
publish: async (topic: string, message: string) => {
node.services.pubsub.publish(topic, textEncoder.encode(message));
},
Expand Down
51 changes: 51 additions & 0 deletions packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { pipe } from 'it-pipe';

import { Libp2p } from '@libp2p/interface';

import RoseNetNodeContext from '../context/RoseNetNodeContext';

import { decode } from '../utils/codec';

import { ACK_BYTE, ROSENET_DIRECT_PROTOCOL_V1 } from '../constants';

/**
* protocol handler for RoseNet direct
*/
const handleIncomingMessageFactory =
(node: Libp2p) => (handler: (from: string, message?: string) => void) => {
node.handle(
ROSENET_DIRECT_PROTOCOL_V1,
async ({ connection, stream }) => {
RoseNetNodeContext.logger.debug(
`incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`,
{
remoteAddress: connection.remoteAddr.toString(),
transient: connection.transient,
},
);
pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
);
},
{ runOnTransientConnection: true },
);
RoseNetNodeContext.logger.debug(
`handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`,
);
};

export default handleIncomingMessageFactory;
2 changes: 2 additions & 0 deletions packages/rosenet-node/lib/rosenet-direct/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { default as handleIncomingMessageFactory } from './handleIncomingMessage';
export { default as sendMessageFactory } from './sendMessage';

0 comments on commit 7addf22

Please sign in to comment.