Skip to content

Commit

Permalink
Use runtime specific Instant (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n authored Jan 31, 2025
1 parent 005bf33 commit 7e8fff5
Show file tree
Hide file tree
Showing 83 changed files with 712 additions and 512 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/doc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: doc

on:
push:
branches:
- main
paths-ignore:
- 'README.md'
- 'COPYRIGHT'
- 'LICENSE*'
- '**.md'
- '**.txt'
- 'art'
pull_request:
paths-ignore:
- 'README.md'
- 'COPYRIGHT'
- 'LICENSE*'
- '**.md'
- '**.txt'
- 'art'
workflow_dispatch:

env:
nightly: nightly

jobs:
docs:
name: docs
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Cache cargo build and registry
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
target
key: ubuntu-latest-docs-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
ubuntu-latest-docs-
- uses: actions-rs/toolchain@v1
with:
toolchain: ${{ env.nightly }}
override: true
- name: "doc --lib --all-features"
run: cargo doc --lib --no-deps --all-features
env:
RUSTFLAGS: --cfg docsrs
RUSTDOCFLAGS: --cfg docsrs -Dwarnings
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ rustdoc-args = ["--cfg", "docsrs"]
[workspace.dependencies]
auto_impl = "1"
atomic_refcell = "0.1"
agnostic-lite = { version = "0.4", features = ["time"] }
agnostic = "0.6"
agnostic-lite = { version = "0.5", features = ["time"] }
agnostic = "0.7"
# agnostic-lite = { version = "0.4", features = ["time"], path = "../agnostic/agnostic-lite" }
# agnostic = { version = "0.6", path = "../agnostic/agnostic" }
async-lock = "3"
Expand All @@ -42,7 +42,7 @@ indexmap = "2"
getifs = "0.2"
getrandom = "0.3"
metrics = "0.24"
nodecraft = { version = "0.5", features = [
nodecraft = { version = "0.6", features = [
"transformable",
"async",
"resolver",
Expand All @@ -68,7 +68,7 @@ thiserror = "2"
tracing = "0.1"
viewit = "0.1.5"

memberlist-core = { version = "0.4", path = "core", default-features = false }
memberlist-net = { version = "0.4", path = "transports/net", default-features = false }
memberlist-types = { version = "0.3", path = "types", default-features = false }
memberlist-quic = { version = "0.4", path = "transports/quic", default-features = false }
memberlist-core = { version = "0.5", path = "core", default-features = false }
memberlist-net = { version = "0.5", path = "transports/net", default-features = false }
memberlist-types = { version = "0.4", path = "types", default-features = false }
memberlist-quic = { version = "0.5", path = "transports/quic", default-features = false }
33 changes: 17 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,22 @@ memberlist is eventually consistent but converges quickly on average. The speed

memberlist is WASM/WASI friendly, all crates can be compiled to `wasm-wasi` and `wasm-unknown-unknown` (need to configure the crate features).

### Design
## Installation

```toml
[dependencies]
memberlist = "0.5"
```

## Protocol

memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://ieeexplore.ieee.org/document/1028914/). However, Hashicorp developers extends the protocol in a number of ways:

Several extensions are made to increase propagation speed and convergence rate.
Another set of extensions, that Hashicorp developers call Lifeguard, are made to make memberlist more robust in the presence of slow message processing (due to factors such as CPU starvation, and network delay or loss).
For details on all of these extensions, please read Hashicorp's paper ["Lifeguard : SWIM-ing with Situational Awareness"](https://arxiv.org/abs/1707.00788), along with the memberlist source.

## Design

Unlike the original Go implementation, Rust's memberlist use highly generic and layered architecture, users can easily implement a component by themselves and plug it to the memberlist. Users can even custom their own `Id` and `Address`.

Expand Down Expand Up @@ -118,21 +133,6 @@ Here are the layers:

CompositeDelegate is a helpful struct to split the `Delegate` into multiple small delegates, so that users do not need to implement full `Delegate` when they only want to custom some methods in the Delegate.

### Protocol

memberlist is based on ["SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol"](http://ieeexplore.ieee.org/document/1028914/). However, Hashicorp developers extends the protocol in a number of ways:

Several extensions are made to increase propagation speed and convergence rate.
Another set of extensions, that Hashicorp developers call Lifeguard, are made to make memberlist more robust in the presence of slow message processing (due to factors such as CPU starvation, and network delay or loss).
For details on all of these extensions, please read Hashicorp's paper ["Lifeguard : SWIM-ing with Situational Awareness"](https://arxiv.org/abs/1707.00788), along with the memberlist source.

## Installation

```toml
[dependencies]
memberlist = "0.3"
```

## Q & A

- ***Does Rust's memberlist implemenetation compatible to Go's memberlist?***
Expand All @@ -146,6 +146,7 @@ memberlist = "0.3"
## Related Projects

- [`agnostic`](https://github.com/al8n/agnostic): helps you to develop runtime agnostic crates
- [`getifs`](https://github.com/al8n/getifs): A bunch of cross platform network tools for fetching interfaces, multicast addresses, local ip addresses, private ip addresses, public ip addresses and etc.
- [`nodecraft`](https://github.com/al8n/nodecraft): crafting seamless node operations for distributed systems, which provides foundational traits for node identification and address resolution.
- [`transformable`](https://github.com/al8n/transformable): transform its representation between structured and byte form.
- [`peekable`](https://github.com/al8n/peekable): peekable reader and async reader
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "memberlist-core"
version = "0.4.0"
version = "0.5.0"
edition.workspace = true
license.workspace = true
repository.workspace = true
Expand Down
8 changes: 4 additions & 4 deletions core/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{
collections::HashMap,
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
time::Duration,
};

use agnostic_lite::RuntimeLite;
use agnostic_lite::{time::Instant, RuntimeLite};
use bytes::Bytes;
use futures::{FutureExt, StreamExt};

Expand Down Expand Up @@ -539,7 +539,7 @@ where
ping.sequence_number(),
ack_tx,
None,
Instant::now(),
<T::Runtime as RuntimeLite>::now(),
self.inner.opts.probe_interval,
);

Expand All @@ -566,7 +566,7 @@ where
// Mark the sent time here, which should be after any pre-processing and
// system calls to do the actual send. This probably under-reports a bit,
// but it's the best we can do.
let sent = Instant::now();
let sent = <T::Runtime as RuntimeLite>::now();

// Wait for response or timeout.
futures::select! {
Expand Down
21 changes: 9 additions & 12 deletions core/src/base/tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::{
future::Future,
marker::PhantomData,
sync::atomic::Ordering,
time::{Duration, Instant},
};
use std::{future::Future, marker::PhantomData, sync::atomic::Ordering, time::Duration};

use agnostic_lite::time::Instant;
use bytes::Bytes;
use nodecraft::Id;

Expand Down Expand Up @@ -524,7 +520,7 @@ pub async fn memberlist_join_shutdown<T, R>(

m1.shutdown().await.unwrap();

wait_for_condition(|| async {
wait_for_condition::<_, _, R>(|| async {
let num = m2.num_online_members().await;
(num == 1, format!("expected 1 node, got {num}"))
})
Expand Down Expand Up @@ -873,7 +869,7 @@ pub async fn memberlist_send<T, R>(
.await
.unwrap();

wait_for_condition(|| async {
wait_for_condition::<_, _, R>(|| async {
let msgs = m1.delegate().unwrap().node_delegate().get_messages().await;

(
Expand All @@ -883,7 +879,7 @@ pub async fn memberlist_send<T, R>(
})
.await;

wait_for_condition(|| async {
wait_for_condition::<_, _, R>(|| async {
let msgs = m2.delegate().unwrap().node_delegate().get_messages().await;

(
Expand Down Expand Up @@ -1148,20 +1144,21 @@ where
}

/// Util function to wait until the condition reaches.
pub async fn wait_for_condition<'a, Fut, F>(mut f: F)
pub async fn wait_for_condition<'a, Fut, F, R>(mut f: F)
where
F: FnMut() -> Fut,
Fut: Future<Output = (bool, String)> + 'a,
R: RuntimeLite,
{
let start = Instant::now();
let start = R::now();
let mut msg = String::new();
while start.elapsed() < Duration::from_secs(20) {
let (done, msg1) = f().await;
if done {
return;
}
msg = msg1;
std::thread::sleep(Duration::from_secs(5));
R::sleep(Duration::from_secs(5)).await;
}
panic!("timeout waiting for condition {}", msg);
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ pub mod tests {
pub use crate::base::tests::*;
}

/// Add `test` prefix to the predefined unit test fn with a given [`Runtime`]
#[cfg(any(feature = "test", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "test", test))))]
#[macro_export]
#[doc(hidden)]
macro_rules! unit_tests {
($runtime:ty => $run:ident($($fn:ident), +$(,)?)) => {
$(
Expand All @@ -94,10 +94,10 @@ pub mod tests {
};
}

/// Add `test` prefix to the predefined unit test fn with a given [`Runtime`]
#[cfg(any(feature = "test", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "test", test))))]
#[macro_export]
#[doc(hidden)]
macro_rules! unit_tests_with_expr {
($run:ident($(
$(#[$outer:meta])*
Expand Down
13 changes: 9 additions & 4 deletions core/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::atomic::Ordering, time::Instant};
use std::sync::atomic::Ordering;

use super::{
base::Memberlist,
Expand Down Expand Up @@ -31,7 +31,7 @@ where
&self,
target: &<T::Resolver as AddressResolver>::ResolvedAddress,
ping: Ping<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
deadline: Instant,
deadline: <T::Runtime as RuntimeLite>::Instant,
) -> Result<bool, Error<T, D>> {
let mut conn: T::Stream = match self
.inner
Expand Down Expand Up @@ -90,7 +90,10 @@ where
let mut conn = self
.inner
.transport
.dial_with_deadline(node.address(), Instant::now() + self.inner.opts.timeout)
.dial_with_deadline(
node.address(),
<T::Runtime as RuntimeLite>::now() + self.inner.opts.timeout,
)
.await
.map_err(Error::transport)?;
tracing::debug!(local_addr = %self.inner.id, peer_addr = %node, "memberlist: initiating push/pull sync");
Expand All @@ -107,7 +110,9 @@ where
// Send our state
self.send_local_state(&mut conn, join).await?;

conn.set_deadline(Some(Instant::now() + self.inner.opts.timeout));
conn.set_deadline(Some(
<T::Runtime as RuntimeLite>::now() + self.inner.opts.timeout,
));

match self
.read_message(node.address(), &mut conn)
Expand Down
6 changes: 3 additions & 3 deletions core/src/network/packet/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
&self,
msg: Message<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
from: <T::Resolver as AddressResolver>::ResolvedAddress,
timestamp: Instant,
timestamp: <T::Runtime as RuntimeLite>::Instant,
) {
tracing::trace!(local = %self.advertise_address(), from = %from, packet=?msg, "memberlist.packet: handle packet");

Expand Down Expand Up @@ -118,7 +118,7 @@ where
&self,
msgs: OneOrMore<Message<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
from: <T::Resolver as AddressResolver>::ResolvedAddress,
timestamp: Instant,
timestamp: <T::Runtime as RuntimeLite>::Instant,
) {
match msgs.into_either() {
Either::Left([msg]) => self.handle_message(msg, from, timestamp).await,
Expand Down Expand Up @@ -241,7 +241,7 @@ where
});
}

async fn handle_ack(&self, ack: Ack, timestamp: Instant) {
async fn handle_ack(&self, ack: Ack, timestamp: <T::Runtime as RuntimeLite>::Instant) {
self
.inner
.ack_manager
Expand Down
13 changes: 10 additions & 3 deletions core/src/network/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ where
let mut conn = self
.inner
.transport
.dial_with_deadline(addr, Instant::now() + self.inner.opts.timeout)
.dial_with_deadline(
addr,
<T::Runtime as RuntimeLite>::now() + self.inner.opts.timeout,
)
.await
.map_err(Error::transport)?;
self.send_message(&mut conn, Message::UserData(msg)).await?;
Expand All @@ -127,7 +130,9 @@ where
join: bool,
) -> Result<(), Error<T, D>> {
// Setup a deadline
conn.set_deadline(Some(Instant::now() + self.inner.opts.timeout));
conn.set_deadline(Some(
<T::Runtime as RuntimeLite>::now() + self.inner.opts.timeout,
));

// Prepare the local node state
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -242,7 +247,9 @@ where
.increment(1);
}

conn.set_deadline(Some(Instant::now() + self.inner.opts.timeout));
conn.set_deadline(Some(
<T::Runtime as RuntimeLite>::now() + self.inner.opts.timeout,
));

let msg = match self.read_message(&addr, &mut conn).await {
Ok((_read, msg)) => {
Expand Down
Loading

0 comments on commit 7e8fff5

Please sign in to comment.