Skip to content

Commit

Permalink
新控制台初步完成登录校验失败频率限制,默认支持1小时5次;issue #29
Browse files Browse the repository at this point in the history
  • Loading branch information
heqingpan committed Dec 20, 2023
1 parent 4303373 commit 1ebb77c
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ futures-util = "0.3.29"
regex = "1"
captcha = "0.0.9"
ratelimiter-rs = "0.1.5"
base64 = "0.21.5"
# base64 = "0.21.5"

[dependencies.uuid]
version = "1.2.1"
Expand Down
2 changes: 2 additions & 0 deletions src/common/appdata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::grpc::bistream_manage::BiStreamManage;
use crate::naming::cluster::node_manage::{InnerNodeManage, NodeManage};
use crate::naming::cluster::route::NamingRoute;
use crate::naming::core::NamingActor;
use crate::raft::cache::route::CacheRoute;
use crate::raft::cache::CacheManager;
use crate::raft::cluster::route::ConfigRoute;
use crate::raft::db::route::TableRoute;
Expand All @@ -30,6 +31,7 @@ pub struct AppShareData {
pub naming_node_manage: Arc<NodeManage>,
pub raft_table_manage: Addr<TableManager>,
pub raft_table_route: Arc<TableRoute>,
pub raft_cache_route: Arc<CacheRoute>,
pub factory_data: FactoryData,
pub user_manager: Addr<UserManager>,
pub cache_manager: Addr<CacheManager>,
Expand Down
66 changes: 66 additions & 0 deletions src/common/limiter_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use std::convert::TryFrom;

use ratelimiter_rs::RateLimiter;

pub struct LimiterData {
pub rate_to_ms_conversion: i32,
pub consumed_tokens: i32,
pub last_refill_time: i64,
}

impl LimiterData {
pub fn new(rate_to_ms_conversion: i32, consumed_tokens: i32, last_refill_time: i64) -> Self {
Self {
rate_to_ms_conversion,
consumed_tokens,
last_refill_time,
}
}
pub fn to_rate_limiter(self) -> RateLimiter {
RateLimiter::load(
self.rate_to_ms_conversion,
self.consumed_tokens,
self.last_refill_time,
)
}
}

impl TryFrom<&str> for LimiterData {
type Error = anyhow::Error;

fn try_from(value: &str) -> Result<Self, Self::Error> {
let mut iter = value.split(',');
let a: i32 = if let Some(e) = iter.next() {
e.parse()?
} else {
return Err(anyhow::anyhow!("limiter is unvalid"));
};
let b: i32 = if let Some(e) = iter.next() {
e.parse()?
} else {
return Err(anyhow::anyhow!("limiter is unvalid"));
};
let c: i64 = if let Some(e) = iter.next() {
e.parse()?
} else {
return Err(anyhow::anyhow!("limiter is unvalid"));
};
Ok(Self::new(a, b, c))
}
}

impl ToString for LimiterData {
fn to_string(&self) -> String {
format!(
"{},{},{}",
self.rate_to_ms_conversion, self.consumed_tokens, self.last_refill_time
)
}
}

impl From<RateLimiter> for LimiterData {
fn from(value: RateLimiter) -> Self {
let (a, b, c) = value.get_to_save();
Self::new(a, b, c)
}
}
8 changes: 7 additions & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ pub mod constant;
pub mod cycle_queue;
pub mod delay_notify;
pub mod hash_utils;
pub mod limiter_utils;
pub mod model;
pub mod rusqlite_utils;
pub mod sled_utils;
pub mod string_utils;
pub mod web_utils;
pub mod limiter_utils;

lazy_static! {
// Global app sys config
Expand Down Expand Up @@ -59,6 +59,7 @@ pub struct AppSysConfig {
pub raft_auto_init: bool,
pub raft_join_addr: String,
pub console_login_timeout: i32,
pub console_login_one_hour_limit: u32,
}

impl AppSysConfig {
Expand Down Expand Up @@ -101,6 +102,10 @@ impl AppSysConfig {
.unwrap_or("3600".to_owned())
.parse()
.unwrap_or(3600);
let console_login_one_hour_limit = std::env::var("RNACOS_CONSOLE_LOGIN_ONE_HOUR_LIMIT")
.unwrap_or("5".to_owned())
.parse()
.unwrap_or(8848);
Self {
config_db_dir,
config_db_file,
Expand All @@ -114,6 +119,7 @@ impl AppSysConfig {
raft_auto_init,
raft_join_addr,
console_login_timeout,
console_login_one_hour_limit,
}
}

Expand Down
24 changes: 23 additions & 1 deletion src/console/login_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
},
raft::cache::{
model::{CacheKey, CacheType, CacheValue},
CacheManagerReq, CacheManagerResult,
CacheLimiterReq, CacheManagerReq, CacheManagerResult,
},
user::{UserManagerReq, UserManagerResult},
};
Expand Down Expand Up @@ -57,6 +57,24 @@ pub async fn login(
Some("CAPTCHA_CHECK_ERROR".to_owned()),
)));
}
let limit_key = Arc::new(format!("USER_L#{}", &param.username));
let limit_req = CacheLimiterReq::Hour {
key: limit_key.clone(),
limit: 5,
};
//登录前先判断是否登陆准入
if let Ok(CacheManagerResult::Limiter(acquire_result)) =
app.raft_cache_route.request_limiter(limit_req).await
{
if !acquire_result {
return Ok(HttpResponse::Ok().json(ApiResult::<()>::error(
"LOGIN_LIMITE_ERROR".to_owned(),
Some("Frequent login, please try again later".to_owned()),
)));
}
} else {
return Ok(HttpResponse::Ok().json(ApiResult::<()>::error("SYSTEM_ERROR".to_owned(), None)));
}
let msg = UserManagerReq::CheckUser {
name: param.username,
password: param.password,
Expand All @@ -82,6 +100,10 @@ pub async fn login(
ttl: app.sys_config.console_login_timeout,
};
app.cache_manager.do_send(cache_req);
//登录成功后清除登陆限流计数
let clear_limit_req =
CacheManagerReq::Remove(CacheKey::new(CacheType::String, limit_key));
app.cache_manager.do_send(clear_limit_req);
return Ok(HttpResponse::Ok()
.cookie(
Cookie::build("token", token.as_str())
Expand Down
79 changes: 51 additions & 28 deletions src/raft/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use actix::prelude::*;
use bean_factory::{bean, Inject};
use inner_mem_cache::MemCache;
use ratelimiter_rs::RateLimiter;
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};

use crate::{now_second_i32, common::limiter_utils::LimiterData};
use crate::{common::limiter_utils::LimiterData, now_millis_i64, now_second_i32};

use self::model::{CacheItemDo, CacheKey, CacheValue};

Expand Down Expand Up @@ -157,30 +157,31 @@ pub enum CacheManagerReq {
},
}

///只能在主节点执行,才能保证限流的准确性
#[derive(Message, Clone, Debug, Serialize, Deserialize)]
#[rtype(result = "anyhow::Result<CacheManagerResult>")]
pub enum CacheLimiterReq {
Second{
Second {
key: Arc<String>,
limit: i32
limit: i32,
},
Minutes{
Minutes {
key: Arc<String>,
limit: i32
limit: i32,
},
Hour{
Hour {
key: Arc<String>,
limit: i32
limit: i32,
},
Day{
Day {
key: Arc<String>,
limit: i32
limit: i32,
},
OtherMills{
OtherMills {
key: Arc<String>,
limit: i32,
rate_to_ms_conversion: i32,
}
},
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -289,24 +290,46 @@ impl Handler<CacheLimiterReq> for CacheManager {
type Result = anyhow::Result<CacheManagerResult>;

fn handle(&mut self, msg: CacheLimiterReq, _ctx: &mut Self::Context) -> Self::Result {
let raft_table_route = self.raft_table_route.clone();
let (rate_to_ms_conversion,key,limit) = match msg {
CacheLimiterReq::Second { key, limit } => (1000,key,limit),
CacheLimiterReq::Minutes { key, limit } => (60_1000,key,limit),
CacheLimiterReq::Hour { key, limit } => (60*60*1000,key,limit),
CacheLimiterReq::Day { key, limit } => (24*60*60*1000,key,limit),
CacheLimiterReq::OtherMills { key, limit, rate_to_ms_conversion } => (rate_to_ms_conversion,key,limit),
let (rate_to_ms_conversion, key, limit) = match msg {
CacheLimiterReq::Second { key, limit } => (1000, key, limit),
CacheLimiterReq::Minutes { key, limit } => (60_1000, key, limit),
CacheLimiterReq::Hour { key, limit } => (60 * 60 * 1000, key, limit),
CacheLimiterReq::Day { key, limit } => (24 * 60 * 60 * 1000, key, limit),
CacheLimiterReq::OtherMills {
key,
limit,
rate_to_ms_conversion,
} => (rate_to_ms_conversion, key, limit),
};
let key = CacheKey::new(model::CacheType::String,key);
if let Ok(CacheValue::String(v)) = self.cache.get(&key){
let key = CacheKey::new(model::CacheType::String, key);
let now = now_millis_i64();
let mut limiter = if let Ok(CacheValue::String(v)) = self.cache.get(&key) {
let limiter_data: LimiterData = v.as_str().try_into()?;
let mut limiter = RateLimiter::load(limiter_data.0, limiter_data.1, limiter_data.2);
let r = limiter.acquire(limit, limit as i64);
//todo save
}
else{
//todo new init limiter
limiter_data.to_rate_limiter()
} else {
RateLimiter::load(rate_to_ms_conversion, 0, now)
};
let r = limiter.acquire(limit, limit as i64);
if r {
//只有准入才需要计数,才需要保存

let limiter_data: LimiterData = limiter.into();
let cache_value = CacheValue::String(Arc::new(limiter_data.to_string()));
self.cache
.set(key.clone(), cache_value.clone(), rate_to_ms_conversion/1000);

let mut cache_do: CacheItemDo = cache_value.into();
cache_do.timeout = ((now + rate_to_ms_conversion as i64) / 1000) as i32;
if let Some(table_manager) = self.table_manager.as_ref() {
let req: TableManagerReq = TableManagerReq::Set {
table_name: CACHE_TABLE_NAME.clone(),
key: key.to_string().into_bytes(),
value: cache_do.to_bytes(),
last_seq_id: None,
};
table_manager.do_send(req);
}
}
todo!()
Ok(CacheManagerResult::Limiter(r))
}
}
61 changes: 61 additions & 0 deletions src/raft/cache/route.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::sync::Arc;

use crate::{
grpc::PayloadUtils,
raft::{
cluster::{
model::{RouteAddr, RouterRequest, RouterResponse},
route::RaftAddrRouter,
},
network::factory::RaftClusterRequestSender,
},
};

use super::{CacheLimiterReq, CacheManager, CacheManagerResult};
use actix::prelude::*;

pub struct CacheRoute {
cache_manager: Addr<CacheManager>,
raft_addr_route: Arc<RaftAddrRouter>,
cluster_sender: Arc<RaftClusterRequestSender>,
}

impl CacheRoute {
pub fn new(
cache_manager: Addr<CacheManager>,
raft_addr_route: Arc<RaftAddrRouter>,
cluster_sender: Arc<RaftClusterRequestSender>,
) -> Self {
Self {
cache_manager,
raft_addr_route,
cluster_sender,
}
}

fn unknown_err(&self) -> anyhow::Error {
anyhow::anyhow!("unknown the raft leader addr!")
}

pub async fn request_limiter(
&self,
req: CacheLimiterReq,
) -> anyhow::Result<CacheManagerResult> {
match self.raft_addr_route.get_route_addr().await? {
RouteAddr::Local => self.cache_manager.send(req).await?,
RouteAddr::Remote(_, addr) => {
let req: RouterRequest = req.into();
let request = serde_json::to_string(&req).unwrap_or_default();
let payload = PayloadUtils::build_payload("RaftRouteRequest", request);
let resp_payload = self.cluster_sender.send_request(addr, payload).await?;
let body_vec = resp_payload.body.unwrap_or_default().value;
let resp: RouterResponse = serde_json::from_slice(&body_vec)?;
match resp {
RouterResponse::CacheManagerResult { result } => Ok(result),
_ => Err(anyhow::anyhow!("response type is error!")),
}
}
RouteAddr::Unknown => Err(self.unknown_err()),
}
}
}
7 changes: 5 additions & 2 deletions src/raft/cluster/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use serde::{Deserialize, Serialize};

use crate::{
config::core::ConfigKey,
raft::{db::table::{TableManagerQueryReq, TableManagerReq, TableManagerResult}, cache::{CacheLimiterReq, CacheManagerResult}},
raft::{
cache::{CacheLimiterReq, CacheManagerResult},
db::table::{TableManagerQueryReq, TableManagerReq, TableManagerResult},
},
};

pub enum RouteAddr {
Expand Down Expand Up @@ -63,7 +66,7 @@ pub enum RouterRequest {
},
CacheLimiterReq {
req: CacheLimiterReq,
}
},
}

impl From<SetConfigReq> for RouterRequest {
Expand Down
Loading

0 comments on commit 1ebb77c

Please sign in to comment.