Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
heqingpan committed Jun 30, 2024
2 parents 3cf10ca + 4891f49 commit 3567342
Show file tree
Hide file tree
Showing 16 changed files with 683 additions and 99 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ rnacos
|RNACOS_INIT_ADMIN_USERNAME|初始化管理员用户名,只在主节点第一次启动时生效|admin|rnacos|0.5.11|
|RNACOS_INIT_ADMIN_PASSWORD|初始化管理员密码,只在主节点第一次启动时生效|admin|rnacos123456|0.5.11|
|RNACOS_ENABLE_METRICS|是否开启监控指标功能|true|true|0.5.13|
|RNACOS_METRICS_LOG_INTERVAL_SECOND|监控指标采集打印到日志的间隔,单位秒,最小间隔为5秒|30|10|0.5.13|
|RNACOS_METRICS_COLLECT_INTERVAL_SECOND|监控指标采集指标间隔,单位秒,最小间隔为1秒,不能小于RNACOS_METRICS_LOG_INTERVAL_SECOND|15|5|0.5.14|
|RNACOS_METRICS_LOG_INTERVAL_SECOND|监控指标采集打印到日志的间隔,单位秒,最小间隔为5秒|60|30|0.5.13|


启动配置方式可以参考: [运行参数说明](https://r-nacos.github.io/docs/notes/env_config/)
Expand Down Expand Up @@ -347,7 +348,6 @@ nacos_rust_client = "0.3.0"
访问认证:

1. 有提供获取认证token的接口
2. 实际请求暂不支持认证,都算认证通过。

配置中心:

Expand Down
7 changes: 5 additions & 2 deletions doc/conf/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ RNACOS_INIT_ADMIN_USERNAME=admin
# 初始化管理员密码在主节点第一次启动时生效,默认值:admin
RNACOS_INIT_ADMIN_PASSWORD=admin

#是否开启监控指标功能
#是否开启监控指标功能,开启后支持prometheus metrics exporter http://127.0.0.1:8848/metrics
RNACOS_ENABLE_METRICS=true

#监控指标采集指标间隔,单位秒,最小间隔为1秒,不能小于RNACOS_METRICS_LOG_INTERVAL_SECOND
RNACOS_METRICS_COLLECT_INTERVAL_SECOND=15

#监控指标采集打印到日志的间隔,单位秒,最小间隔为5秒
RNACOS_METRICS_LOG_INTERVAL_SECOND=30
RNACOS_METRICS_LOG_INTERVAL_SECOND=60

17 changes: 15 additions & 2 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub struct AppSysConfig {
pub init_admin_username: String,
pub init_admin_password: String,
pub metrics_enable: bool,
pub metrics_collect_interval_second: u64,
pub metrics_log_interval_second: u64,
}

Expand Down Expand Up @@ -160,13 +161,24 @@ impl AppSysConfig {
.unwrap_or("true".to_owned())
.parse()
.unwrap_or(true);
let mut metrics_collect_interval_second =
std::env::var("RNACOS_METRICS_COLLECT_INTERVAL_SECOND")
.unwrap_or("15".to_owned())
.parse()
.unwrap_or(15);
if metrics_collect_interval_second < 1 {
metrics_collect_interval_second = 1;
}
let mut metrics_log_interval_second = std::env::var("RNACOS_METRICS_LOG_INTERVAL_SECOND")
.unwrap_or("30".to_owned())
.unwrap_or("60".to_owned())
.parse()
.unwrap_or(30);
.unwrap_or(60);
if metrics_log_interval_second < 5 {
metrics_log_interval_second = 5;
}
if metrics_log_interval_second < metrics_collect_interval_second {
metrics_collect_interval_second = metrics_log_interval_second;
}
Self {
config_db_dir,
config_db_file,
Expand All @@ -191,6 +203,7 @@ impl AppSysConfig {
init_admin_username,
init_admin_password,
metrics_enable,
metrics_collect_interval_second,
metrics_log_interval_second,
}
}
Expand Down
117 changes: 104 additions & 13 deletions src/metrics/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ use crate::metrics::counter::CounterManager;
use crate::metrics::gauge::GaugeManager;
use crate::metrics::histogram::HistogramManager;
use crate::metrics::metrics_key::MetricsKey;
use crate::metrics::model::{MetricsItem, MetricsQuery, MetricsRecord, MetricsRequest};
use crate::metrics::model::{
MetricsItem, MetricsQuery, MetricsRecord, MetricsRequest, MetricsResponse,
};
use crate::metrics::summary::SummaryManager;
use crate::naming::core::NamingActor;
use crate::now_millis;
use actix::prelude::*;
use bean_factory::{bean, BeanFactory, FactoryData, Inject};
use bytes::BytesMut;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::{Pid, System};
Expand All @@ -20,15 +24,19 @@ pub struct MetricsManager {
counter_manager: CounterManager,
gauge_manager: GaugeManager,
histogram_manager: HistogramManager,
summary_manager: SummaryManager,
summary_key_config: Vec<(MetricsKey, MetricsKey)>,
naming_actor: Option<Addr<NamingActor>>,
config_actor: Option<Addr<ConfigActor>>,
bi_stream_manage: Option<Addr<BiStreamManage>>,
system: System,
current_process_id: u32,
start_time_millis: u64,
//last_load_time: u64,
total_memory: f64,
collect_interval: u64,
log_interval: u64,
last_collect_time: u64,
last_log_time: u64,
metrics_enable: bool,
}

Expand All @@ -51,15 +59,19 @@ impl MetricsManager {
counter_manager: Default::default(),
gauge_manager,
histogram_manager: Default::default(),
summary_manager: Default::default(),
summary_key_config: Default::default(),
naming_actor: None,
config_actor: None,
bi_stream_manage: None,
system,
current_process_id,
start_time_millis,
//last_load_time: 0,
total_memory,
log_interval: 30,
log_interval: 60,
collect_interval: 15,
last_collect_time: 0,
last_log_time: 0,
metrics_enable: true,
}
}
Expand All @@ -73,16 +85,43 @@ impl MetricsManager {
self.histogram_manager.init(
MetricsKey::GrpcRequestHandleRtHistogram,
&[
0.5f64, 1f64, 3f64, 5f64, 10f64, 25f64, 50f64, 100f64, 300f64, 500f64, 1000f64,
0.25f64, 0.5f64, 1f64, 3f64, 5f64, 10f64, 25f64, 50f64, 100f64, 300f64, 500f64,
],
);
self.summary_manager.init(
MetricsKey::GrpcRequestHandleRtSummary,
&[0.5f64, 0.6f64, 0.7f64, 0.8f64, 0.9f64, 0.95f64, 1f64],
);
// 单位毫秒ms
self.histogram_manager.init(
MetricsKey::HttpRequestHandleRtHistogram,
&[
0.5f64, 1f64, 3f64, 5f64, 10f64, 25f64, 50f64, 100f64, 300f64, 500f64, 1000f64,
0.25f64, 0.5f64, 1f64, 3f64, 5f64, 10f64, 25f64, 50f64, 100f64, 300f64, 500f64,
],
);
self.summary_manager.init(
MetricsKey::HttpRequestHandleRtSummary,
&[0.5f64, 0.6f64, 0.7f64, 0.8f64, 0.9f64, 0.95f64, 1f64],
);

//summary from histogram
self.summary_key_config.push((
MetricsKey::HttpRequestHandleRtSummary,
MetricsKey::HttpRequestHandleRtHistogram,
));
self.summary_key_config.push((
MetricsKey::GrpcRequestHandleRtSummary,
MetricsKey::GrpcRequestHandleRtHistogram,
));
}

fn reset_summary(&mut self) {
for (summary_key, histogram_key) in &self.summary_key_config {
if let Some(histogram_value) = self.histogram_manager.get_value(histogram_key) {
self.summary_manager
.recalculate_from_histogram(summary_key, histogram_value);
}
}
}

async fn do_peek_metrics(
Expand Down Expand Up @@ -127,11 +166,18 @@ impl MetricsManager {
}
}

fn print_metrics(&self) {
fn print_metrics(&mut self) {
let now = now_millis();
if now - self.last_log_time < (self.log_interval - 1) * 1000 {
return;
}
//log::info!("-------------- log metrics start --------------");
self.print_sys_metrics();
self.gauge_manager.print_metrics();
self.counter_manager.print_metrics();
self.histogram_manager.print_metrics();
self.summary_manager.print_metrics();
self.last_log_time = now;
}

fn load_sys_metrics(&mut self) {
Expand All @@ -142,12 +188,40 @@ impl MetricsManager {
let vms = process.virtual_memory() as f64 / (1024.0 * 1024.0);
let rss_usage = rss / self.total_memory * 100.0;
let running_seconds = (now_millis() - self.start_time_millis) / 1000;
log::info!("[metrics_system]|already running seconds: {}s|cpu_usage: {:.2}%|rss_usage: {:.2}%|rss: {}M|vms: {}M|total_memory: {}M|",running_seconds,&cpu_usage,&rss_usage,&rss,&vms,&self.total_memory);
//log::info!("[metrics_system]|already running seconds: {}s|cpu_usage: {:.2}%|rss_usage: {:.2}%|rss: {:.2}M|vms: {:.2}M|total_memory: {:.2}M|",running_seconds,&cpu_usage,&rss_usage,&rss,&vms,&self.total_memory);
self.gauge_manager
.set(MetricsKey::ProcessStartTimeSeconds, running_seconds as f64);
self.gauge_manager.set(MetricsKey::AppCpuUsage, cpu_usage);
self.gauge_manager.set(MetricsKey::AppRssMemory, rss);
self.gauge_manager.set(MetricsKey::AppVmsMemory, vms);
self.gauge_manager.set(MetricsKey::AppRssMemory, rss_usage);
self.gauge_manager
.set(MetricsKey::AppMemoryUsage, rss_usage);
}
self.last_collect_time = now_millis();
}

fn print_sys_metrics(&self) {
let cpu_usage = self
.gauge_manager
.value(&MetricsKey::AppCpuUsage)
.unwrap_or_default();
let rss = self
.gauge_manager
.value(&MetricsKey::AppRssMemory)
.unwrap_or_default();
let vms = self
.gauge_manager
.value(&MetricsKey::AppVmsMemory)
.unwrap_or_default();
let rss_usage = self
.gauge_manager
.value(&MetricsKey::AppMemoryUsage)
.unwrap_or_default();
let running_seconds = self
.gauge_manager
.value(&MetricsKey::ProcessStartTimeSeconds)
.unwrap_or_default();
log::info!("[metrics_system]|already running seconds: {}s|cpu_usage: {:.2}%|rss_usage: {:.2}%|rss: {:.2}M|vms: {:.2}M|total_memory: {:.2}M|",running_seconds,&cpu_usage,&rss_usage,&rss,&vms,&self.total_memory);
}

fn load_metrics(&mut self, ctx: &mut Context<Self>) {
Expand All @@ -159,17 +233,28 @@ impl MetricsManager {
.map(|r, act, ctx| {
//Self::log_metrics(&r);
act.update_peek_metrics(r);
act.reset_summary();
act.load_sys_metrics();
act.print_metrics();
act.hb(ctx);
})
.spawn(ctx);
}
fn hb(&mut self, ctx: &mut Context<Self>) {
ctx.run_later(Duration::from_secs(self.log_interval), |act, ctx| {
ctx.run_later(Duration::from_secs(self.collect_interval), |act, ctx| {
act.load_metrics(ctx);
});
}

fn export(&mut self) -> anyhow::Result<String> {
let mut bytes_mut = BytesMut::new();
self.counter_manager.export(&mut bytes_mut)?;
self.gauge_manager.export(&mut bytes_mut)?;
self.histogram_manager.export(&mut bytes_mut)?;
self.reset_summary();
self.summary_manager.export(&mut bytes_mut)?;
Ok(String::from_utf8(bytes_mut.to_vec())?)
}
}

impl Actor for MetricsManager {
Expand All @@ -194,6 +279,7 @@ impl Inject for MetricsManager {
let sys_config: Option<Arc<AppSysConfig>> = factory_data.get_bean();
if let Some(sys_config) = sys_config {
self.metrics_enable = sys_config.metrics_enable;
self.collect_interval = sys_config.metrics_collect_interval_second;
self.log_interval = sys_config.metrics_log_interval_second;
if self.metrics_enable {
log::info!("metrics enable! log_interval: {}s", self.log_interval);
Expand All @@ -209,22 +295,27 @@ impl Inject for MetricsManager {
}

impl Handler<MetricsRequest> for MetricsManager {
type Result = anyhow::Result<()>;
type Result = anyhow::Result<MetricsResponse>;

fn handle(&mut self, msg: MetricsRequest, _ctx: &mut Self::Context) -> Self::Result {
if !self.metrics_enable {
return Ok(());
return Ok(MetricsResponse::None);
}
match msg {
MetricsRequest::Record(item) => {
self.update_item_record(item);
Ok(MetricsResponse::None)
}
MetricsRequest::BatchRecord(items) => {
for item in items {
self.update_item_record(item);
}
Ok(MetricsResponse::None)
}
MetricsRequest::Export => {
let v = self.export()?;
Ok(MetricsResponse::ExportInfo(v))
}
}
Ok(())
}
}
12 changes: 11 additions & 1 deletion src/metrics/counter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::metrics::metrics_key::{MetricsKey, ORDER_ALL_KEYS};
use crate::metrics::model::CounterValue;
use crate::metrics::model::{CounterValue, CounterValueFmtWrap};
use bytes::BytesMut;
use std::collections::HashMap;
use std::fmt::Write;

type Key = MetricsKey;

Expand Down Expand Up @@ -33,4 +35,12 @@ impl CounterManager {
}
}
}

pub fn export(&mut self, bytes_mut: &mut BytesMut) -> anyhow::Result<()> {
for (key, value) in self.date_map.iter() {
bytes_mut.write_str(&format!("{}", &CounterValueFmtWrap::new(key, value)))?;
}
//bytes_mut.write_str("\n")?;
Ok(())
}
}
20 changes: 19 additions & 1 deletion src/metrics/gauge.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::metrics::metrics_key::{MetricsKey, ORDER_ALL_KEYS};
use crate::metrics::model::GaugeValue;
use crate::metrics::model::{GaugeValue, GaugeValueFmtWrap};
use bytes::BytesMut;
use std::collections::HashMap;
use std::fmt::Write;

type Key = MetricsKey;

Expand Down Expand Up @@ -34,6 +36,14 @@ impl GaugeManager {
}
}

pub fn value(&self, key: &Key) -> Option<f64> {
if let Some(item) = self.date_map.get(&key) {
Some(item.0.to_owned())
} else {
None
}
}

pub fn print_metrics(&self) {
//log::info!("-------------- METRICS GAUGE --------------");
for key in ORDER_ALL_KEYS.iter() {
Expand All @@ -42,4 +52,12 @@ impl GaugeManager {
}
}
}

pub fn export(&mut self, bytes_mut: &mut BytesMut) -> anyhow::Result<()> {
for (key, value) in self.date_map.iter() {
bytes_mut.write_str(&format!("{}", &GaugeValueFmtWrap::new(key, value)))?;
}
//bytes_mut.write_str("\n")?;
Ok(())
}
}
Loading

0 comments on commit 3567342

Please sign in to comment.