From 1501a3c82bb5ae764bd372f15de49bc9fa70640e Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Fri, 27 Sep 2024 16:27:27 +0800 Subject: [PATCH 1/3] ttrpc: support fdstore The ttrpc service support fdstore to send the connections and the incomplete requests to systemd file descriptor store and restore them after systemd service restart. Signed-off-by: Abel Feng --- Cargo.toml | 5 +- src/asynchronous/client.rs | 15 ++- src/asynchronous/connection.rs | 74 +++++++++++- src/asynchronous/fdstore.rs | 157 +++++++++++++++++++++++++ src/asynchronous/mod.rs | 2 + src/asynchronous/server.rs | 207 +++++++++++++++++++++++++++++++-- 6 files changed, 443 insertions(+), 17 deletions(-) create mode 100644 src/asynchronous/fdstore.rs diff --git a/Cargo.toml b/Cargo.toml index 04663705..b7fdde29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,10 +17,12 @@ nix = "0.23.0" log = "0.4" byteorder = "1.3.2" thiserror = "1.0" +uuid = { version = "1.1.2", features = ["v4"] } async-trait = { version = "0.1.31", optional = true } -tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true } +tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time", "fs"], optional = true } futures = { version = "0.3", optional = true } +libsystemd = { version = "0.7.0", optional = true } [target.'cfg(target_os = "linux")'.dependencies] tokio-vsock = { version = "0.3.1", optional = true } @@ -32,6 +34,7 @@ protobuf-codegen = "3.1.0" default = ["sync"] async = ["async-trait", "tokio", "futures", "tokio-vsock"] sync = [] +fdstore = ["libsystemd"] [package.metadata.docs.rs] all-features = true diff --git a/src/asynchronous/client.rs b/src/asynchronous/client.rs index eeaec054..0ae90953 100644 --- a/src/asynchronous/client.rs +++ b/src/asynchronous/client.rs @@ -54,9 +54,16 @@ impl Client { rx: Some(rx), streams: req_map.clone(), }; - - let conn = Connection::new(stream, delegate); - tokio::spawn(async move { conn.run().await }); + #[cfg(not(feature = "fdstore"))] + { + let conn = Connection::new(stream, delegate); + tokio::spawn(async move { conn.run().await }); + } + #[cfg(feature = "fdstore")] + { + let conn = Connection::new(stream, delegate, "".to_string()); + tokio::spawn(async move { conn.run().await }); + } Client { req_tx, @@ -254,7 +261,7 @@ impl ReaderDelegate for ClientReader { async fn exit(&self) {} - async fn handle_msg(&self, msg: GenMessage) { + async fn handle_msg(&self, _id: u64, msg: GenMessage) { let req_map = self.streams.clone(); tokio::spawn(async move { let resp_tx = match msg.header.type_ { diff --git a/src/asynchronous/connection.rs b/src/asynchronous/connection.rs index 7f099bb2..11f3e303 100644 --- a/src/asynchronous/connection.rs +++ b/src/asynchronous/connection.rs @@ -17,6 +17,8 @@ use crate::error::Error; use crate::proto::GenMessage; use super::stream::SendingMessage; +#[cfg(feature = "fdstore")] +use crate::r#async::fdstore::MessageStore; pub trait Builder { type Reader; @@ -37,10 +39,13 @@ pub trait ReaderDelegate { async fn wait_shutdown(&self); async fn disconnect(&self, e: Error, task: &mut task::JoinHandle<()>); async fn exit(&self); - async fn handle_msg(&self, msg: GenMessage); + // handle message with id, the id is only for message store. + async fn handle_msg(&self, id: u64, msg: GenMessage); } pub struct Connection { + #[cfg(feature = "fdstore")] + name: String, reader: ReadHalf, writer_task: task::JoinHandle<()>, reader_delegate: B::Reader, @@ -53,7 +58,7 @@ where B::Reader: ReaderDelegate + Send + Sync + 'static, B::Writer: WriterDelegate + Send + Sync + 'static, { - pub fn new(conn: S, mut builder: B) -> Self { + pub fn new(conn: S, mut builder: B, #[cfg(feature = "fdstore")] name: String) -> Self { let (reader, mut writer) = split(conn); let (reader_delegate, mut writer_delegate) = builder.build(); @@ -73,6 +78,8 @@ where }); Self { + #[cfg(feature = "fdstore")] + name, reader, writer_task, reader_delegate, @@ -81,6 +88,8 @@ where pub async fn run(self) -> std::io::Result<()> { let Connection { + #[cfg(feature = "fdstore")] + name: _, mut reader, mut writer_task, reader_delegate, @@ -91,7 +100,7 @@ where match res { Ok(msg) => { trace!("Got Message {:?}", msg); - reader_delegate.handle_msg(msg).await; + reader_delegate.handle_msg(0, msg).await; } Err(e) => { trace!("Read msg err: {:?}", e); @@ -111,4 +120,63 @@ where Ok(()) } + + #[cfg(feature = "fdstore")] + pub async fn run_with_message_store(self, message_store: MessageStore) -> std::io::Result<()> { + let Connection { + name, + mut reader, + mut writer_task, + reader_delegate, + } = self; + + let messages = message_store.get_messages(&name).await; + // the next message id should be larger than the message id before restart. + let mut id = 0u64; + for m in messages { + if m.id >= id { + id = m.id + 1; + } + // handle the stored request + reader_delegate.handle_msg(m.id, m.message).await; + } + loop { + select! { + res = GenMessage::read_from(&mut reader) => { + match res { + Ok(msg) => { + trace!("Got Message {:?}", msg); + message_store.insert(name.clone(), id, msg.clone()).await; + reader_delegate.handle_msg(id, msg).await; + id += 1; + } + Err(e) => { + trace!("Read msg err: {:?}", e); + reader_delegate.disconnect(e, &mut writer_task).await; + break; + } + } + } + _v = reader_delegate.wait_shutdown() => { + trace!("Receive shutdown."); + break; + } + } + } + reader_delegate.exit().await; + #[cfg(feature = "fdstore")] + if let Err(e) = libsystemd::daemon::notify_with_fds( + false, + &[ + libsystemd::daemon::NotifyState::Fdname(name.to_string()), + libsystemd::daemon::NotifyState::FdstoreRemove, + ], + &[], + ) { + warn!("failed to notify systemd to remove the fd {}: {}", name, e); + } + trace!("Reader task exit."); + + Ok(()) + } } diff --git a/src/asynchronous/fdstore.rs b/src/asynchronous/fdstore.rs new file mode 100644 index 00000000..f81b6404 --- /dev/null +++ b/src/asynchronous/fdstore.rs @@ -0,0 +1,157 @@ +use crate::{error::Result, proto::GenMessage, Error}; +use std::{collections::HashMap, io::ErrorKind, ops::DerefMut, sync::Arc}; +use tokio::{ + fs::File, + io::{AsyncSeekExt, AsyncWriteExt}, + sync::Mutex, +}; + +const SOCK_NAME_LEN: usize = 36; + +#[derive(Clone)] +pub struct MessageStore { + file: Arc>, + cache: Arc>>>, +} + +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct SockMessage { + pub(crate) sock_name: String, + pub(crate) id: u64, + pub(crate) message: GenMessage, +} + +impl SockMessage { + pub async fn write_to(&self, mut writer: impl tokio::io::AsyncWriteExt + Unpin) -> Result<()> { + writer + .write_all(self.sock_name.as_bytes()) + .await + .map_err(|e| Error::Others(e.to_string()))?; + writer + .write_u64(self.id) + .await + .map_err(|e| Error::Others(e.to_string()))?; + self.message.write_to(writer).await?; + Ok(()) + } + + pub async fn read_from(mut reader: impl tokio::io::AsyncReadExt + Unpin) -> Result { + let mut sock_name_buf = vec![0u8; SOCK_NAME_LEN]; + let len = reader.read_exact(&mut sock_name_buf).await.map_err(|e| { + if e.kind() == ErrorKind::UnexpectedEof { + Error::Eof + } else { + Error::Others(format!("failed to read messages from memfd {}", e)) + } + })?; + if len < SOCK_NAME_LEN { + return Err(Error::Others(format!("read {} bytes for socket name", len))); + } + let sock_name = + String::from_utf8(sock_name_buf).map_err(|e| Error::Others(e.to_string()))?; + let id = reader + .read_u64() + .await + .map_err(|e| Error::Others(e.to_string()))?; + let message = GenMessage::read_from(reader).await?; + Ok(Self { + sock_name, + id, + message, + }) + } +} + +impl MessageStore { + pub async fn load(mut f: File) -> Result { + f.seek(std::io::SeekFrom::Start(0)) + .await + .map_err(|e| Error::Others(e.to_string()))?; + let s = Self { + file: Arc::new(Mutex::new(f)), + cache: Arc::new(Mutex::new(HashMap::new())), + }; + let file = s.file.clone(); + let mut f = file.lock().await; + loop { + let a = SockMessage::read_from(f.deref_mut()).await; + match a { + Ok(m) => { + trace!("load a message from {}, with id {}", m.sock_name, m.id); + s.insert_sock_message(m).await; + } + Err(e) => match e { + Error::Eof => { + break; + } + _ => { + return Err(Error::Others(format!( + "failed to read message from memfd in fdstore: {}", + e + ))); + } + }, + } + } + return Ok(s); + } + + pub async fn insert(&self, sock_name: String, id: u64, m: GenMessage) { + assert_eq!(SOCK_NAME_LEN, sock_name.len()); + self.insert_sock_message(SockMessage { + sock_name, + id, + message: m, + }) + .await; + self.dump().await; + } + + pub async fn dump(&self) { + let mut file = self.file.lock().await; + file.set_len(0).await.unwrap_or_default(); + file.rewind().await.unwrap_or_default(); + let cache = self.cache.lock().await; + for v in cache.values() { + for m in v { + m.write_to(file.deref_mut()).await.unwrap_or_default(); + } + } + file.flush().await.unwrap_or_default(); + } + + pub async fn get_messages(&self, key: &str) -> Vec { + let mut res = vec![]; + let cache = self.cache.lock().await; + if let Some(l) = cache.get(key) { + for m in l { + res.push(m.clone()); + } + } + res + } + + pub async fn remove(&self, sock_name: String, id: u64) { + self.remove_sock_message(sock_name, id).await; + self.dump().await; + } + + async fn remove_sock_message(&self, sock_name: String, id: u64) { + let mut cache = self.cache.lock().await; + if let Some(v) = cache.get_mut(&sock_name) { + v.retain(|x| x.id != id); + } + } + + async fn insert_sock_message(&self, m: SockMessage) { + let mut cache = self.cache.lock().await; + let sock_name = m.sock_name.clone(); + if let Some(v) = cache.get_mut(&sock_name) { + v.push(m); + } else { + let mut l = Vec::new(); + l.push(m); + cache.insert(sock_name, l); + } + } +} diff --git a/src/asynchronous/mod.rs b/src/asynchronous/mod.rs index fb6d39cb..0f2ba00f 100644 --- a/src/asynchronous/mod.rs +++ b/src/asynchronous/mod.rs @@ -12,6 +12,8 @@ mod stream; #[doc(hidden)] mod utils; mod connection; +#[cfg(feature = "fdstore")] +mod fdstore; pub mod shutdown; mod unix_incoming; diff --git a/src/asynchronous/server.rs b/src/asynchronous/server.rs index 4e43092f..72fd0531 100644 --- a/src/asynchronous/server.rs +++ b/src/asynchronous/server.rs @@ -46,6 +46,9 @@ use crate::r#async::stream::{ use crate::r#async::utils; use crate::r#async::{MethodHandler, StreamHandler, TtrpcContext}; +#[cfg(feature = "fdstore")] +use crate::r#async::fdstore::MessageStore; + const DEFAULT_CONN_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(5000); const DEFAULT_SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10000); @@ -72,6 +75,9 @@ pub struct Server { shutdown: shutdown::Notifier, stop_listen_tx: Option>>, + + #[cfg(feature = "fdstore")] + message_store: Option, } impl Default for Server { @@ -82,6 +88,8 @@ impl Default for Server { domain: None, shutdown: shutdown::with_timeout(DEFAULT_SERVER_SHUTDOWN_TIMEOUT).0, stop_listen_tx: None, + #[cfg(feature = "fdstore")] + message_store: Default::default(), } } } @@ -140,9 +148,12 @@ impl Server { pub async fn start(&mut self) -> Result<()> { let listenfd = self.get_listenfd()?; + debug!("start ttrpc server"); match self.domain.as_ref() { Some(Domain::Unix) => { + #[cfg(feature = "fdstore")] + self.serve_stored_conns().await?; let sys_unix_listener; unsafe { sys_unix_listener = SysUnixListener::from_raw_fd(listenfd); @@ -168,6 +179,87 @@ impl Server { } } + // serve_stored_conns serves the stored connections in fd store of systemd. + // when "systemd" feature enabled, all the established connections + // will send to systemd by calling `notify_with_fds` of libsystemd, with `NotifyState::Fdstore` + // and systemd will keep it unless we remove it, also by calling `notify_with_fds` with `NotifyState::FdstoreRemove` + // The service can receive the fd from fdstore by calling `receive_descriptors_with_names`, everytime it restart. + // we also create a memfd of name "message_store" and send it to fdstore, every received ttrpc request will be stored in it. + // so everytime we restart, we can get the stored connections and unfinished request in the connections, + // we can continue serve the connections so that the ttrpc session will not be interrputed by the service restart. + // but there is a restriction of this: the ttrpc service should be reentrant. + #[cfg(feature = "fdstore")] + async fn serve_stored_conns(&mut self) -> Result<()> { + use std::ffi::CStr; + use std::os::fd::IntoRawFd; + + use libsystemd::daemon::NotifyState; + use nix::sys::memfd::{memfd_create, MemFdCreateFlag}; + use tokio::{fs::File, net::UnixStream}; + + use crate::asynchronous::fdstore::MessageStore; + debug!("serve stored connection from systemd"); + + let fds = libsystemd::activation::receive_descriptors_with_names(false).unwrap_or_default(); + let mut mem_fd = None; + let mut conns = HashMap::new(); + for (fd, name) in fds { + let fd = fd.into_raw_fd(); + debug!("received an fd {} from fd store: {}", name, fd); + if name == "message_store" { + mem_fd = Some(fd); + continue; + } + //println!("got a listen fd: {}", fd); + let conn = unsafe { + UnixStream::from_std(std::os::unix::net::UnixStream::from_raw_fd(fd)) + .map_err(|e| Error::Socket(e.to_string()))? + }; + conns.insert(name.clone(), conn); + libsystemd::daemon::notify_with_fds( + false, + &[NotifyState::Fdname(name), NotifyState::FdstoreRemove], + &[], + ) + .unwrap(); + } + let mem_fd = if let Some(fd) = mem_fd { + fd + } else { + let fd = memfd_create( + CStr::from_bytes_with_nul(b"message_store\0").unwrap(), + MemFdCreateFlag::MFD_CLOEXEC, + )?; + libsystemd::daemon::notify_with_fds( + false, + &[ + NotifyState::Fdname("message_store".to_string()), + NotifyState::Fdstore, + ], + &[fd], + ) + .unwrap(); + fd + }; + let mem_file = unsafe { File::from_raw_fd(mem_fd) }; + let store = MessageStore::load(mem_file).await?; + self.message_store = Some(store); + let shutdown_waiter = self.shutdown.subscribe(); + for (name, v) in conns { + let fd = v.as_raw_fd(); + spawn_connection_handler( + fd, + v, + self.services.clone(), + shutdown_waiter.clone(), + self.message_store.clone(), + name, + ) + .await; + } + Ok(()) + } + async fn do_start(&mut self, mut incoming: I) -> Result<()> where I: Stream> + Unpin + Send + 'static + AsRawFd, @@ -179,7 +271,8 @@ impl Server { let (stop_listen_tx, mut stop_listen_rx) = channel(1); self.stop_listen_tx = Some(stop_listen_tx); - + #[cfg(feature = "fdstore")] + let message_store = self.message_store.clone(); spawn(async move { loop { select! { @@ -189,13 +282,23 @@ impl Server { match conn { Ok(conn) => { let fd = conn.as_raw_fd(); - // spawn a connection handler, would not block - spawn_connection_handler( - fd, - conn, - services.clone(), - shutdown_waiter.clone(), - ).await; + // spawn a connection handler, would not block + #[cfg(feature = "fdstore")] + spawn_connection_handler( + fd, + conn, + services.clone(), + shutdown_waiter.clone(), + message_store.clone(), + uuid::Uuid::new_v4().to_string(), + ).await; + #[cfg(not(feature = "fdstore"))] + spawn_connection_handler( + fd, + conn, + services.clone(), + shutdown_waiter.clone(), + ).await; } Err(e) => { error!("{:?}", e) @@ -256,6 +359,58 @@ impl Server { } } +#[cfg(feature = "fdstore")] +async fn spawn_connection_handler( + fd: RawFd, + conn: C, + services: Arc>, + shutdown_waiter: shutdown::Waiter, + #[cfg(feature = "fdstore")] message_store: Option, + #[cfg(feature = "fdstore")] sock_name: String, +) where + C: AsyncRead + AsyncWrite + AsRawFd + Send + 'static, +{ + let delegate = ServerBuilder { + fd, + services, + streams: Arc::new(Mutex::new(HashMap::new())), + shutdown_waiter, + message_store: message_store.clone(), + sock_name: sock_name.clone(), + }; + let conn = Connection::new(conn, delegate, sock_name.clone()); + let message_store = message_store.clone(); + if let Err(e) = libsystemd::daemon::notify_with_fds( + false, + &[ + libsystemd::daemon::NotifyState::Fdname(sock_name.to_string()), + libsystemd::daemon::NotifyState::Fdstore, + ], + &[fd], + ) { + warn!("failed to notify fds to systemd: {}", e); + } + spawn(async move { + #[cfg(feature = "fdstore")] + if let Some(store) = message_store { + conn.run_with_message_store(store) + .await + .map_err(|e| { + trace!("connection run error. {}", e); + }) + .ok(); + } else { + conn.run() + .await + .map_err(|e| { + trace!("connection run error. {}", e); + }) + .ok(); + } + }); +} + +#[cfg(not(feature = "fdstore"))] async fn spawn_connection_handler( fd: RawFd, conn: C, @@ -271,6 +426,7 @@ async fn spawn_connection_handler( shutdown_waiter, }; let conn = Connection::new(conn, delegate); + spawn(async move { conn.run() .await @@ -298,6 +454,10 @@ struct ServerBuilder { services: Arc>, streams: Arc>>, shutdown_waiter: shutdown::Waiter, + #[cfg(feature = "fdstore")] + message_store: Option, + #[cfg(feature = "fdstore")] + sock_name: String, } impl Builder for ServerBuilder { @@ -317,6 +477,10 @@ impl Builder for ServerBuilder { streams: self.streams.clone(), server_shutdown: self.shutdown_waiter.clone(), handler_shutdown: disconnect_notifier, + #[cfg(feature = "fdstore")] + message_store: self.message_store.clone(), + #[cfg(feature = "fdstore")] + sock_name: self.sock_name.clone(), }, ServerWriter { rx }, ) @@ -343,6 +507,10 @@ struct ServerReader { streams: Arc>>, server_shutdown: shutdown::Waiter, handler_shutdown: shutdown::Notifier, + #[cfg(feature = "fdstore")] + sock_name: String, + #[cfg(feature = "fdstore")] + message_store: Option, } #[async_trait] @@ -368,7 +536,8 @@ impl ReaderDelegate for ServerReader { .ok(); } - async fn handle_msg(&self, msg: GenMessage) { + #[cfg(not(feature = "fdstore"))] + async fn handle_msg(&self, _id: u64, msg: GenMessage) { let handler_shutdown_waiter = self.handler_shutdown.subscribe(); let context = self.context(); let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>(); @@ -380,6 +549,26 @@ impl ReaderDelegate for ServerReader { }); wait_rx.await.unwrap_or_default(); } + + #[cfg(feature = "fdstore")] + async fn handle_msg(&self, id: u64, msg: GenMessage) { + let handler_shutdown_waiter = self.handler_shutdown.subscribe(); + let context = self.context(); + let message_store = self.message_store.clone(); + let sock_name = self.sock_name.clone(); + let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>(); + spawn(async move { + select! { + _ = context.handle_msg(msg, wait_tx) => { + if let Some(store) = message_store { + store.remove(sock_name, id).await; + } + } + _ = handler_shutdown_waiter.wait_shutdown() => {} + } + }); + wait_rx.await.unwrap_or_default(); + } } impl ServerReader { From 301b205c5a6a13c8d7b76c0523d6f4183e5ff4a5 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Sat, 12 Oct 2024 11:28:13 +0800 Subject: [PATCH 2/3] codegen: fix compiler dependency path error the compile should be in the ../compiler but not ../ttrpc-compiler Signed-off-by: Abel Feng --- ttrpc-codegen/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ttrpc-codegen/Cargo.toml b/ttrpc-codegen/Cargo.toml index 8d98c9c8..4b8a48a9 100644 --- a/ttrpc-codegen/Cargo.toml +++ b/ttrpc-codegen/Cargo.toml @@ -16,4 +16,4 @@ readme = "README.md" protobuf-support = "3.1.0" protobuf = { version = "2.27.1" } protobuf-codegen = "3.1.0" -ttrpc-compiler = { path = "../ttrpc-compiler" } +ttrpc-compiler = { path = "../compiler" } From c55bab892fe9ea37b022ec00733b273d84f690e7 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Thu, 5 Dec 2024 11:55:00 +0800 Subject: [PATCH 3/3] add a flag to indicate if fdstore is enabled Signed-off-by: Abel Feng --- src/asynchronous/server.rs | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/asynchronous/server.rs b/src/asynchronous/server.rs index 72fd0531..dcc25ff2 100644 --- a/src/asynchronous/server.rs +++ b/src/asynchronous/server.rs @@ -78,6 +78,8 @@ pub struct Server { #[cfg(feature = "fdstore")] message_store: Option, + #[cfg(feature = "fdstore")] + fdstore_enabled: bool, } impl Default for Server { @@ -90,6 +92,8 @@ impl Default for Server { stop_listen_tx: None, #[cfg(feature = "fdstore")] message_store: Default::default(), + #[cfg(feature = "fdstore")] + fdstore_enabled: false, } } } @@ -131,6 +135,12 @@ impl Server { Ok(self) } + #[cfg(feature = "fdstore")] + pub fn enable_fdstore(mut self) -> Self { + self.fdstore_enabled = true; + self + } + pub fn register_service(mut self, new: HashMap) -> Server { let services = Arc::get_mut(&mut self.services).unwrap(); services.extend(new); @@ -153,7 +163,10 @@ impl Server { match self.domain.as_ref() { Some(Domain::Unix) => { #[cfg(feature = "fdstore")] - self.serve_stored_conns().await?; + if self.fdstore_enabled { + self.serve_stored_conns().await?; + } + let sys_unix_listener; unsafe { sys_unix_listener = SysUnixListener::from_raw_fd(listenfd); @@ -380,16 +393,20 @@ async fn spawn_connection_handler( }; let conn = Connection::new(conn, delegate, sock_name.clone()); let message_store = message_store.clone(); - if let Err(e) = libsystemd::daemon::notify_with_fds( - false, - &[ - libsystemd::daemon::NotifyState::Fdname(sock_name.to_string()), - libsystemd::daemon::NotifyState::Fdstore, - ], - &[fd], - ) { - warn!("failed to notify fds to systemd: {}", e); + // fdstore is not enable if message store is None + if message_store.is_some() { + if let Err(e) = libsystemd::daemon::notify_with_fds( + false, + &[ + libsystemd::daemon::NotifyState::Fdname(sock_name.to_string()), + libsystemd::daemon::NotifyState::Fdstore, + ], + &[fd], + ) { + warn!("failed to notify fds to systemd: {}", e); + } } + spawn(async move { #[cfg(feature = "fdstore")] if let Some(store) = message_store {