Skip to content

Commit

Permalink
Merge branch '95-pubsub-senderId' into 'dev'
Browse files Browse the repository at this point in the history
Resolve "Add sign/verify to pubsub due to lack of senderID"

Closes #95

See merge request ergo/rosen-bridge/rosenet!59
  • Loading branch information
vorujack committed Nov 19, 2024
2 parents 71d33ff + 25be799 commit 986267b
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .changeset/twelve-hounds-live.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@rosen-bridge/rosenet-node': minor
'@rosen-bridge/rosenet-utils': minor
---

Add sign/verify messages in pubub protocol and add from field to subscribe handler
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion packages/rosenet-node/lib/rosenet-pubsub/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { Libp2p, PubSub } from '@libp2p/interface';
import { bulkhead, isBulkheadRejectedError } from 'cockatiel';

import RoseNetNodeContext from '../context/RoseNetNodeContext';
import { PubSubMSG } from '../types';
import { messageCrypto } from '@rosen-bridge/rosenet-utils';

const textEncoder = new TextEncoder();

Expand All @@ -16,8 +18,20 @@ const publishFactory = (node: Libp2p<{ pubsub: PubSub }>) => {

return async (topic: string, message: string) => {
try {
const signature = await messageCrypto.sign(
node.peerId.privateKey!,
message,
);
const finalMsg: PubSubMSG = {
senderPubKey: Buffer.from(node.peerId.publicKey!).toString('hex'),
signature: signature,
message: message,
};
await bulkheadPolicy.execute(() =>
node.services.pubsub.publish(topic, textEncoder.encode(message)),
node.services.pubsub.publish(
topic,
textEncoder.encode(JSON.stringify(finalMsg)),
),
);
RoseNetNodeContext.logger.debug('Message published successfully');
} catch (error) {
Expand Down
37 changes: 30 additions & 7 deletions packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import { Libp2p, PubSub } from '@libp2p/interface';
import { bulkhead } from 'cockatiel';

import RoseNetNodeContext from '../context/RoseNetNodeContext';
import { PubSubMSG } from '../types';
import { messageCrypto } from '@rosen-bridge/rosenet-utils';
import peerIdFromPublicKey from '@rosen-bridge/rosenet-utils/dist/peerIdFromPublicKey';

const textDecoder = new TextDecoder();

Expand All @@ -14,17 +17,37 @@ const subscribeFactory = (node: Libp2p<{ pubsub: PubSub }>) => {
RoseNetNodeContext.config.pubsub.maxInboundQueueSize,
);

return async (topic: string, handler: (message: string) => void) => {
return async (
topic: string,
handler: (from: string, message: string) => void,
) => {
node.services.pubsub.subscribe(topic);
node.services.pubsub.addEventListener('message', async (event) => {
try {
await bulkheadPolicy.execute(() => {
await bulkheadPolicy.execute(async () => {
if (event.detail.topic === topic) {
const message = textDecoder.decode(event.detail.data);
handler(message);
RoseNetNodeContext.logger.debug('Pubsub message received', {
message,
});
const msgStr = textDecoder.decode(event.detail.data);
const message: PubSubMSG = JSON.parse(msgStr);
if (
await messageCrypto.verify(
message.senderPubKey,
message.message,
message.signature,
)
) {
const from = await peerIdFromPublicKey(message.senderPubKey);
handler(from.toString(), message.message);
RoseNetNodeContext.logger.debug('Pubsub message received', {
message,
});
} else {
RoseNetNodeContext.logger.warn(
`Couldn't verify pubsub message with senderPubKey ${message.senderPubKey}`,
);
RoseNetNodeContext.logger.debug(
`senderPubKey ${message.senderPubKey} couldn't verify message ${message.message} with signature ${message.signature}`,
);
}
}
});
} catch {
Expand Down
6 changes: 6 additions & 0 deletions packages/rosenet-node/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ export type PartialRoseNetNodeConfig = Pick<
RoseNetNodeConfigMandatoryKeys
> &
RecursivePartial<Omit<RoseNetNodeConfig, RoseNetNodeConfigMandatoryKeys>>;

export type PubSubMSG = {
senderPubKey: string;
signature: string;
message: string;
};
1 change: 1 addition & 0 deletions packages/utils/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export { default as addEventListeners } from './addEventListeners';
export { default as libp2pLoggerFactory } from './logger';
export { default as privateKeyToPeerId } from './privateKeyToPeerId';
export { default as readPrivateKeyFromFile } from './readPrivateKeyFromFile';
export { default as messageCrypto } from './messageCrypto';
30 changes: 30 additions & 0 deletions packages/utils/lib/messageCrypto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { unmarshalPrivateKey, unmarshalPublicKey } from '@libp2p/crypto/keys';
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';

/**
* sign data using provided private key
* @param privKeyBytes
* @param data
*/
const sign = async (privKeyBytes: Uint8Array, data: string) => {
const privKey = await unmarshalPrivateKey(privKeyBytes);
const signature = await privKey.sign(uint8ArrayFromString(data, 'utf-8'));
return uint8ArrayToString(signature, 'hex');
};

/**
* verify data using publicKey and provided signature
* @param pubKeyBytes
* @param data
* @param signature
*/
const verify = async (pubKeyBytes: string, data: string, signature: string) => {
const pubKey = unmarshalPublicKey(Buffer.from(pubKeyBytes, 'hex'));
return pubKey.verify(
uint8ArrayFromString(data, 'utf-8'),
uint8ArrayFromString(signature, 'hex'),
);
};

export default { sign, verify };
23 changes: 23 additions & 0 deletions packages/utils/lib/peerIdFromPublicKey.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { unmarshalPublicKey } from '@libp2p/crypto/keys';
import { createFromPubKey } from '@libp2p/peer-id-factory';

/**
* convert a public key string to a PublicKey object
* @param publicKey hex encoded public key in any of rsa, ed25519, or
* secp256k1 formats
*/
const unmarshalPublicKeyString = (publicKey: string) =>
unmarshalPublicKey(Buffer.from(publicKey, 'hex'));

/**
* generate a peer id from a public key
*
* @param publicKey hex encoded public key in any of rsa, ed25519, or
* secp256k1 formats
*/
const peerIdFromPublicKey = async (publicKey: string) => {
const publicKeyObj = unmarshalPublicKeyString(publicKey);
return await createFromPubKey(publicKeyObj);
};

export default peerIdFromPublicKey;
2 changes: 1 addition & 1 deletion tests/global/src/node/registerHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const registerHandlers = (
});
});

node.subscribe('rosenet-news', (message) => {
node.subscribe('rosenet-news', (_, message) => {
const roundtripEnd = Date.now();
const roundtripStart = +message.slice(-13);
const latency = roundtripEnd - roundtripStart;
Expand Down
2 changes: 1 addition & 1 deletion tests/scale/src/node/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ node.handleIncomingMessage(async (from) => {
};
});

node.subscribe('rosenet-news', (message) => {
node.subscribe('rosenet-news', (_, message) => {
received++;
const delay = +(Date.now() - +message.split('.')[1]) / 1000;
totalDelay += +(Date.now() - +message.split('.')[1]) / 1000;
Expand Down

0 comments on commit 986267b

Please sign in to comment.