diff --git a/packages/rosenet-node/lib/createRoseNetNode.ts b/packages/rosenet-node/lib/createRoseNetNode.ts index 54db759..ba832ca 100644 --- a/packages/rosenet-node/lib/createRoseNetNode.ts +++ b/packages/rosenet-node/lib/createRoseNetNode.ts @@ -8,7 +8,6 @@ import { PeerId } from '@libp2p/interface'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; import { tcp } from '@libp2p/tcp'; -import { pipe } from 'it-pipe'; import { createLibp2p } from 'libp2p'; import { @@ -17,28 +16,26 @@ import { privateKeyToPeerId, } from '@rosen-bridge/rosenet-utils'; +import { + handleIncomingMessageFactory, + sendMessageFactory, +} from './rosenet-direct'; + import RoseNetNodeContext from './context/RoseNetNodeContext'; import restartRelayDiscovery from './libp2p/restart-relay-discovery'; import addressService from './address/address-service'; -import { decode } from './utils/codec'; import sample from './utils/sample'; import RoseNetNodeError from './errors/RoseNetNodeError'; -import { - ACK_BYTE, - DEFAULT_NODE_PORT, - RELAYS_COUNT_TO_CONNECT, - ROSENET_DIRECT_PROTOCOL_V1, -} from './constants'; +import { DEFAULT_NODE_PORT, RELAYS_COUNT_TO_CONNECT } from './constants'; import packageJson from '../package.json' with { type: 'json' }; import { RoseNetNodeConfig } from './types'; -import sendMessageFactory from './rosenet-direct/sendMessage'; const textEncoder = new TextEncoder(); const textDecoder = new TextDecoder(); @@ -145,43 +142,7 @@ const createRoseNetNode = async ({ return { start: async () => node.start(), sendMessage: sendMessageFactory(node), - handleIncomingMessage: ( - handler: (from: string, message?: string) => void, - ) => { - node.handle( - ROSENET_DIRECT_PROTOCOL_V1, - async ({ connection, stream }) => { - RoseNetNodeContext.logger.debug( - `incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`, - { - remoteAddress: connection.remoteAddr.toString(), - transient: connection.transient, - }, - ); - pipe( - stream, - decode, - async function* (source) { - for await (const message of source) { - RoseNetNodeContext.logger.debug( - 'message received, calling handler and sending ack', - { - message, - }, - ); - handler(connection.remotePeer.toString(), message); - yield Uint8Array.of(ACK_BYTE); - } - }, - stream, - ); - }, - { runOnTransientConnection: true }, - ); - RoseNetNodeContext.logger.debug( - `handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`, - ); - }, + handleIncomingMessage: handleIncomingMessageFactory(node), publish: async (topic: string, message: string) => { node.services.pubsub.publish(topic, textEncoder.encode(message)); }, diff --git a/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts new file mode 100644 index 0000000..e756e7b --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts @@ -0,0 +1,51 @@ +import { pipe } from 'it-pipe'; + +import { Libp2p } from '@libp2p/interface'; + +import RoseNetNodeContext from '../context/RoseNetNodeContext'; + +import { decode } from '../utils/codec'; + +import { ACK_BYTE, ROSENET_DIRECT_PROTOCOL_V1 } from '../constants'; + +/** + * protocol handler for RoseNet direct + */ +const handleIncomingMessageFactory = + (node: Libp2p) => (handler: (from: string, message?: string) => void) => { + node.handle( + ROSENET_DIRECT_PROTOCOL_V1, + async ({ connection, stream }) => { + RoseNetNodeContext.logger.debug( + `incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`, + { + remoteAddress: connection.remoteAddr.toString(), + transient: connection.transient, + }, + ); + pipe( + stream, + decode, + async function* (source) { + for await (const message of source) { + RoseNetNodeContext.logger.debug( + 'message received, calling handler and sending ack', + { + message, + }, + ); + handler(connection.remotePeer.toString(), message); + yield Uint8Array.of(ACK_BYTE); + } + }, + stream, + ); + }, + { runOnTransientConnection: true }, + ); + RoseNetNodeContext.logger.debug( + `handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`, + ); + }; + +export default handleIncomingMessageFactory; diff --git a/packages/rosenet-node/lib/rosenet-direct/index.ts b/packages/rosenet-node/lib/rosenet-direct/index.ts new file mode 100644 index 0000000..2880318 --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-direct/index.ts @@ -0,0 +1,2 @@ +export { default as handleIncomingMessageFactory } from './handleIncomingMessage'; +export { default as sendMessageFactory } from './sendMessage';