diff --git a/src/api/plugin/auth/auth_by_http.rs b/src/api/plugin/auth/auth_by_http.rs index d669fda..ad201e2 100644 --- a/src/api/plugin/auth/auth_by_http.rs +++ b/src/api/plugin/auth/auth_by_http.rs @@ -1,3 +1,4 @@ +use rand::Rng; use std::ops::{Add, Deref}; use std::sync::RwLock; use std::time::{Duration, Instant}; @@ -49,8 +50,11 @@ impl AuthPlugin for HttpLoginAuthPlugin { let server_addr = { let mutex = self.server_list.read().unwrap(); - // todo random one - mutex.first().unwrap().to_string() + // random one + mutex + .get(rand::thread_rng().gen_range(0..mutex.len())) + .unwrap() + .to_string() }; // todo support https diff --git a/src/api/props.rs b/src/api/props.rs index 1fe8a71..bed9edf 100644 --- a/src/api/props.rs +++ b/src/api/props.rs @@ -16,6 +16,24 @@ pub struct ClientProps { pub(crate) auth_context: HashMap, } +impl ClientProps { + pub(crate) fn get_server_list(&self) -> crate::api::error::Result> { + let hosts: Vec<&str> = self.server_addr.split(',').collect::>(); + if hosts.is_empty() { + return Err(crate::api::error::Error::WrongServerAddress( + self.server_addr.clone(), + )); + } + + let mut result = vec![]; + for host in hosts { + result.push(host.to_string()); + } + + Ok(result) + } +} + #[allow(clippy::new_without_default)] impl ClientProps { /// Creates a new `ClientConfig`. diff --git a/src/common/remote/grpc/grpc_client.rs b/src/common/remote/grpc/grpc_client.rs index 52c1522..9651ba6 100644 --- a/src/common/remote/grpc/grpc_client.rs +++ b/src/common/remote/grpc/grpc_client.rs @@ -27,7 +27,7 @@ pub(crate) struct GrpcClient { impl GrpcClient { pub(crate) async fn new(address: &str) -> Result { - let address = crate::common::remote::into_grpc_server_addr(address)?; + let address = crate::common::remote::into_grpc_server_addr(address, true)?; let address = address.as_str(); info!("init grpc client: {}", address); let env = Arc::new(Environment::new(2)); diff --git a/src/common/remote/mod.rs b/src/common/remote/mod.rs index 0fd2452..d9d63ca 100644 --- a/src/common/remote/mod.rs +++ b/src/common/remote/mod.rs @@ -1,10 +1,10 @@ pub mod grpc; use crate::api::error::Error::WrongServerAddress; +use crate::api::error::Result; +use rand::prelude::SliceRandom; use std::sync::atomic::{AtomicI64, Ordering}; -use crate::api::error::{Error, Result}; - // odd by client request id. const SEQUENCE_INITIAL_VALUE: i64 = 1; const SEQUENCE_DELTA: i64 = 2; @@ -19,12 +19,18 @@ pub(crate) fn generate_request_id() -> String { } /// make address's port plus 1000 -pub(crate) fn into_grpc_server_addr(address: &str) -> Result { - let hosts = address.split(',').collect::>(); - if hosts.len() == 0 { +#[allow(clippy::get_first)] +pub(crate) fn into_grpc_server_addr(address: &str, shuffle: bool) -> Result { + let mut hosts = address.split(',').collect::>(); + if hosts.is_empty() { return Err(WrongServerAddress(address.into())); } + if shuffle { + // shuffle for grpcio LbPolicy::PickFirst, It is a sequential attempt to link, so reorder to balance the load as much as possible. + hosts.shuffle(&mut rand::thread_rng()); + } + let mut result = vec![]; for host in hosts { let host_port_pair = host.split(':').collect::>(); @@ -49,7 +55,7 @@ pub(crate) fn into_grpc_server_addr(address: &str) -> Result { match result.len() { 0 => Err(WrongServerAddress(address.into())), - 1 => Ok(format!("{}", result.get(0).unwrap())), + 1 => Ok(result.get(0).unwrap().to_string()), _ => Ok(format!("ipv4:{}", result.join(","))), } } @@ -60,7 +66,7 @@ mod tests { #[test] fn test_empty_address() { - match into_grpc_server_addr("") { + match into_grpc_server_addr("", false) { Ok(_) => assert!(false), Err(_) => assert!(true), } @@ -68,7 +74,7 @@ mod tests { #[test] fn test_host_address_without_port() { - match into_grpc_server_addr("127.0.0.1") { + match into_grpc_server_addr("127.0.0.1", false) { Ok(_) => assert!(false), Err(_) => assert!(true), } @@ -76,7 +82,7 @@ mod tests { #[test] fn test_host_addresses_without_one_port() { - match into_grpc_server_addr("127.0.0.1:8848,127.0.0.1") { + match into_grpc_server_addr("127.0.0.1:8848,127.0.0.1", false) { Ok(_) => assert!(false), Err(_) => assert!(true), } @@ -86,7 +92,7 @@ mod tests { fn test_single_host_address() { let addr = "127.0.0.1:8848"; let expected = "127.0.0.1:9848"; - let result = into_grpc_server_addr(addr).unwrap(); + let result = into_grpc_server_addr(addr, false).unwrap(); assert_eq!(expected, result); } @@ -94,7 +100,7 @@ mod tests { fn test_multiple_ipv4_address() { let addr = "127.0.0.1:8848,127.0.0.1:8849,127.0.0.1:8850"; let expected = "ipv4:127.0.0.1:9848,127.0.0.1:9849,127.0.0.1:9850"; - let result = into_grpc_server_addr(addr).unwrap(); + let result = into_grpc_server_addr(addr, false).unwrap(); assert_eq!(expected, result); } } diff --git a/src/config/worker.rs b/src/config/worker.rs index 160249a..fba91d5 100644 --- a/src/config/worker.rs +++ b/src/config/worker.rs @@ -83,7 +83,7 @@ impl ConfigWorker { notify_change_tx_clone, )); - let server_list = Arc::new(vec![client_props.server_addr.clone()]); + let server_list = Arc::new(client_props.get_server_list()?); let plugin = Arc::clone(&auth_plugin); let auth_context = Arc::new(AuthContext::default().add_params(client_props.auth_context.clone())); diff --git a/src/naming/mod.rs b/src/naming/mod.rs index 37394ab..97df3c5 100644 --- a/src/naming/mod.rs +++ b/src/naming/mod.rs @@ -73,6 +73,8 @@ pub(crate) struct NacosNamingService { impl NacosNamingService { pub(crate) fn new(client_props: ClientProps, auth_plugin: Arc) -> Result { + let server_list = Arc::new(client_props.get_server_list()?); + let mut namespace = client_props.namespace; if namespace.is_empty() { namespace = self::constants::DEFAULT_NAMESPACE.to_owned(); @@ -103,7 +105,6 @@ impl NacosNamingService { )) .build()?; - let server_list = Arc::new(vec![client_props.server_addr.clone()]); let plugin = Arc::clone(&auth_plugin); let auth_context = Arc::new(AuthContext::default().add_params(client_props.auth_context.clone()));