diff --git a/CHANGELOG.md b/CHANGELOG.md index dce56e1..13244dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # 变更日志 | Change log +### 0.4.0 + +- 破坏性变更: 使 NamingService 和 ConfigService impl Send + Sync +- 破坏性变更: 默认 async,去掉 sync api,需要的话建议 `futures::executor::block_on(future_fn)` + +--- + +- Change: make NamingService and ConfigService Send + Sync +- Change: all async API; If you need sync, maybe `futures::executor::block_on(future_fn)` + ### 0.3.6 - 文档: 补充说明 `NamingService` 和 `ConfigService` 需要全局的生命周期 diff --git a/Cargo.toml b/Cargo.toml index 73675f4..3bf2fdc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,6 @@ config = [] naming = [] tls = ["reqwest/default-tls"] auth-by-http = ["reqwest"] -async = [] [dependencies] nacos-macro = { version = "0.1.0", path = "nacos-macro" } diff --git a/README.md b/README.md index 3169c94..1570dc2 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,8 @@ Add the dependency in `Cargo.toml`: ```toml [dependencies] -# If you need async API, which can be enabled via `features = ["async"]` -nacos-sdk = { version = "0.3", features = ["default"] } +# If you need sync API, maybe `futures::executor::block_on(future_fn)` +nacos-sdk = { version = "0.4", features = ["default"] } ``` ### Usage of Config @@ -32,7 +32,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } // 因为它内部会初始化与服务端的长链接,后续的数据交互及变更订阅,都是实时地通过长链接告知客户端的。 let config_service = ConfigServiceBuilder::new( ClientProps::new() - .server_addr("0.0.0.0:8848") + .server_addr("127.0.0.1:8848") // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. .namespace("") .app_name("simple_app"), @@ -43,7 +43,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } .build()?; // example get a config - let config_resp = config_service.get_config("todo-data-id".to_string(), "todo-group".to_string()); + let config_resp = config_service.get_config("todo-data-id".to_string(), "todo-group".to_string()).await; match config_resp { Ok(config_resp) => tracing::info!("get the config {}", config_resp), Err(err) => tracing::error!("get the config {:?}", err), @@ -62,7 +62,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } "todo-data-id".to_string(), "todo-group".to_string(), Arc::new(ExampleConfigChangeListener {}), - ); + ).await; match _listen { Ok(_) => tracing::info!("listening the config success"), Err(err) => tracing::error!("listen config error {:?}", err), @@ -75,7 +75,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } // 因为它内部会初始化与服务端的长链接,后续的数据交互及变更订阅,都是实时地通过长链接告知客户端的。 let naming_service = NamingServiceBuilder::new( ClientProps::new() - .server_addr("0.0.0.0:8848") + .server_addr("127.0.0.1:8848") // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. .namespace("") .app_name("simple_app"), @@ -100,7 +100,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } Some(constants::DEFAULT_GROUP.to_string()), Vec::default(), subscriber, - ); + ).await; // example naming register instances let service_instance1 = ServiceInstance { @@ -112,7 +112,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } "test-service".to_string(), Some(constants::DEFAULT_GROUP.to_string()), vec![service_instance1], - ); + ).await; ``` ### Props diff --git a/examples/simple_app.rs b/examples/simple_app.rs index 4cb88e8..f9b5dfa 100644 --- a/examples/simple_app.rs +++ b/examples/simple_app.rs @@ -7,8 +7,6 @@ use nacos_sdk::api::naming::{ }; use nacos_sdk::api::props::ClientProps; -const NACOS_ADDRESS: &str = "127.0.0.1:8848"; - /// enable https auth run with command: /// cargo run --example simple_app --features default,tls #[tokio::main] @@ -23,7 +21,7 @@ async fn main() -> Result<(), Box> { .init(); let client_props = ClientProps::new() - .server_addr(NACOS_ADDRESS) + .server_addr(constants::DEFAULT_SERVER_ADDR) // .remote_grpc_port(9838) // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. .namespace("") @@ -38,17 +36,21 @@ async fn main() -> Result<(), Box> { let config_service = ConfigServiceBuilder::new(client_props.clone()) .enable_auth_plugin_http() // TODO You can choose not to enable auth .build()?; - let config_resp = config_service.get_config("todo-data-id".to_string(), "LOVE".to_string()); + let config_resp = config_service + .get_config("todo-data-id".to_string(), "LOVE".to_string()) + .await; match config_resp { Ok(config_resp) => tracing::info!("get the config {}", config_resp), Err(err) => tracing::error!("get the config {:?}", err), } - let _listen = config_service.add_listener( - "todo-data-id".to_string(), - "LOVE".to_string(), - std::sync::Arc::new(SimpleConfigChangeListener {}), - ); + let _listen = config_service + .add_listener( + "todo-data-id".to_string(), + "LOVE".to_string(), + std::sync::Arc::new(SimpleConfigChangeListener {}), + ) + .await; match _listen { Ok(_) => tracing::info!("listening the config success"), Err(err) => tracing::error!("listen config error {:?}", err), @@ -62,31 +64,37 @@ async fn main() -> Result<(), Box> { .build()?; let listener = std::sync::Arc::new(SimpleInstanceChangeListener); - let _subscribe_ret = naming_service.subscribe( - "test-service".to_string(), - Some(constants::DEFAULT_GROUP.to_string()), - Vec::default(), - listener, - ); + let _subscribe_ret = naming_service + .subscribe( + "test-service".to_string(), + Some(constants::DEFAULT_GROUP.to_string()), + Vec::default(), + listener, + ) + .await; let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, ..Default::default() }; - let _register_instance_ret = naming_service.batch_register_instance( - "test-service".to_string(), - Some(constants::DEFAULT_GROUP.to_string()), - vec![service_instance1], - ); + let _register_instance_ret = naming_service + .batch_register_instance( + "test-service".to_string(), + Some(constants::DEFAULT_GROUP.to_string()), + vec![service_instance1], + ) + .await; tokio::time::sleep(tokio::time::Duration::from_millis(666)).await; - let instances_ret = naming_service.get_all_instances( - "test-service".to_string(), - Some(constants::DEFAULT_GROUP.to_string()), - Vec::default(), - false, - ); + let instances_ret = naming_service + .get_all_instances( + "test-service".to_string(), + Some(constants::DEFAULT_GROUP.to_string()), + Vec::default(), + false, + ) + .await; match instances_ret { Ok(instances) => tracing::info!("get_all_instances {:?}", instances), Err(err) => tracing::error!("naming get_all_instances error {:?}", err), diff --git a/src/api/config.rs b/src/api/config.rs index da897ed..174de76 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -10,7 +10,7 @@ use crate::api::{error, plugin, props}; /// ```ignore /// let mut config_service = nacos_sdk::api::config::ConfigServiceBuilder::new( /// nacos_sdk::api::props::ClientProps::new() -/// .server_addr("0.0.0.0:8848") +/// .server_addr("127.0.0.1:8848") /// // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. /// .namespace("") /// .app_name("todo-your-app-name"), @@ -18,74 +18,6 @@ use crate::api::{error, plugin, props}; /// .build()?; /// ``` #[doc(alias("config", "sdk", "api"))] -#[cfg(not(feature = "async"))] -pub trait ConfigService { - /// Get config, return the content. - /// - /// Attention to [`error::Error::ConfigNotFound`], [`error::Error::ConfigQueryConflict`] - fn get_config(&self, data_id: String, group: String) -> error::Result; - - /// Publish config, return true/false. - fn publish_config( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - ) -> error::Result; - - /// Cas publish config with cas_md5 (prev content's md5), return true/false. - fn publish_config_cas( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - cas_md5: String, - ) -> error::Result; - - /// Beta publish config, return true/false. - fn publish_config_beta( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - beta_ips: String, - ) -> error::Result; - - /// Publish config with params (see keys [`constants::*`]), return true/false. - fn publish_config_param( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - cas_md5: Option, - params: HashMap, - ) -> error::Result; - - /// Remove config, return true/false. - fn remove_config(&self, data_id: String, group: String) -> error::Result; - - /// Listen the config change. - fn add_listener( - &self, - data_id: String, - group: String, - listener: Arc, - ) -> error::Result<()>; - - /// Remove a Listener. - fn remove_listener( - &self, - data_id: String, - group: String, - listener: Arc, - ) -> error::Result<()>; -} - -#[cfg(feature = "async")] #[async_trait::async_trait] pub trait ConfigService: Send + Sync { /// Get config, return the content. @@ -256,7 +188,7 @@ pub mod constants { /// ```ignore /// let mut config_service = nacos_sdk::api::config::ConfigServiceBuilder::new( /// nacos_sdk::api::props::ClientProps::new() -/// .server_addr("0.0.0.0:8848") +/// .server_addr("127.0.0.1:8848") /// // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. /// .namespace("") /// .app_name("todo-your-app-name"), diff --git a/src/api/constants.rs b/src/api/constants.rs index 72d6a51..872a6bf 100644 --- a/src/api/constants.rs +++ b/src/api/constants.rs @@ -1,4 +1,4 @@ -pub const DEFAULT_SERVER_ADDR: &str = "0.0.0.0:8848"; +pub const DEFAULT_SERVER_ADDR: &str = "127.0.0.1:8848"; pub const DEFAULT_SERVER_PORT: u32 = 8848; diff --git a/src/api/naming.rs b/src/api/naming.rs index f13ee0d..9ec89fa 100644 --- a/src/api/naming.rs +++ b/src/api/naming.rs @@ -135,7 +135,7 @@ pub trait NamingEventListener: Send + Sync + 'static { /// ```ignore /// let mut naming_service = nacos_sdk::api::naming::NamingServiceBuilder::new( /// nacos_sdk::api::props::ClientProps::new() -/// .server_addr("0.0.0.0:8848") +/// .server_addr("127.0.0.1:8848") /// // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. /// .namespace("") /// .app_name("todo-your-app-name"), @@ -143,79 +143,6 @@ pub trait NamingEventListener: Send + Sync + 'static { /// .build()?; /// ``` #[doc(alias("naming", "sdk", "api"))] -#[cfg(not(feature = "async"))] -pub trait NamingService { - fn register_instance( - &self, - service_name: String, - group_name: Option, - service_instance: ServiceInstance, - ) -> Result<()>; - - fn deregister_instance( - &self, - service_name: String, - group_name: Option, - service_instance: ServiceInstance, - ) -> Result<()>; - - fn batch_register_instance( - &self, - service_name: String, - group_name: Option, - service_instances: Vec, - ) -> Result<()>; - - fn get_all_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - ) -> Result>; - - fn select_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - healthy: bool, - ) -> Result>; - - fn select_one_healthy_instance( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - ) -> Result; - - fn get_service_list( - &self, - page_no: i32, - page_size: i32, - group_name: Option, - ) -> Result<(Vec, i32)>; - - fn subscribe( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - event_listener: Arc, - ) -> Result<()>; - - fn unsubscribe( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - event_listener: Arc, - ) -> Result<()>; -} - -#[cfg(feature = "async")] #[async_trait::async_trait] pub trait NamingService: Send + Sync { async fn register_instance( @@ -295,7 +222,7 @@ pub trait NamingService: Send + Sync { /// ```ignore /// let mut naming_service = nacos_sdk::api::naming::NamingServiceBuilder::new( /// nacos_sdk::api::props::ClientProps::new() -/// .server_addr("0.0.0.0:8848") +/// .server_addr("127.0.0.1:8848") /// // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. /// .namespace("") /// .app_name("todo-your-app-name"), diff --git a/src/api/plugin/auth/auth_by_http.rs b/src/api/plugin/auth/auth_by_http.rs index b53273f..7d3ffcd 100644 --- a/src/api/plugin/auth/auth_by_http.rs +++ b/src/api/plugin/auth/auth_by_http.rs @@ -122,7 +122,7 @@ mod tests { .init(); let http_auth_plugin = HttpLoginAuthPlugin::default(); - let server_list = vec!["0.0.0.0:8848".to_string()]; + let server_list = vec!["127.0.0.1:8848".to_string()]; let auth_context = AuthContext::default() .add_param(crate::api::plugin::USERNAME, "nacos") diff --git a/src/config/mod.rs b/src/config/mod.rs index d7ff80f..0227851 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -50,115 +50,6 @@ impl NacosConfigService { } } -#[cfg(not(feature = "async"))] -impl ConfigService for NacosConfigService { - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn get_config( - &self, - data_id: String, - group: String, - ) -> crate::api::error::Result { - let future = self.client_worker.get_config(data_id, group); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn publish_config( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - ) -> crate::api::error::Result { - let future = self - .client_worker - .publish_config(data_id, group, content, content_type); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn publish_config_cas( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - cas_md5: String, - ) -> crate::api::error::Result { - let future = - self.client_worker - .publish_config_cas(data_id, group, content, content_type, cas_md5); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn publish_config_beta( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - beta_ips: String, - ) -> crate::api::error::Result { - let future = - self.client_worker - .publish_config_beta(data_id, group, content, content_type, beta_ips); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn publish_config_param( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - cas_md5: Option, - params: std::collections::HashMap, - ) -> crate::api::error::Result { - let future = self.client_worker.publish_config_param( - data_id, - group, - content, - content_type, - cas_md5, - params, - ); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn remove_config(&self, data_id: String, group: String) -> crate::api::error::Result { - let future = self.client_worker.remove_config(data_id, group); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn add_listener( - &self, - data_id: String, - group: String, - listener: std::sync::Arc, - ) -> crate::api::error::Result<()> { - let future = self.client_worker.add_listener(data_id, group, listener); - futures::executor::block_on(future); - Ok(()) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn remove_listener( - &self, - data_id: String, - group: String, - listener: std::sync::Arc, - ) -> crate::api::error::Result<()> { - let future = self.client_worker.remove_listener(data_id, group, listener); - futures::executor::block_on(future); - Ok(()) - } -} - -#[cfg(feature = "async")] #[async_trait::async_trait] impl ConfigService for NacosConfigService { #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] diff --git a/src/lib.rs b/src/lib.rs index 8876406..80efc80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,10 +25,10 @@ //! ## Add Dependency //! //! Add the dependency in `Cargo.toml`: -//! - If you need async API, which can be enabled via `features = ["async"]` +//! - If you need sync API, maybe `futures::executor::block_on(future_fn)` //! ```toml //! [dependencies] -//! nacos-sdk = { version = "0.3", features = ["default"] } +//! nacos-sdk = { version = "0.4", features = ["default"] } //! ``` //! //! ## General Configurations and Initialization @@ -40,7 +40,7 @@ //! ```ignore //! let config_service = nacos_sdk::api::config::ConfigServiceBuilder::new( //! nacos_sdk::api::props::ClientProps::new() -//! .server_addr("0.0.0.0:8848") +//! .server_addr("127.0.0.1:8848") //! // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. //! .namespace("") //! .app_name("todo-your-app-name"), @@ -53,7 +53,7 @@ //! ```ignore //! let naming_service = nacos_sdk::api::naming::NamingServiceBuilder::new( //! nacos_sdk::api::props::ClientProps::new() -//! .server_addr("0.0.0.0:8848") +//! .server_addr("127.0.0.1:8848") //! // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. //! .namespace("") //! .app_name("todo-your-app-name"), diff --git a/src/naming/mod.rs b/src/naming/mod.rs index d0a9274..82f687d 100644 --- a/src/naming/mod.rs +++ b/src/naming/mod.rs @@ -643,142 +643,24 @@ impl NacosNamingService { } } -#[cfg(not(feature = "async"))] +#[async_trait::async_trait] impl NamingService for NacosNamingService { #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn deregister_instance( - &self, - service_name: String, - group_name: Option, - service_instance: ServiceInstance, - ) -> Result<()> { - if service_instance.ephemeral { - let future = self.deregister_ephemeral_instance_async( - service_name, - group_name, - service_instance, - ); - futures::executor::block_on(future) - } else { - let future = self.deregister_persistent_instance_async( - service_name, - group_name, - service_instance, - ); - futures::executor::block_on(future) - } - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn batch_register_instance( - &self, - service_name: String, - group_name: Option, - service_instances: Vec, - ) -> Result<()> { - let future = - self.batch_register_instance_async(service_name, group_name, service_instances); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn get_all_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - ) -> Result> { - let future = self.get_all_instances_async(service_name, group_name, clusters, subscribe); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn select_one_healthy_instance( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - ) -> Result { - let future = - self.select_one_healthy_instance_async(service_name, group_name, clusters, subscribe); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn get_service_list( - &self, - page_no: i32, - page_size: i32, - group_name: Option, - ) -> Result<(Vec, i32)> { - let future = self.get_service_list_async(page_no, page_size, group_name); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn subscribe( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - event_listener: Arc, - ) -> Result<()> { - let future = self.subscribe_async(service_name, group_name, clusters, Some(event_listener)); - let _ = futures::executor::block_on(future); - Ok(()) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn unsubscribe( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - event_listener: Arc, - ) -> Result<()> { - let future = - self.unsubscribe_async(service_name, group_name, clusters, Some(event_listener)); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn register_instance( + async fn register_instance( &self, service_name: String, group_name: Option, service_instance: ServiceInstance, ) -> Result<()> { if service_instance.ephemeral { - let future = - self.register_ephemeral_instance_async(service_name, group_name, service_instance); - futures::executor::block_on(future) + self.register_ephemeral_instance_async(service_name, group_name, service_instance) + .await } else { - let future = - self.register_persistent_instance_async(service_name, group_name, service_instance); - futures::executor::block_on(future) + self.register_persistent_instance_async(service_name, group_name, service_instance) + .await } } - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn select_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - healthy: bool, - ) -> Result> { - let future = - self.select_instances_async(service_name, group_name, clusters, subscribe, healthy); - futures::executor::block_on(future) - } -} - -#[cfg(feature = "async")] -#[async_trait::async_trait] -impl NamingService for NacosNamingService { #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] async fn deregister_instance( &self, @@ -818,6 +700,19 @@ impl NamingService for NacosNamingService { .await } + #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] + async fn select_instances( + &self, + service_name: String, + group_name: Option, + clusters: Vec, + subscribe: bool, + healthy: bool, + ) -> Result> { + self.select_instances_async(service_name, group_name, clusters, subscribe, healthy) + .await + } + #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] async fn select_one_healthy_instance( &self, @@ -866,35 +761,6 @@ impl NamingService for NacosNamingService { self.unsubscribe_async(service_name, group_name, clusters, Some(event_listener)) .await } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - async fn register_instance( - &self, - service_name: String, - group_name: Option, - service_instance: ServiceInstance, - ) -> Result<()> { - if service_instance.ephemeral { - self.register_ephemeral_instance_async(service_name, group_name, service_instance) - .await - } else { - self.register_persistent_instance_async(service_name, group_name, service_instance) - .await - } - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - async fn select_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - healthy: bool, - ) -> Result> { - self.select_instances_async(service_name, group_name, clusters, subscribe, healthy) - .await - } } #[cfg(test)]