From b8338df9a9bbc33c22a5592365cd14e79b6b3cac Mon Sep 17 00:00:00 2001 From: Mohammad Kermani Date: Sun, 6 Oct 2024 11:04:19 +0330 Subject: [PATCH] refactor(rosenet-node): improve logs --- .changeset/empty-cats-report.md | 2 ++ .../lib/address/address-service.ts | 3 +++ .../rosenet-node/lib/createRoseNetNode.ts | 2 +- .../rosenet-direct/handleIncomingMessage.ts | 22 +++++++++------- .../lib/rosenet-direct/sendMessage.ts | 26 ++++++++++++++----- .../lib/rosenet-pubsub/publish.ts | 5 ++-- .../lib/rosenet-pubsub/subscribe.ts | 7 ++++- 7 files changed, 48 insertions(+), 19 deletions(-) create mode 100644 .changeset/empty-cats-report.md diff --git a/.changeset/empty-cats-report.md b/.changeset/empty-cats-report.md new file mode 100644 index 0000000..a845151 --- /dev/null +++ b/.changeset/empty-cats-report.md @@ -0,0 +1,2 @@ +--- +--- diff --git a/packages/rosenet-node/lib/address/address-service.ts b/packages/rosenet-node/lib/address/address-service.ts index 0fdc47c..86db219 100644 --- a/packages/rosenet-node/lib/address/address-service.ts +++ b/packages/rosenet-node/lib/address/address-service.ts @@ -3,6 +3,8 @@ import { isIP } from 'node:net'; import { fromNodeAddress } from '@multiformats/multiaddr'; import { publicIp } from 'public-ip'; +import RoseNetNodeContext from '../context/RoseNetNodeContext'; + /** * identify public ip (v4 or v6) of current node */ @@ -14,6 +16,7 @@ const identifyPublicIP = () => publicIp(); */ const getAnnounceMultiaddr = async (port: number) => { const ip = await identifyPublicIP(); + RoseNetNodeContext.logger.debug(`Public ip identified: ${ip}`); const ipVersion = isIP(ip); const multiaddr = fromNodeAddress( diff --git a/packages/rosenet-node/lib/createRoseNetNode.ts b/packages/rosenet-node/lib/createRoseNetNode.ts index 1dcbe05..038fc51 100644 --- a/packages/rosenet-node/lib/createRoseNetNode.ts +++ b/packages/rosenet-node/lib/createRoseNetNode.ts @@ -137,7 +137,7 @@ const createRoseNetNode = async ({ }, logger: libp2pLoggerFactory(logger, config.debug?.libp2pComponents ?? []), }); - RoseNetNodeContext.logger.debug('RoseNet node created'); + RoseNetNodeContext.logger.info('RoseNet node created'); addEventListeners(node, RoseNetNodeContext.logger); diff --git a/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts index 1061f96..d48ef62 100644 --- a/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts +++ b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts @@ -51,7 +51,7 @@ const handleIncomingMessageFactory = ROSENET_DIRECT_PROTOCOL_V1, async ({ connection, stream }) => { RoseNetNodeContext.logger.debug( - `incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`, + `Incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`, { remoteAddress: connection.remoteAddr.toString(), transient: connection.transient, @@ -74,19 +74,23 @@ const handleIncomingMessageFactory = decode, async function* (source) { for await (const message of source) { - RoseNetNodeContext.logger.debug( - 'message received, calling handler and sending ack', - { - message, - }, - ); + RoseNetNodeContext.logger.debug('Message decoded', { + message, + }); handler(connection.remotePeer.toString(), message); + RoseNetNodeContext.logger.debug('Handler called'); yield Uint8Array.of(ACK_BYTE); + RoseNetNodeContext.logger.debug( + 'Ack sent back to the sender', + ); } }, stream, ), ); + RoseNetNodeContext.logger.debug( + 'Incoming message handling completed', + ); } catch (error) { RoseNetNodeContext.logger.warn( 'An error occurred while handling incoming message', @@ -105,8 +109,8 @@ const handleIncomingMessageFactory = }, { runOnTransientConnection: true }, ); - RoseNetNodeContext.logger.debug( - `handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`, + RoseNetNodeContext.logger.info( + `Handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`, ); }; diff --git a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts index 7b15e37..db04f9e 100644 --- a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts +++ b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts @@ -56,9 +56,12 @@ const sendMessageFactory = async (source) => first(source), ); + RoseNetNodeContext.logger.debug('Starting message sending process'); const result = await messageRoundTripTimeout.execute(() => messagePipe); + RoseNetNodeContext.logger.debug('Message sending process completed'); if (result?.length !== 1) { + RoseNetNodeContext.logger.debug('Invalid multi-chunk ack received'); throw new RoseNetDirectError( `There are more than one chunk in the ack message`, RoseNetDirectErrorType.InvalidAckChunks, @@ -66,17 +69,22 @@ const sendMessageFactory = } const ack = result?.subarray(); if (ack.length !== 1 || ack[0] !== ACK_BYTE) { + RoseNetNodeContext.logger.debug('Invalid ack byte received'); throw new RoseNetDirectError( `Ack byte is invalid`, RoseNetDirectErrorType.InvalidAckByte, ); } + RoseNetNodeContext.logger.debug('Ack validation compeleted'); - RoseNetNodeContext.logger.debug('message sent successfully', { + RoseNetNodeContext.logger.debug('Message sent successfully', { message, }); } catch (error) { if (isBrokenCircuitError(error)) { + RoseNetNodeContext.logger.debug( + 'Message sending attempt failed due to a broken circuit', + ); /** * We were unable to dial, so `stream` is undefined and we don't need to * abort it @@ -88,6 +96,9 @@ const sendMessageFactory = ); } if (isTaskCancelledError(error)) { + RoseNetNodeContext.logger.debug( + 'Message sending attempt failed due to timeout', + ); const errorToThrow = new RoseNetDirectError( 'Message sending timed out', RoseNetDirectErrorType.Timeout, @@ -95,6 +106,9 @@ const sendMessageFactory = stream?.abort(errorToThrow); throw errorToThrow; } + RoseNetNodeContext.logger.debug( + 'Message sending attempt failed for some reason', + ); if (error instanceof RoseNetNodeError) { stream?.abort(error); throw error; @@ -134,14 +148,14 @@ const sendMessageWithRetryAndBulkheadFactory = }), }); retryPolicy.onFailure((data) => { - RoseNetNodeContext.logger.debug('message sending failed', { + RoseNetNodeContext.logger.debug('Message sending failed', { message, reason: data.reason, }); }); retryPolicy.onRetry((data) => { RoseNetNodeContext.logger.debug( - `retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`, + `Retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`, { message, }, @@ -154,10 +168,10 @@ const sendMessageWithRetryAndBulkheadFactory = .execute(() => sendMessageInner(to, message)) .then(() => onSettled?.()) .catch(() => { - RoseNetNodeContext.logger.error( - 'message sending failed regardless of 3 retries, dropping message', + RoseNetNodeContext.logger.warn( + 'Message sending failed regardless of 3 retries, dropping message', ); - RoseNetNodeContext.logger.debug('message was: ', { + RoseNetNodeContext.logger.debug('Message was: ', { message, }); onSettled?.(new RoseNetNodeError('Message sending failed')); diff --git a/packages/rosenet-node/lib/rosenet-pubsub/publish.ts b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts index 34da2e9..9defb32 100644 --- a/packages/rosenet-node/lib/rosenet-pubsub/publish.ts +++ b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts @@ -25,11 +25,12 @@ const publishFactory = await bulkheadPolicy.execute(() => node.services.pubsub.publish(topic, textEncoder.encode(message)), ); + RoseNetNodeContext.logger.debug('Message published successfully'); } catch (error) { if (isBulkheadRejectedError(error)) { - RoseNetNodeContext.logger.warn('Maximum publish threshold reached'); + RoseNetNodeContext.logger.debug('Maximum publish threshold reached'); } else { - RoseNetNodeContext.logger.warn('Message publish failed', { + RoseNetNodeContext.logger.debug('Message publish failed', { message, }); } diff --git a/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts index e27d4d6..0674599 100644 --- a/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts +++ b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts @@ -26,7 +26,11 @@ const subscribeFactory = try { await bulkheadPolicy.execute(() => { if (event.detail.topic === topic) { - handler(textDecoder.decode(event.detail.data)); + const message = textDecoder.decode(event.detail.data); + handler(message); + RoseNetNodeContext.logger.debug('Pubsub message received', { + message, + }); } }); } catch { @@ -35,6 +39,7 @@ const subscribeFactory = ); } }); + RoseNetNodeContext.logger.info(`Topic ${topic} subscribed`); }; export default subscribeFactory;