Skip to content

Commit

Permalink
fix reader & searcher
Browse files Browse the repository at this point in the history
1. fix
find_offset_to_start(#146)
2. fix
read_metrics_one_file_by_end_time(#145)
  • Loading branch information
flearc authored and Forsworns committed Apr 21, 2024
1 parent b1107f9 commit a22e270
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 43 deletions.
1 change: 1 addition & 0 deletions sentinel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ schemars = { version = "0.8.8", optional = true }
apollo-client = { version = "0.7.5", optional = true }
futures-util = { version = "0.3.29", optional = true }
dirs = "5.0.1"
byteorder = "1.5.0"

[target.'cfg(not(target_arch="wasm32"))'.dependencies]
# cannot add "wasm-bindgen" feature to uuid,
Expand Down
46 changes: 22 additions & 24 deletions sentinel-core/src/core/log/metric/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,38 +85,36 @@ impl DefaultMetricLogReader {
let end_sec = end_ms / 1000;
let file = open_file_and_seek_to(filename, offset)?;

let mut buf_reader = BufReader::new(file);
let buf_reader = BufReader::new(file);
let mut items = Vec::with_capacity(1024);
loop {
let mut line = String::new();
let count = buf_reader.read_line(&mut line)?;
if count == 0 {
return Ok((Vec::new(), true));
}
let item = base::MetricItem::from_string(&line);
match item {
Ok(item) => {
let ts_sec = item.timestamp / 1000;
// current_second should in [begin_sec, end_sec]
if ts_sec < begin_sec || ts_sec > end_sec {
return Ok((items, false));
}

// empty resource name indicates "fetch all"
if resource.is_empty() || resource == &item.resource {
items.push(item);
}
let lines = buf_reader.lines();

if prev_size + items.len() >= MAX_ITEM_AMOUNT {
return Ok((items, false));
}
}
for line in lines {
let line = line?;
let item = match base::MetricItem::from_string(&line) {
Ok(item) => item,
Err(err) => {
logging::error!("DefaultMetricLogReader::read_metrics_one_file_by_end_time: {:?} Failed to convert to MetricItem. Error: {:?}.", line,err);
logging::error!("Failed to convert to MetricItem: {:?}", err);
continue;
}
};

let ts_time = item.timestamp / 1000;
if ts_time < begin_sec || ts_time > end_sec {
return Ok((items, false)); // Outside time range
}

if resource.is_empty() || resource == &item.resource {
items.push(item);
}

if prev_size + items.len() >= MAX_ITEM_AMOUNT {
return Ok((items, false)); // Reached maximum item amount
}
}

Ok((items, true)) // End of file
}
}

Expand Down
40 changes: 21 additions & 19 deletions sentinel-core/src/core/log/metric/searcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use byteorder::{BigEndian, ReadBytesExt};

use super::*;
use crate::{logging, Error, Result};
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::io::{Cursor, Read, Seek, SeekFrom};
use std::sync::Mutex;
#[derive(Debug)]
pub struct FilePosition {
Expand Down Expand Up @@ -139,32 +141,32 @@ impl DefaultMetricSearcher {
filename: &str,
begin_time_ms: u64,
last_pos: SeekFrom,
) -> Result<u32> {
let mut cached_pos = self.cached_pos.lock().unwrap();
cached_pos.idx_filename = "".into();
cached_pos.metric_filename = "".into();

) -> Result<u64> {
let idx_filename = form_metric_idx_filename(filename);
let begin_sec = begin_time_ms / 1000;
let mut file = File::open(&idx_filename)?;

// Set position to the offset recorded in the idx file
cached_pos.cur_offset_in_idx = SeekFrom::Start(file.seek(last_pos)?);
let mut sec: u64;
loop {
let mut buffer: [u8; 8] = [0; 8];
file.read_exact(&mut buffer)?;
sec = u64::from_be_bytes(buffer);
let mut cached_pos = self.cached_pos.lock().unwrap();

// Seek to the last position
file.seek(last_pos)?;

let mut index_data = Vec::new();
file.read_to_end(&mut index_data)?;

let mut offset = 0;
let mut sec = 0;

let mut reader = Cursor::new(index_data);
while let Ok(sec_be) = ReadBytesExt::read_u64::<BigEndian>(&mut reader) {
sec = sec_be;
let offset_be = ReadBytesExt::read_u64::<BigEndian>(&mut reader)?;
offset = offset_be;
if sec >= begin_sec {
break;
}
let mut buffer: [u8; 4] = [0; 4];
file.read_exact(&mut buffer)?;
cached_pos.cur_offset_in_idx = SeekFrom::Start(file.seek(SeekFrom::Current(0))?);
}
let mut buffer: [u8; 4] = [0; 4];
file.read_exact(&mut buffer)?;
let offset = u32::from_be_bytes(buffer);

// Cache the idx filename and position
cached_pos.metric_filename = filename.into();
cached_pos.idx_filename = idx_filename.into();
Expand Down

0 comments on commit a22e270

Please sign in to comment.