Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mix poc #3284

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,7 @@
url = https://github.com/vacp2p/nim-ngtcp2.git
ignore = untracked
branch = master
[submodule "vendor/mix"]
path = vendor/mix
url = https://github.com/vacp2p/mix/
branch = mix-hybrid
122 changes: 122 additions & 0 deletions examples/lightpush_publisher_mix.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import
std/[tables, times, sequtils],
stew/byteutils,
stew/shims/net,
chronicles,
results,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr

import ../vendor/mix/src/entry_connection,
../vendor/mix/src/protocol

import
waku/[
common/logging,
node/peer_manager,
waku_core,
waku_core/codecs,
waku_node,
waku_enr,
discovery/waku_discv5,
factory/builder,
waku_lightpush/client
]

proc now*(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())

# careful if running pub and sub in the same machine
const wakuPort = 60000

const clusterId = 2
const shardId = @[0'u16]

const
LightpushPeer =
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/2/0")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")

proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
# use notice to filter all waku messaging
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)

notice "starting publisher", wakuPort = wakuPort

let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(relay = true)

let relayShards = RelayShards.init(clusterId, shardId).valueOr:
error "Relay shards initialization failed", error = error
quit(QuitFailure)

var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withWakuRelaySharding(relayShards).expect(
"Building ENR with relay sharding failed"
)

let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create enr record", error = recordRes.error
quit(QuitFailure)
else:
recordRes.get()

var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
builder.withRecord(record)
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
let node = builder.build().tryGet()

node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
node.mountLightPushClient()
(
await node.mountMix("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a")
).isOkOr:
error "failed to mount waku mix protocol: ", error = $error
return

let conn = MixEntryConnection.newConn(
"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
ProtocolType.fromString(WakuLightPushCodec),
node.mix)

await node.start()
node.peerManager.start()

notice "publisher service started"
while true:
let text = "hi there i'm a publisher"
let message = WakuMessage(
payload: toBytes(text), # content of the message
contentTopic: LightpushContentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now(),
) # current timestamp

let res =
await node.wakuLightpushClient.publishWithConn(LightpushPubsubTopic, message, conn)

if res.isOk:
notice "published message",
text = text,
timestamp = message.timestamp,
psTopic = LightpushPubsubTopic,
contentTopic = LightpushContentTopic
else:
error "failed to publish message", error = res.error

await sleepAsync(5000)
break

when isMainModule:
let rng = crypto.newRng()
asyncSpawn setupAndPublish(rng)
runForever()
1 change: 1 addition & 0 deletions vendor/mix
Submodule mix added at ba1125
2 changes: 1 addition & 1 deletion vendor/nim-libp2p
1 change: 1 addition & 0 deletions waku.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ task example2, "Build Waku examples":
buildBinary "subscriber", "examples/"
buildBinary "filter_subscriber", "examples/"
buildBinary "lightpush_publisher", "examples/"
buildBinary "lightpush_publisher_mix", "examples/"

task chat2, "Build example Waku chat usage":
# NOTE For debugging, set debug level. For chat usage we want minimal log
Expand Down
10 changes: 10 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,16 @@ with the drawback of consuming some more bandwidth.""",
name: "rendezvous"
.}: bool

#Mix config
mixkey* {.desc: "ED25519 private key as 64 char hex string.", name: "mixkey".}:
Option[string]
#TODO: Temp config for simulations.Ideally need to get this info from bootstrap ENRs
#[ mixBootstrapNodes* {.
desc:
"Text-encoded data for mix bootstrap node. Encoded in the format Multiaddress:libp2pPubKey:MixPubKey. Argument may be repeated.",
name: "mix-bootstrap-node"
.}: seq[string] ]#

## websocket config
websocketSupport* {.
desc: "Enable websocket: true|false",
Expand Down
11 changes: 11 additions & 0 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,17 @@ proc setupProtocols(
return
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)

#mount mix
let mixPrivKey:string =
if conf.mixkey.isSome():
conf.mixkey.get()
else:
error "missing mix key"
return err("missing mix key")
(
await node.mountMix(mixPrivKey)
).isOkOr:
return err("failed to mount waku mix protocol: " & $error)
return ok()

## Start node
Expand Down
81 changes: 80 additions & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import
stew/byteutils,
eth/keys,
nimcrypto,
nimcrypto/utils as ncrutils,
bearssl/rand,
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
libp2p/crypto/curve25519,
libp2p/protocols/ping,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
Expand All @@ -20,7 +22,13 @@ import
libp2p/builders,
libp2p/transports/transport,
libp2p/transports/tcptransport,
libp2p/transports/wstransport
libp2p/transports/wstransport,
../../vendor/mix/src/mix_node,
../../vendor/mix/src/mix_protocol,
../../vendor/mix/src/curve25519,
../../vendor/mix/src/protocol


import
../waku_core,
../waku_core/topics/sharding,
Expand Down Expand Up @@ -119,6 +127,8 @@ type
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
contentTopicHandlers: Table[ContentTopic, TopicHandler]
rateLimitSettings*: ProtocolRateLimitSettings
mix*: MixProtocol
mixbootNodes*: Table[PeerId, MixPubInfo]

proc new*(
T: type WakuNode,
Expand Down Expand Up @@ -204,6 +214,75 @@ proc mountSharding*(
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
return ok()

proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
var mixNodes = initTable[PeerId, MixPubInfo]()
# MixNode Multiaddrs and PublicKeys:
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
]
let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
]
for index, mixNodeMultiaddr in bootNodesMultiaddrs:
let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr)
if peerIdRes.isErr:
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
let peerId = peerIdRes.get()
if peerID == exceptPeerID:
continue
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))

mixNodes[peerId] = mixNodePubInfo
info "using mix bootstrap nodes ", bootNodes = mixNodes
return mixNodes


# Mix Protocol
proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} =
info "mounting mix protocol", nodeId = node.info #TODO log the config used
info "mixPrivKey", mixPrivKey = mixPrivKey

let mixKey = intoCurve25519Key(ncrutils.fromHex(mixPrivKey))
let mixPubKey = public(mixKey)

let localaddrStr = node.announcedAddresses[0].toString().valueOr:
return err("Failed to convert multiaddress to string.")
info "local addr", localaddr = localaddrStr

let localMixNodeInfo = initMixNodeInfo(
localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey,
node.switch.peerInfo.privateKey.skkey,
)

let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, node.getBootStrapMixNodes(node.peerId))
if protoRes.isErr:
error "Mix protocol initialization failed", err = protoRes.error
return
node.mix = protoRes.value

var sendHandlerFunc = proc(
conn: Connection, proto: ProtocolType
): Future[void] {.async.} =
try:
await callHandler(node.switch, conn, proto) # Call handler on the switch
except CatchableError as e:
error "Error during execution of MixProtocol handler: ", err = e.msg
return
node.mix.setCallback(sendHandlerFunc)

let catchRes = catch:
node.switch.mount(node.mix)
if catchRes.isErr():
return err(catchRes.error.msg)

return ok()

## Waku Sync

proc mountStoreSync*(
Expand Down
20 changes: 19 additions & 1 deletion waku/waku_lightpush/client.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.push raises: [].}

import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
import libp2p/peerid
import libp2p/peerid, libp2p/stream/connection
import
../waku_core/peers,
../node/peer_manager,
Expand Down Expand Up @@ -107,3 +107,21 @@ proc publishToAny*(
obs.onMessagePublished(pubSubTopic, message)

return ok()


proc publishWithConn*(
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage, conn: Connection
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
## This proc is similar to the publish one but in this case
## we use existing connection to publish.

info "publishWithConn", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex

let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(pushRequest))
await conn.writeLP(rpc.encode().buffer)

for obs in wl.publishObservers:
obs.onMessagePublished(pubSubTopic, message)

return ok()