Skip to content

Commit

Permalink
Merge pull request #30 from ariard/2023-07-impr-note-processor
Browse files Browse the repository at this point in the history
Move`NoteProcessor` in its own thread
  • Loading branch information
ariard authored Aug 2, 2023
2 parents 2e5a97d + 7b4cbf7 commit a0c78d6
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 48 deletions.
13 changes: 5 additions & 8 deletions src/boardmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use bitcoin::secp256k1::{SecretKey, PublicKey};
use bitcoin::secp256k1::Secp256k1;
use bitcoin::secp256k1;

use civkit::nostr_db::DbRequest;
use civkit::anchormanager::AnchorManager;
use civkit::credentialgateway::CredentialGateway;
use civkit::events::ClientEvents;
use civkit::kindprocessor::NoteProcessor;
use civkit::nodesigner::NodeSigner;
use civkit::peerhandler::PeerInfo;

Expand All @@ -38,36 +38,33 @@ pub struct ServiceManager

credential_gateway: Arc<CredentialGateway>,
//TODO: abstract ServiceProcessor, ServiceSigner and AnchorManager in its own Service component ?
pub note_processor: Arc<NoteProcessor>,
node_signer: Arc<NodeSigner>,
anchor_manager: Arc<AnchorManager>,

pub service_events_send: Mutex<mpsc::UnboundedSender<ClientEvents>>,
pub service_peers_send: Mutex<mpsc::UnboundedSender<PeerInfo>>,

pub send_db_request: Mutex<mpsc::UnboundedSender<DbRequest>>,

our_service_pubkey: PublicKey,
secp_ctx: Secp256k1<secp256k1::All>,
}

impl ServiceManager
{
pub fn new(credential_gateway: Arc<CredentialGateway>, node_signer: Arc<NodeSigner>, anchor_manager: Arc<AnchorManager>, note_processor: Arc<NoteProcessor>, board_events_send: mpsc::UnboundedSender<ClientEvents>, board_peers_send: mpsc::UnboundedSender<PeerInfo>) -> Self {
pub fn new(credential_gateway: Arc<CredentialGateway>, node_signer: Arc<NodeSigner>, anchor_manager: Arc<AnchorManager>, board_events_send: mpsc::UnboundedSender<ClientEvents>, board_peers_send: mpsc::UnboundedSender<PeerInfo>, send_db_request: mpsc::UnboundedSender<DbRequest>) -> Self {
let secp_ctx = Secp256k1::new();
let pubkey = PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42;32]).unwrap());
ServiceManager {
genesis_hash: genesis_block(Network::Testnet).header.block_hash(),
credential_gateway,
note_processor,
anchor_manager,
node_signer,
service_events_send: Mutex::new(board_events_send),
service_peers_send: Mutex::new(board_peers_send),
send_db_request: Mutex::new(send_db_request),
our_service_pubkey: pubkey,
secp_ctx,
}
}

pub fn note_stats(&self) -> u64 {
self.note_processor.note_stats()
}
}
10 changes: 5 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// licenses.

use boardctrl::board_ctrl_client::BoardCtrlClient;
use boardctrl::{PingRequest, PongRequest, ShutdownRequest, ShutdownReply, SendNote, ReceivedNote, ListClientRequest, ListSubscriptionRequest, PeerConnectionRequest, DisconnectClientRequest, SendNotice, SendOffer, SendInvoice, ListDbEntriesRequest};
use boardctrl::{PingRequest, PongRequest, ShutdownRequest, ShutdownReply, SendNote, ReceivedNote, ListClientRequest, ListSubscriptionRequest, PeerConnectionRequest, DisconnectClientRequest, SendNotice, SendOffer, SendInvoice, ListDbEventsRequest};

use std::env;
use std::process;
Expand Down Expand Up @@ -74,7 +74,7 @@ enum Command {
invoice: String,
},
/// List DB entries
ListDbEntries
ListDbEvents
}

#[tokio::main]
Expand Down Expand Up @@ -189,10 +189,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let response = client.publish_invoice(request).await?;
}
Command::ListDbEntries => {
let request = tonic::Request::new(ListDbEntriesRequest {});
Command::ListDbEvents => {
let request = tonic::Request::new(ListDbEventsRequest {});

let _response = client.list_db_entries(request).await?;
let _response = client.list_db_events(request).await?;
}
}
Ok(())
Expand Down
30 changes: 23 additions & 7 deletions src/clienthandler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use nostr::key::XOnlyPublicKey;

use crate::{events, NostrSub};
use crate::events::{ClientEvents, EventsProvider, ServerCmd};
use crate::nostr_db::{log_new_event_db, log_new_subscription_db};
use crate::nostr_db::DbRequest;

use futures_util::{future, pin_mut, TryStreamExt, StreamExt, SinkExt};

Expand Down Expand Up @@ -102,6 +102,8 @@ pub struct ClientHandler {
handler_receive: Mutex<mpsc::UnboundedReceiver<ClientEvents>>,
connection_receive: Mutex<mpsc::UnboundedReceiver<(TcpStream, SocketAddr)>>,

send_db_requests: Mutex<mpsc::UnboundedSender<DbRequest>>,

filtered_events: HashMap<SubscriptionId, Event>,

pending_events: Mutex<Vec<ClientEvents>>
Expand Down Expand Up @@ -149,7 +151,7 @@ async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, outgoing_rec
}

impl ClientHandler {
pub fn new(handler_receive: mpsc::UnboundedReceiver<ClientEvents>, connection_receive: mpsc::UnboundedReceiver<(TcpStream, SocketAddr)>) -> Self {
pub fn new(handler_receive: mpsc::UnboundedReceiver<ClientEvents>, connection_receive: mpsc::UnboundedReceiver<(TcpStream, SocketAddr)>, send_db_requests: mpsc::UnboundedSender<DbRequest>) -> Self {

let (outgoing_receive, incoming_receive) = mpsc::unbounded_channel::<Vec<u8>>();

Expand All @@ -166,6 +168,8 @@ impl ClientHandler {
handler_receive: Mutex::new(handler_receive),
connection_receive: Mutex::new(connection_receive),

send_db_requests: Mutex::new(send_db_requests),

filtered_events: HashMap::new(),

pending_events: Mutex::new(vec![]),
Expand Down Expand Up @@ -325,6 +329,7 @@ impl ClientHandler {
}

let mut new_pending_events = Vec::new();
let mut events_write_db = Vec::new();
{
// If we have a new event, we'll fan out according to its types (event, subscription, close)
for (id, msg) in msg_queue {
Expand All @@ -342,7 +347,8 @@ impl ClientHandler {
self.filter_events(*msg).await;
//TODO: we should link our filtering policy to our db storing,
//otherwise this is a severe DoS vector
log_new_event_db(*msg_2);
let db_request = DbRequest::WriteEvent(*msg_2);
events_write_db.push(db_request);
},
ClientMessage::Req { subscription_id, filters } => {
self.subscriptions_counter += 1;
Expand All @@ -358,10 +364,10 @@ impl ClientHandler {
let nostr_sub = NostrSub::new(our_side_id, subscription_id.clone(), filters);
let nostr_sub2 = nostr_sub.clone();
self.subscriptions.insert(our_side_id, nostr_sub);
log_new_subscription_db(nostr_sub2);
//TODO: replay stored events when there is a store
new_pending_events.push(ClientEvents::EndOfStoredEvents { client_id: id, sub_id: subscription_id });
println!("[CIVKITD] - NOSTR: New subscription id {}", our_side_id);
let db_request = DbRequest::DumpEvents;
events_write_db.push(db_request);
},
ClientMessage::Close(subscription_id) => {
//TODO: replace our_side_id by Sha256 of SubscriptionId
Expand All @@ -382,18 +388,26 @@ impl ClientHandler {
}
}

{
for ev in events_write_db {
let mut send_db_requests_lock = self.send_db_requests.lock();
send_db_requests_lock.await.send(ev);
}
}


{
let mut pending_events_lock = self.pending_events.lock();
pending_events_lock.await.append(&mut new_pending_events);
}
}
}

async fn filter_events(&mut self, event: Event) {
async fn filter_events(&mut self, event: Event) -> bool {

let mut match_result = false;
for (our_side_id, sub) in self.subscriptions.iter() {
let filters = sub.get_filters();
let mut match_result = false;
for filter in filters {
if let Some(ref kinds) = filter.kinds {
for kind in kinds.iter() {
Expand All @@ -419,5 +433,7 @@ impl ClientHandler {
pending_events_lock.await.append(&mut clients_to_dispatch);
}
}

match_result
}
}
50 changes: 49 additions & 1 deletion src/kindprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,33 @@

//! An interface to sanitize and enforce service policy on the received notes.
use crate::nostr_db::DbRequest;
use crate::nostr_db::{write_new_subscription_db, write_new_event_db, print_events_db};

use std::sync::Mutex;

use tokio::sync::mpsc;
use tokio::sync::Mutex as TokioMutex;
use tokio::time::{sleep, Duration};

pub struct NoteProcessor {
note_counters: Mutex<u64>,
current_height: u64,

receive_db_requests: TokioMutex<mpsc::UnboundedReceiver<DbRequest>>,

receive_db_requests_manager: TokioMutex<mpsc::UnboundedReceiver<DbRequest>>,
}

impl NoteProcessor {
pub fn new() -> Self {
pub fn new(receive_db_requests: mpsc::UnboundedReceiver<DbRequest>, receive_db_requests_manager: mpsc::UnboundedReceiver<DbRequest>) -> Self {
NoteProcessor {
note_counters: Mutex::new(0),
current_height: 0,

receive_db_requests: TokioMutex::new(receive_db_requests),

receive_db_requests_manager: TokioMutex::new(receive_db_requests_manager),
}
}

Expand All @@ -39,4 +56,35 @@ impl NoteProcessor {
}
return notes;
}

pub async fn run(&mut self) {
loop {
sleep(Duration::from_millis(1000)).await;

{
let mut receive_db_requests_lock = self.receive_db_requests.lock();
if let Ok(db_request) = receive_db_requests_lock.await.try_recv() {
match db_request {
DbRequest::WriteEvent(ev) => { write_new_event_db(ev).await; },
DbRequest::WriteSub(ns) => { write_new_subscription_db(ns); },
_ => {},
}
println!("[CIVKITD] - NOTE PROCESSING: Note processor received DB requests");
}
}

{
let mut receive_db_requests_manager_lock = self.receive_db_requests_manager.lock();
if let Ok(db_request) = receive_db_requests_manager_lock.await.try_recv() {
match db_request {
DbRequest::DumpEvents => { print_events_db().await; },
_ => {},
}
println!("[CIVKITD] - NOTE PROCESSING: Note processor received DB requests from ServiceManager");
}
}

//TODO: receive requests from server command
}
}
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use bitcoin::secp256k1::PublicKey;

use nostr::{SubscriptionId, Filter};

#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct NostrSub {
our_side_id: u64,
id: SubscriptionId,
Expand All @@ -36,7 +36,7 @@ impl NostrSub {
}
}

#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct NostrPeer {
peer_pubkey: PublicKey,
}
Expand Down
76 changes: 61 additions & 15 deletions src/nostr_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,93 @@ use nostr::Event;

use crate::{NostrSub, NostrPeer};

use rusqlite::{Connection, params};
use rusqlite::{Connection, OpenFlags, params};

use std::path::Path;

const CIVKITD_DB_FILE: &str = "civkitd.db";

#[derive(Debug)]
pub enum DbRequest {
WriteEvent(Event),
WriteSub(NostrSub),
DumpEvents,
}

#[derive(Debug)]
struct DbEvent {
id: i32,
data: Option<Vec<u8>>,
}

#[derive(Debug)]
struct DbSub {
sub_id: i32,
data: Option<Vec<u8>>,
}

pub async fn log_new_event_db(event: Event) {
pub async fn write_new_event_db(event: Event) {

if let Ok(conn) = Connection::open_in_memory() {
conn.execute("CREATE TABLE event (
//TODO: spawn new thread
if let Ok(mut conn) = Connection::open_with_flags(
Path::new(CIVKITD_DB_FILE),
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
) {
println!("[CIVKITD] - NOTE PROCESSING: Opening database for read / write new event");

match conn.execute("CREATE TABLE event (
event_id INTEGER PRIMARY KEY,
data BLOB
)",
());
()) {
Ok(create) => println!("[CIVKITD] - NOTE PROCESSING: {} rows were updated", create),
Err(err) => println!("[CIVKITD] - NOTE PROCESSING: table creation failed: {}", err),
}

//TODO: add complete event
let event = DbEvent {
id: 0,
data: None,
};

conn.execute(
"INSERT INTO event (data) VALUES (:data)",
match conn.execute("INSERT INTO event (data) VALUES (:data)",
&[(&event.data)],
);
}
) {
Ok(update) => println!("[CIVKITD] - NOTE PROCESSING: {} rows were updated", update),
Err(err) => println!("[CIVKITD] - NOTE PROCESSING: update insert failed: {}", err),
}

conn.close().ok();
} else { println!("Failure to open database"); }
}

pub async fn log_new_subscription_db(subscription: NostrSub) {
pub async fn print_events_db() {

if let Ok(mut conn) = Connection::open_with_flags(
Path::new(CIVKITD_DB_FILE),
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
) {
println!("[CIVKITD] - NOTE PROCESSING: Opening database for read events");

{
let mut stmt = conn.prepare("SELECT event_id, data FROM event").unwrap();
let event_iter = stmt.query_map([], |row| {
Ok(DbEvent {
id: row.get(0)?,
data: row.get(1)?,
})
}).unwrap();

for event in event_iter {
println!("[CIVKITD] - NOTE PROCESSING: Found event {:?}", event.unwrap());
}
}

conn.close().ok();
} else { println!("Failure to open database"); }
}

pub async fn write_new_subscription_db(subscription: NostrSub) {

if let Ok(conn) = Connection::open_in_memory() {
conn.execute("CREATE TABLE event (
Expand Down Expand Up @@ -89,8 +139,4 @@ pub async fn log_new_peer_db(peer: NostrPeer) {
}
}

//pub async fn log_new_nostr_client(nostr_client: NostrClient) {
//
//}

//TODO log clients and peers
//TODO: log function for client
Loading

0 comments on commit a0c78d6

Please sign in to comment.