Skip to content

Commit

Permalink
continued work on replicator subsystem & fix breaking changes from pa…
Browse files Browse the repository at this point in the history
…ckage upgrades
  • Loading branch information
LeonHartley committed Mar 24, 2024
1 parent 7911a1e commit f3e5de0
Show file tree
Hide file tree
Showing 27 changed files with 1,700 additions and 1,381 deletions.
1,730 changes: 787 additions & 943 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions coerce/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "coerce"
description = "Async actor runtime and distributed systems framework"
license = "Apache-2.0"
version = "0.8.12"
version = "0.9.0"
authors = ["Leon Hartley <ljph@outlook.com>"]
edition = "2021"
readme = "README.md"
Expand Down Expand Up @@ -78,10 +78,10 @@ singleton = []

[dependencies]
tokio = { version = "1.36.0", features = ["full"] }
tokio-util = { version = "0.7.8", features = ["full"] }
tokio-stream = { version = "0.1.14", optional = true }
tracing = { version = "0.1.37" }
uuid = { version = "1.1.2", features = ["serde", "v4"] }
tokio-util = { version = "0.7.10", features = ["full"] }
tokio-stream = { version = "0.1.15", optional = true }
tracing = { version = "0.1.40" }
uuid = { version = "1.8.0", features = ["serde", "v4"] }
lazy_static = "1.4.0"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"
Expand All @@ -91,23 +91,23 @@ hashring = { version = "0.3.0", optional = true }
bytes = { version = "1.4.0", optional = true }
byteorder = { version = "1.4.3", optional = true }
chrono = { version = "0.4", features = ["serde"], optional = true }
protobuf = { version = "=3.2.0", optional = true }
protobuf = { version = "=3.4.0", optional = true }
anyhow = { version = "1.0.71", optional = true }
rand = "0.8.5"
parking_lot = { version = "0.12.1", optional = true }
metrics = { version = "0.21.0", optional = true }
metrics = { version = "0.22.3", optional = true }
valuable = { version = "0.1", features = ["derive"] }
metrics-exporter-prometheus = { version = "0.12.1", optional = true }
metrics-util = { version = "0.15.0", optional = true }
metrics-exporter-prometheus = { version = "0.14.0", optional = true }
metrics-util = { version = "0.16.3", optional = true }
jwt = { version = "0.16.0", optional = true }
hmac = { version = "0.12.1", optional = true }
sha2 = { version = "0.10.6", optional = true }
itertools = { version = "0.12.1", optional = true }

# API dependencies
axum = { version = "0.6.18", features = ["query"], optional = true }
utoipa = { version = "3", features = ["axum_extras", "chrono"], optional = true }
utoipa-swagger-ui = { version = "3", features = ["axum"], optional = true }
axum = { version = "0.7.4", features = ["query"], optional = true }
utoipa = { version = "4.2.0", features = ["axum_extras", "chrono"], optional = true }
utoipa-swagger-ui = { version = "6.0", features = ["axum"], optional = true }

[dev-dependencies]
coerce-macros = { version = "0.2.0" }
Expand Down
25 changes: 12 additions & 13 deletions coerce/src/actor/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,24 @@ impl ActorMetrics {
#[inline]
pub fn incr_actor_created(actor_type: &'static str) {
#[cfg(feature = "metrics")]
increment_counter!(METRIC_ACTOR_CREATED,
LABEL_ACTOR_TYPE => actor_type,
);
counter!(METRIC_ACTOR_CREATED, LABEL_ACTOR_TYPE => actor_type).increment(1);
}

#[inline]
pub fn incr_actor_stopped(actor_type: &'static str) {
#[cfg(feature = "metrics")]
increment_counter!(METRIC_ACTOR_STOPPED,
LABEL_ACTOR_TYPE => actor_type,
);
counter!(METRIC_ACTOR_STOPPED, LABEL_ACTOR_TYPE => actor_type).increment(1)
}

#[inline]
pub fn incr_messages_sent(actor_type: &'static str, msg_type: &'static str) {
#[cfg(feature = "metrics")]
increment_counter!(METRIC_ACTOR_MESSAGES_SENT_TOTAL,
counter!(
METRIC_ACTOR_MESSAGES_SENT_TOTAL,
LABEL_ACTOR_TYPE => actor_type,
LABEL_MESSAGE_TYPE => msg_type
);
)
.increment(1)
}

#[inline]
Expand All @@ -48,20 +46,21 @@ impl ActorMetrics {
) {
#[cfg(feature = "metrics")]
{
increment_counter!(METRIC_ACTOR_MESSAGES_PROCESSED_TOTAL,
counter!(METRIC_ACTOR_MESSAGES_PROCESSED_TOTAL,
LABEL_ACTOR_TYPE => actor_type,
LABEL_MESSAGE_TYPE => msg_type
);
)
.increment(1);

histogram!(METRIC_ACTOR_MESSAGE_WAIT_TIME,
wait_time,
LABEL_ACTOR_TYPE => actor_type,
LABEL_MESSAGE_TYPE => msg_type);
LABEL_MESSAGE_TYPE => msg_type)
.record(wait_time);

histogram!(METRIC_ACTOR_MESSAGE_PROCESSING_TIME,
processing_time,
LABEL_ACTOR_TYPE => actor_type,
LABEL_MESSAGE_TYPE => msg_type)
.record(processing_time);
}
}
}
5 changes: 1 addition & 4 deletions coerce/src/actor/scheduler/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::time;
use tokio::time::Instant;
use uuid::Uuid;

pub trait TimerTick: Message {}
pub trait TimerTick: Message + Clone {}

enum TimerMode {
Notify,
Expand All @@ -28,7 +28,6 @@ impl Timer {
) -> Timer
where
A: 'static + Handler<T> + Sync + Send,
T: 'static + Clone + Sync + Send,
T::Result: 'static + Sync + Send,
{
let (stop, stop_rx) = oneshot::channel();
Expand All @@ -40,7 +39,6 @@ impl Timer {
pub fn start<A: Actor, T: TimerTick>(actor: LocalActorRef<A>, tick: Duration, msg: T) -> Timer
where
A: 'static + Handler<T> + Sync + Send,
T: 'static + Clone + Sync + Send,
T::Result: 'static + Sync + Send,
{
let (stop, stop_rx) = oneshot::channel();
Expand Down Expand Up @@ -74,7 +72,6 @@ async fn timer_loop<A: Actor, T: TimerTick>(
mode: TimerMode,
) where
A: Handler<T>,
T: 'static + Clone + Sync + Send,
{
let start = if tick_immediately {
Instant::now()
Expand Down
6 changes: 3 additions & 3 deletions coerce/src/persistent/journal/proto/journal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 3.2.0. Do not edit
// This file is generated by rust-protobuf 3.4.0. Do not edit
// .proto file is parsed by protoc 3.13.0
// @generated

Expand All @@ -23,10 +23,10 @@
/// Generated files are compatible only with the same version
/// of protobuf runtime.
const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_3_2_0;
const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_3_4_0;

#[derive(PartialEq,Clone,Default,Debug)]
// @@protoc_insertion_point(message:coerce.persistent.journal.JournalEntry)
#[derive(PartialEq,Clone,Default,Debug)]
pub struct JournalEntry {
// message fields
// @@protoc_insertion_point(field:coerce.persistent.journal.JournalEntry.sequence)
Expand Down
4 changes: 2 additions & 2 deletions coerce/src/remote/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ impl Actor for RemoteHttpApi {
app = route.routes(app);
}

axum::Server::bind(&listen_addr)
.serve(app.into_make_service())
let listener = tokio::net::TcpListener::bind(&listen_addr).await.unwrap();
axum::serve(listener, app)
.with_graceful_shutdown(async { stop_rx.await.unwrap() })
.await
.unwrap()
Expand Down
7 changes: 2 additions & 5 deletions coerce/src/remote/net/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::remote::net::proto::network::{
SessionHandshake, StreamPublishEvent,
};
use crate::remote::net::{proto, StreamData};
use chrono::{DateTime, NaiveDateTime, Utc};
use chrono::{DateTime, Utc};
use protobuf::{Enum, Error, Message};
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;
Expand Down Expand Up @@ -179,10 +179,7 @@ pub fn datetime_to_timestamp(
pub fn timestamp_to_datetime(
timestamp: protobuf::well_known_types::timestamp::Timestamp,
) -> DateTime<Utc> {
DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp_opt(timestamp.seconds, timestamp.nanos as u32).unwrap(),
Utc,
)
DateTime::<Utc>::from_timestamp(timestamp.seconds, timestamp.nanos as u32).unwrap()
}

impl From<ActorRefErr> for proto::network::ActorRefErr {
Expand Down
12 changes: 5 additions & 7 deletions coerce/src/remote/net/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@ impl NetworkMetrics {
#[inline]
pub fn incr_bytes_received(len: u64, src_addr: &str) {
#[cfg(feature = "metrics")]
counter!(
METRIC_NETWORK_BYTES_RECV,
len,
LABEL_SRC_ADDR => src_addr.to_owned()
);
counter!(METRIC_NETWORK_BYTES_RECV,
LABEL_SRC_ADDR => src_addr.to_owned())
.increment(len)
}

#[inline]
pub fn incr_bytes_sent(len: u64, dest_addr: &str) {
#[cfg(feature = "metrics")]
counter!(
METRIC_NETWORK_BYTES_SENT,
len,
LABEL_DEST_ADDR => dest_addr.to_owned()
);
)
.increment(len);
}
}
Loading

0 comments on commit f3e5de0

Please sign in to comment.