Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dagou committed Jun 4, 2024
1 parent 669fea1 commit e748b57
Showing 1 changed file with 42 additions and 76 deletions.
118 changes: 42 additions & 76 deletions kr2r/src/compact_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ pub struct Page {
pub data: Vec<u32>,
}

impl Default for Page {
fn default() -> Self {
Self {
index: 1,
size: 1,
data: vec![0],
}
}
}

impl Page {
pub fn new(index: usize, size: usize, data: Vec<u32>) -> Self {
Self { index, size, data }
Expand All @@ -186,38 +196,6 @@ impl Page {
}
}

pub struct PagePtr<'a> {
#[allow(dead_code)]
mmap: Option<Mmap>,
pub index: usize,
pub size: usize,
pub data: &'a [u32],
}

impl<'a> PagePtr<'a> {
pub fn new(mmap: Option<Mmap>, index: usize, size: usize, data: &'a [u32]) -> Self {
Self {
mmap,
index,
size,
data,
}
}
}

impl<'a> Default for PagePtr<'a> {
fn default() -> Self {
// 创建一个包含0的静态数组
static DEFAULT_DATA: [u32; 1] = [0];
Self {
mmap: None,
index: 1,
size: 1,
data: &DEFAULT_DATA,
}
}
}

use std::fmt::{self, Debug};

#[derive(Clone, Copy)]
Expand Down Expand Up @@ -353,11 +331,11 @@ pub trait K2Compact: std::marker::Sync + Send {
}

#[allow(unused)]
pub struct CHPage<'a> {
pub struct CHPage {
// 哈希表的容量
pub config: HashConfig,
pub page: Page,
pub next_page: PagePtr<'a>,
pub next_page: Page,
}

fn _read_page_from_file<P: AsRef<Path>>(filename: P) -> Result<Page> {
Expand Down Expand Up @@ -406,71 +384,59 @@ fn read_page_from_file<P: AsRef<Path>>(filename: P) -> Result<Page> {
Ok(Page::new(index, capacity, data))
}

fn _read_pageptr_from_file<'a, P: AsRef<Path>>(filename: P) -> Result<PagePtr<'a>> {
let file = OpenOptions::new().read(true).open(&filename)?;
let mmap = unsafe { MmapOptions::new().populate().map(&file)? };
let index = LittleEndian::read_u64(&mmap[0..8]) as usize;
let capacity = LittleEndian::read_u64(&mmap[8..16]) as usize;

let raw_page_data =
unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const u32, capacity) };

let first_zero_end = raw_page_data
.iter()
.position(|&x| x == 0)
.map(|pos| pos + 1)
.unwrap_or(capacity);
let page_data = &raw_page_data[..first_zero_end];

// let page_data =
// unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const u32, capacity) };

Ok(PagePtr::new(Some(mmap), index, first_zero_end, page_data))
}
fn read_pageptr_from_file_chunk<P: AsRef<Path>>(filename: P) -> Result<Page> {
let mut file = std::fs::File::open(filename)?;

fn read_pageptr_from_file_chunk<'a, P: AsRef<Path>>(filename: P) -> Result<PagePtr<'a>> {
let file = OpenOptions::new().read(true).open(&filename)?;
let mmap = unsafe { MmapOptions::new().populate().map(&file)? };
let index = LittleEndian::read_u64(&mmap[0..8]) as usize;
let capacity = LittleEndian::read_u64(&mmap[8..16]) as usize;
// Read the index and capacity
let mut buffer = [0u8; 16];
file.read_exact(&mut buffer)?;
let index = LittleEndian::read_u64(&buffer[0..8]) as usize;
let capacity = LittleEndian::read_u64(&buffer[8..16]) as usize;

let mut first_zero_end = capacity;
let chunk_size = 1024; // 定义每次读取的块大小
let chunk_size = 1024; // Define the chunk size for reading
let mut found_zero = false;

for i in (0..capacity).step_by(chunk_size) {
let end = usize::min(i + chunk_size, capacity);
let chunk = unsafe {
std::slice::from_raw_parts(mmap.as_ptr().add(16 + i * 4) as *const u32, end - i)
};
if let Some(pos) = chunk.iter().position(|&x| x == 0) {
first_zero_end = i + pos + 1;
let mut data = vec![0u32; capacity];
let mut read_pos = 0;

while read_pos < capacity {
let end = usize::min(read_pos + chunk_size, capacity);
let bytes_to_read = (end - read_pos) * std::mem::size_of::<u32>();
let mut chunk = vec![0u8; bytes_to_read];
file.read_exact(&mut chunk)?;
let chunk_u32 =
unsafe { std::slice::from_raw_parts(chunk.as_ptr() as *const u32, end - read_pos) };
data[read_pos..end].copy_from_slice(chunk_u32);

if let Some(pos) = chunk_u32.iter().position(|&x| x == 0) {
first_zero_end = read_pos + pos + 1;
found_zero = true;
break;
}
read_pos = end;
}

if !found_zero {
first_zero_end = capacity;
eprintln!("Warning: No zero value found in the data, using full capacity.");
}

let page_data =
unsafe { std::slice::from_raw_parts(mmap.as_ptr().add(16) as *const u32, first_zero_end) };
data.truncate(first_zero_end);

Ok(PagePtr::new(Some(mmap), index, first_zero_end, page_data))
Ok(Page::new(index, first_zero_end, data))
}

impl<'a> CHPage<'a> {
impl CHPage {
pub fn from<P: AsRef<Path> + Debug>(
config: HashConfig,
chunk_file1: P,
chunk_file2: P,
) -> Result<CHPage<'a>> {
) -> Result<CHPage> {
let page = read_page_from_file(chunk_file1)?;
let next_page = if page.data.last().map_or(false, |&x| x == 0) {
read_pageptr_from_file_chunk(chunk_file2)?
} else {
PagePtr::default()
Page::default()
};

let chtm = CHPage {
Expand Down Expand Up @@ -506,7 +472,7 @@ impl<'a> CHPage<'a> {
}
}

impl<'a> K2Compact for CHPage<'a> {
impl K2Compact for CHPage {
fn get_idx_mask(&self) -> usize {
let idx_bits = ((self.config.hash_capacity as f64).log2().ceil() as usize).max(1);
(1 << idx_bits) - 1
Expand Down

0 comments on commit e748b57

Please sign in to comment.