Skip to content

Commit

Permalink
chunk files
Browse files Browse the repository at this point in the history
  • Loading branch information
dagou committed Jul 7, 2024
1 parent 5bf4174 commit 11f6a87
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 38 deletions.
2 changes: 1 addition & 1 deletion kr2r/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kr2r"
version = "0.6.1"
version = "0.6.3"
edition = "2021"
authors = ["eric9n@gmail.com"]

Expand Down
7 changes: 4 additions & 3 deletions kr2r/src/bin/annotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,15 @@ fn process_chunk_file<P: AsRef<Path>>(
}

pub fn run(args: Args) -> Result<()> {
let chunk_files = find_and_sort_files(&args.chunk_dir, "sample", ".k2")?;
let hash_files = find_and_sort_files(&args.database, "hash", ".k2d")?;
let chunk_files = find_and_sort_files(&args.chunk_dir, "sample", ".k2", true)?;
let hash_files = find_and_sort_files(&args.database, "hash", ".k2d", true)?;

// 开始计时
let start = Instant::now();
println!("annotate start...");
for chunk_file in chunk_files {
for chunk_file in &chunk_files {
process_chunk_file(&args, chunk_file, &hash_files)?;
let _ = std::fs::remove_file(chunk_file);
}
// 计算持续时间
let duration = start.elapsed();
Expand Down
2 changes: 1 addition & 1 deletion kr2r/src/bin/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ pub fn run(args: Args) -> Result<()> {
println!("classify start...");
let start = Instant::now();
let meros = idx_opts.as_meros();
let hash_files = find_and_sort_files(&args.database, "hash", ".k2d")?;
let hash_files = find_and_sort_files(&args.database, "hash", ".k2d", true)?;
let chtable = CHTable::from_hash_files(hash_config, &hash_files)?;

process_files(args, meros, hash_config, &chtable, &taxo)?;
Expand Down
4 changes: 2 additions & 2 deletions kr2r/src/bin/kun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let splitr_args = splitr::Args::from(cmd_args.clone());
let chunk_files = find_files(&splitr_args.chunk_dir, "sample", ".k2");
let sample_files = find_files(&splitr_args.chunk_dir, "sample", ".map");
let sample_files = find_files(&splitr_args.chunk_dir, "sample_id", ".map");
let bin_files = find_files(&splitr_args.chunk_dir, "sample", ".bin");
if !chunk_files.is_empty() || !sample_files.is_empty() || !bin_files.is_empty() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"{} `sample` files must be empty",
"The directory '{}' must not contain files with extensions '.k2', '.map', or '.bin' for 'sample' and 'sample_id'",
&splitr_args.chunk_dir.display()
),
)));
Expand Down
57 changes: 38 additions & 19 deletions kr2r/src/bin/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use kr2r::compact_hash::{HashConfig, Row};
use kr2r::readcounts::{TaxonCounters, TaxonCountersDash};
use kr2r::report::report_kraken_style;
use kr2r::taxonomy::Taxonomy;
use kr2r::utils::{find_and_sort_files, open_file};
use kr2r::utils::{find_and_trans_files, open_file};
use kr2r::HitGroup;
// use rayon::prelude::*;
use seqkmer::{buffer_map_parallel, trim_pair_info, OptionPair};
Expand Down Expand Up @@ -190,10 +190,10 @@ pub fn run(args: Args) -> Result<()> {
let taxonomy_filename = k2d_dir.join("taxo.k2d");
let taxo = Taxonomy::from_file(taxonomy_filename)?;

let sample_files = find_and_sort_files(&args.chunk_dir, "sample_file", ".bin")?;
let sample_id_files = find_and_sort_files(&args.chunk_dir, "sample_id", ".map")?;
let sample_files = find_and_trans_files(&args.chunk_dir, "sample_file", ".bin", false)?;
let sample_id_files = find_and_trans_files(&args.chunk_dir, "sample_id", ".map", false)?;

let partition = sample_files.len();
// let partition = sample_files.len();
let hash_config = HashConfig::from_hash_header(&args.database.join("hash_config.k2d"))?;
let value_mask = hash_config.value_mask;

Expand All @@ -205,21 +205,22 @@ pub fn run(args: Args) -> Result<()> {
let start = Instant::now();
println!("resolve start...");

for i in 0..partition {
let sample_file = &sample_files[i];
for (i, sample_file) in &sample_files {
// for i in 0..partition {
// let sample_file = &sample_files[i];
let sample_id_map = read_id_to_seq_map(&sample_id_files[i])?;

let thread_sequences = sample_id_map.len();
let mut writer: Box<dyn Write + Send> = match &args.kraken_output_dir {
Some(ref file_path) => {
let filename = file_path.join(format!("output_{}.txt", i + 1));
let filename = file_path.join(format!("output_{}.txt", i));
let file = File::create(filename)?;
Box::new(BufWriter::new(file)) as Box<dyn Write + Send>
}
None => Box::new(BufWriter::new(io::stdout())) as Box<dyn Write + Send>,
};
let (thread_taxon_counts, thread_classified, hit_seq_set) = process_batch::<&PathBuf>(
sample_file,
&sample_file,
&args,
&taxo,
&sample_id_map,
Expand Down Expand Up @@ -261,7 +262,7 @@ pub fn run(args: Args) -> Result<()> {
.unwrap();
});
if let Some(output) = &args.kraken_output_dir {
let filename = output.join(format!("output_{}.kreport2", i + 1));
let filename = output.join(format!("output_{}.kreport2", i));
report_kraken_style(
filename,
args.report_zero_counts,
Expand All @@ -278,23 +279,41 @@ pub fn run(args: Args) -> Result<()> {
}

if let Some(output) = &args.kraken_output_dir {
let filename = output.join("output.kreport2");
report_kraken_style(
filename,
args.report_zero_counts,
args.report_kmer_data,
&taxo,
&total_taxon_counts,
total_seqs as u64,
total_unclassified as u64,
)?;
let min = &sample_files.keys().min().cloned().unwrap();
let max = &sample_files.keys().max().cloned().unwrap();

if max > min {
let filename = output.join(format!("output_{}-{}.kreport2", min, max));
report_kraken_style(
filename,
args.report_zero_counts,
args.report_kmer_data,
&taxo,
&total_taxon_counts,
total_seqs as u64,
total_unclassified as u64,
)?;
}

let source_sample_file = args.chunk_dir.join("sample_file.map");
let to_sample_file = output.join("sample_file.txt");
std::fs::copy(source_sample_file, to_sample_file)?;
}

// 计算持续时间
let duration = start.elapsed();
// 打印运行时间
println!("resolve took: {:?}", duration);

for (_, sample_file) in &sample_files {
let _ = std::fs::remove_file(sample_file);
}

for (_, sample_file) in sample_id_files {
let _ = std::fs::remove_file(sample_file);
}
// let source_sample_file = args.chunk_dir.join("sample_file.map");
// let _ = std::fs::remove_file(source_sample_file);
Ok(())
}

Expand Down
6 changes: 5 additions & 1 deletion kr2r/src/bin/splitr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,12 @@ pub fn run(args: Args) -> Result<()> {
println!("splitr start...");
let file_num_limit = get_file_limit();
if hash_config.partition >= file_num_limit {
eprintln!(
"file num limit {:?}, need: {:?}",
file_num_limit, hash_config.partition
);
set_fd_limit(hash_config.partition as u64 + 1)
.expect("Failed to set file descriptor limit");
.expect("Failed to set file descriptor limit, please run this operation with administrative/root privileges.");
// panic!("Exceeds File Number Limit");
}

Expand Down
84 changes: 73 additions & 11 deletions kr2r/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,68 @@ pub fn create_sample_file<P: AsRef<Path>>(filename: P) -> BufWriter<File> {

use regex::Regex;

/// Extracts `N` from a file name of the exact form `{prefix}_{N}{suffix}`.
///
/// Returns `None` when the name does not have that exact shape, when `N` is
/// empty or contains anything other than ASCII digits, or when `N` does not
/// fit in a `usize`.
fn numbered_file_index(file_name: &str, prefix: &str, suffix: &str) -> Option<usize> {
    let digits = file_name
        .strip_prefix(prefix)?
        .strip_prefix('_')?
        .strip_suffix(suffix)?;
    // `str::parse::<usize>` also accepts a leading `+`, so insist on digits only.
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    digits.parse().ok()
}

/// Finds the regular files in `directory` named `{prefix}_{N}{suffix}` and
/// returns them keyed by their number `N`.
///
/// `prefix` and `suffix` are matched literally (no regex metacharacters are
/// interpreted) and the whole file name must match, so a name such as
/// `sample_2xk2_old.k2` is not picked up for prefix `sample` / suffix `.k2`.
/// Directory entries that cannot be read, are not regular files, or whose
/// names are not valid UTF-8 are skipped.
///
/// When `check` is `true`, the numbers found must be exactly `1..=count`
/// (contiguous, starting from 1); an empty result passes the check.
///
/// # Errors
///
/// Returns the underlying error if `directory` cannot be read, or an
/// `io::ErrorKind::NotFound` error if `check` is set and the file numbers are
/// not contiguous starting from 1.
pub fn find_and_trans_files(
    directory: &Path,
    prefix: &str,
    suffix: &str,
    check: bool,
) -> io::Result<HashMap<usize, PathBuf>> {
    // Map each matching file's number to its path.
    let mut map_entries = HashMap::new();
    for entry in fs::read_dir(directory)?.filter_map(Result::ok) {
        let path = entry.path();
        if !path.is_file() {
            continue;
        }
        let index = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(|name| numbered_file_index(name, prefix, suffix));
        if let Some(index) = index {
            map_entries.insert(index, path);
        }
    }

    // Keys are unique, so they are contiguous from 1 exactly when every
    // value in `1..=len` is present.
    if check && !(1..=map_entries.len()).all(|index| map_entries.contains_key(&index)) {
        return Err(io::Error::new(
            io::ErrorKind::NotFound,
            "File numbers are not continuous starting from 1.",
        ));
    }

    Ok(map_entries)
}

// 函数定义
pub fn find_and_sort_files(
directory: &Path,
prefix: &str,
suffix: &str,
check: bool,
) -> io::Result<Vec<PathBuf>> {
// 构建正则表达式以匹配文件名中的数字
let pattern = format!(r"{}_(\d+){}", prefix, suffix);
Expand All @@ -268,27 +325,32 @@ pub fn find_and_sort_files(
.into_iter()
.filter_map(|path| {
re.captures(path.file_name()?.to_str()?)
.and_then(|caps| caps.get(1).map(|m| m.as_str().parse::<i32>().ok()))
.and_then(|caps| caps.get(1).map(|m| m.as_str().parse::<usize>().ok()))
.flatten()
.map(|num| (path, num))
})
.collect::<Vec<(PathBuf, i32)>>();
.collect::<Vec<(PathBuf, usize)>>();

sorted_entries.sort_by_key(|k| k.1);

// 检查数字是否从0开始连续
for (i, (_, num)) in sorted_entries.iter().enumerate() {
let a_idx = i + 1;
if a_idx as i32 != *num {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"File numbers are not continuous starting from 1.",
));
if check {
// 检查数字是否从0开始连续
for (i, (_, num)) in sorted_entries.iter().enumerate() {
let a_idx = i + 1;
if a_idx != *num {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"File numbers are not continuous starting from 1.",
));
}
}
}

// 返回排序后的文件路径
Ok(sorted_entries.into_iter().map(|(path, _)| path).collect())
Ok(sorted_entries
.iter()
.map(|(path, _)| path.clone())
.collect())
}

pub fn open_file<P: AsRef<Path>>(path: P) -> io::Result<File> {
Expand Down

0 comments on commit 11f6a87

Please sign in to comment.