Skip to content

Commit

Permalink
Add NoteProcessor in its own thread
Browse files Browse the repository at this point in the history
  • Loading branch information
ariard committed Jul 28, 2023
1 parent 3bafdc4 commit f8dfa40
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
9 changes: 1 addition & 8 deletions src/boardmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use bitcoin::secp256k1;
use civkit::anchormanager::AnchorManager;
use civkit::credentialgateway::CredentialGateway;
use civkit::events::ClientEvents;
use civkit::kindprocessor::NoteProcessor;
use civkit::nodesigner::NodeSigner;
use civkit::peerhandler::PeerInfo;

Expand All @@ -38,7 +37,6 @@ pub struct ServiceManager

credential_gateway: Arc<CredentialGateway>,
//TODO: abstract ServiceProcessor, ServiceSigner and AnchorManager in its own Service component ?
pub note_processor: Arc<NoteProcessor>,
node_signer: Arc<NodeSigner>,
anchor_manager: Arc<AnchorManager>,

Expand All @@ -51,13 +49,12 @@ pub struct ServiceManager

impl ServiceManager
{
pub fn new(credential_gateway: Arc<CredentialGateway>, node_signer: Arc<NodeSigner>, anchor_manager: Arc<AnchorManager>, note_processor: Arc<NoteProcessor>, board_events_send: mpsc::UnboundedSender<ClientEvents>, board_peers_send: mpsc::UnboundedSender<PeerInfo>) -> Self {
pub fn new(credential_gateway: Arc<CredentialGateway>, node_signer: Arc<NodeSigner>, anchor_manager: Arc<AnchorManager>, board_events_send: mpsc::UnboundedSender<ClientEvents>, board_peers_send: mpsc::UnboundedSender<PeerInfo>) -> Self {
let secp_ctx = Secp256k1::new();
let pubkey = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42;32]).unwrap());
ServiceManager {
genesis_hash: genesis_block(Network::Testnet).header.block_hash(),
credential_gateway,
note_processor,
anchor_manager,
node_signer,
service_events_send: Mutex::new(board_events_send),
Expand All @@ -66,8 +63,4 @@ impl ServiceManager
secp_ctx,
}
}

pub fn note_stats(&self) -> u64 {
self.note_processor.note_stats()
}
}
12 changes: 12 additions & 0 deletions src/kindprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
use std::sync::Mutex;

use tokio::time::{sleep, Duration};

pub struct NoteProcessor {
note_counters: Mutex<u64>,
current_height: u64,
}

impl NoteProcessor {
pub fn new() -> Self {
NoteProcessor {
note_counters: Mutex::new(0),
current_height: 0,
}
}

Expand All @@ -39,4 +43,12 @@ impl NoteProcessor {
}
return notes;
}

pub async fn run(&mut self) {
loop {
sleep(Duration::from_millis(1000)).await;

println!("[CIVKITD] - NOTE: Note Processr running");
}
}
}
12 changes: 9 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ impl BoardCtrl for ServiceManager {

async fn status_handle(&self, request: Request<boardctrl::BoardStatusRequest>) -> Result<Response<boardctrl::BoardStatusReply>, Status> {

let notes = self.note_stats();
//TODO give a mspc communication channel between ServiceManager and NoteProcessor
//let notes = self.note_stats();

let board_status = boardctrl::BoardStatusReply {
offers: notes,
Expand Down Expand Up @@ -276,7 +277,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let credential_gateway = Arc::new(CredentialGateway::new());

// The note or service provider...quite empty for now.
let note_processor = Arc::new(NoteProcessor::new());
let note_processor = NoteProcessor::new();

// The service provider signer...quite empty for now.
let node_signer = Arc::new(NodeSigner::new());
Expand All @@ -288,7 +289,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client_handler = ClientHandler::new(handler_receive, request_receive);

// Main handler of services provision.
let board_manager = ServiceManager::new(credential_gateway, node_signer, anchor_manager, note_processor, board_events_send, board_peer_send);
let board_manager = ServiceManager::new(credential_gateway, node_signer, anchor_manager, board_events_send, board_peer_send);

let addr = format!("[::1]:{}", cli.cli_port).parse()?;

Expand Down Expand Up @@ -319,6 +320,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
onion_box.run().await;
});

// We start the note processor for messages.
tokio::spawn(async move {
note_processor.run().await;
});

// We start the noise gateway for BOLT8 peers.
tokio::spawn(async move {
noise_gateway.run().await;
Expand Down

0 comments on commit f8dfa40

Please sign in to comment.