Skip to content

Commit

Permalink
feat: refactor limit handling and improve column validation in FASTA and FASTQ processing
Browse files Browse the repository at this point in the history
  • Loading branch information
dwpeng committed Nov 8, 2024
1 parent f2ff0ef commit 086d599
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 30 deletions.
47 changes: 39 additions & 8 deletions src/cmd/fasta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ pub fn filterx_fasta(cmd: FastaCommand) -> FilterxResult<()> {
vm.set_scope(VmSourceType::Fasta);
vm.set_writer(output);
'stop_parse: loop {
chunk_size = usize::min(chunk_size, vm.status.limit - vm.status.cosumer_rows);
if vm.status.consume_rows >= vm.status.limit_rows {
break;
}
chunk_size = usize::min(chunk_size, vm.status.limit_rows - vm.status.consume_rows);
let df = source.into_dataframe(chunk_size)?;
if df.is_none() {
break;
Expand All @@ -73,11 +76,43 @@ pub fn filterx_fasta(cmd: FastaCommand) -> FilterxResult<()> {
let writer = vm.writer.as_mut().unwrap();
let df = vm.source.into_df()?;
let cols = df.get_columns();
let seq_col = cols.iter().position(|x| x.name() == "seq");
if seq_col.is_none() {
let h = &mut vm.hint;
h.white("Lost ")
.cyan("'seq'")
.white(" column.")
.print_and_exit();
}
let name_col = cols.iter().position(|x| x.name() == "name");
if name_col.is_none() {
let h = &mut vm.hint;
h.white("Lost ")
.cyan("'name'")
.white(" column.")
.print_and_exit();
}

let comm_col = cols.iter().position(|x| x.name() == "comm");
let cols = df.get_columns();
let seq_col = seq_col.unwrap();
let name_col = name_col.unwrap();

let valid_cols;
if comm_col.is_some() {
valid_cols = vec![name_col, comm_col.unwrap(), seq_col]
} else {
valid_cols = vec![name_col, seq_col]
}

let rows = df.height();
for i in 0..rows {
// TODO: handle the case where the column is not found
// TODO: handle the case where the column's rank is not right
for col in cols.iter() {
if vm.status.consume_rows >= vm.status.limit_rows {
break 'stop_parse;
}
vm.status.consume_rows += 1;
for col_index in &valid_cols {
let col = &cols[*col_index];
match col.name().as_str() {
"seq" => {
let seq = col.get(i).unwrap();
Expand All @@ -99,10 +134,6 @@ pub fn filterx_fasta(cmd: FastaCommand) -> FilterxResult<()> {
}
}
}
vm.status.cosumer_rows += 1;
if vm.status.cosumer_rows >= vm.status.limit {
break 'stop_parse;
}
}
}
}
Expand Down
15 changes: 9 additions & 6 deletions src/cmd/fastq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> {
None => None,
};

let mut names = vec!["name", "seq", "comm", "qual"];
let mut names = vec!["name", "comm", "seq", "qual"];

match no_comment {
Some(true) => {
Expand Down Expand Up @@ -71,7 +71,10 @@ pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> {
vm.set_scope(VmSourceType::Fastq);
vm.set_writer(output);
'stop_parse: loop {
chunk_size = usize::min(chunk_size, vm.status.limit - vm.status.cosumer_rows);
if vm.status.consume_rows >= vm.status.limit_rows {
break;
}
chunk_size = usize::min(chunk_size, vm.status.limit_rows - vm.status.consume_rows);
let df = source.into_dataframe(chunk_size)?;
if df.is_none() {
break;
Expand All @@ -89,6 +92,10 @@ pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> {
let rows = df.height();
for i in 0..rows {
for col in cols.iter() {
if vm.status.consume_rows >= vm.status.limit_rows {
break 'stop_parse;
}
vm.status.consume_rows += 1;
match col.name().as_str() {
"name" => {
let name = col.get(i).unwrap();
Expand Down Expand Up @@ -116,10 +123,6 @@ pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> {
}
}
}
vm.status.cosumer_rows += 1;
if vm.status.cosumer_rows >= vm.status.limit {
break 'stop_parse;
}
}
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/engine/eval/call/builtin/head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ pub fn head(vm: &mut Vm, args: &Vec<ast::Expr>) -> FilterxResult<value::Value> {
};

vm.source.slice(0, nrow);
vm.status.limit = nrow;
vm.status.limit_rows = nrow;
Ok(value::Value::None)
}
2 changes: 1 addition & 1 deletion src/engine/eval/call/builtin/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn limit<'a>(vm: &'a mut Vm, args: &Vec<ast::Expr>) -> FilterxResult<value::

match vm.source_type {
VmSourceType::Fasta | VmSourceType::Fastq => {
vm.status.limit = nrow;
vm.status.limit_rows = nrow;
return Ok(value::Value::None);
}
_ => {}
Expand Down
4 changes: 3 additions & 1 deletion src/engine/eval/call/builtin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,7 @@ builtin_function! {
is_null,
is_na,
nr,
drop_null
drop_null,
to_fasta,
to_fastq,
}
9 changes: 4 additions & 5 deletions src/engine/eval/call/builtin/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ pub fn print<'a>(vm: &'a mut Vm, args: &Vec<ast::Expr>) -> FilterxResult<value::
let fmt = df.column("fmt").unwrap();
let writer = vm.writer.as_mut().unwrap();
let writer = writer.as_mut();
let mut consmer_rows = vm.status.cosumer_rows;
if vm.status.cosumer_rows >= vm.status.limit {
println!("limit: {}, consume: {}", vm.status.limit_rows, vm.status.consume_rows);
if vm.status.consume_rows >= vm.status.limit_rows {
return Ok(value::Value::None);
}
for i in 0..fmt.len() {
Expand All @@ -140,9 +140,8 @@ pub fn print<'a>(vm: &'a mut Vm, args: &Vec<ast::Expr>) -> FilterxResult<value::
}
let s = s.unwrap();
writeln!(writer, "{}", s)?;
consmer_rows += 1;
if consmer_rows >= vm.status.limit {
vm.status.stop = true;
vm.status.consume_rows += 1;
if vm.status.consume_rows >= vm.status.limit_rows {
break;
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/engine/eval/call/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ impl<'a> Eval<'a> for ast::ExprCall {
"is_not_na" => call::is_na(vm, &self.args, true),
"drop_null" => call::drop_null(vm, &self.args, false),
"drop_null_" => call::drop_null(vm, &self.args, true),
"to_fa" | "to_fasta" => call::to_fasta(vm),
"to_fq" | "to_fastq" => call::to_fastq(vm),
_ => {
let simi = compute_similarity(&function_name, vec![]);
let h = &mut vm.hint;
Expand Down
18 changes: 10 additions & 8 deletions src/engine/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,24 @@ pub enum VmMode {
/// Execution status record reached through `vm.status`.
// NOTE(review): this listing is a diff rendering without +/- markers, so it
// appears to show pre- and post-commit fields side by side (`limit` next to
// `limit_rows`, `cosumer_rows` next to `consume_rows`) — confirm against the
// actual file before relying on the field list.
#[derive(Debug)]
pub struct VmStatus {
pub apply_lazy: bool,
pub skip: bool,
pub stop: bool,
pub count: usize,
// Earlier name of the row cap; the call sites shown in this commit use
// `limit_rows` instead.
pub limit: usize,
/// Upper bound on emitted rows. Assigned by the `head` and `limit` builtins
/// and compared against `consume_rows` by the FASTA/FASTQ output loops and
/// by `print`.
pub limit_rows: usize,
pub offset: usize,
pub printed: bool,
// Earlier (misspelled) name of the emitted-row counter; the call sites shown
// in this commit use `consume_rows` instead.
pub cosumer_rows: usize,
pub nrows: usize,
/// Running counter incremented by the output loops and checked against
/// `limit_rows` to decide when to stop.
pub consume_rows: usize,
}

impl VmStatus {
/// Creates the initial status: lazy application enabled, every flag cleared,
/// every counter at zero, and the row caps set to `usize::MAX`.
// NOTE(review): as in the struct listing, old and new field names appear
// together here because the diff markers were lost — confirm against the
// actual file.
pub fn new() -> Self {
Self {
apply_lazy: true,
skip: false,
stop: false,
count: 0,
// `usize::MAX` keeps the `consume_rows >= limit_rows` style checks from
// firing until a builtin such as `head` or `limit` assigns a smaller cap.
limit: usize::MAX,
limit_rows: usize::MAX,
offset: 0,
printed: false,
cosumer_rows: 0,
nrows: 0,
consume_rows: 0,
}
}
}
Expand Down Expand Up @@ -212,6 +208,12 @@ impl Vm {
let expr = eval_expr.as_interactive().unwrap();
expr.eval(self)?;
}
// if self.status.stop {
// std::process::exit(0);
// }
// if self.status.printed {
// return Ok(());
// }
}
Ok(())
}
Expand Down

0 comments on commit 086d599

Please sign in to comment.