Skip to content

Commit

Permalink
refactor: update source references to use mutable access in built-in …
Browse files Browse the repository at this point in the history
…functions
  • Loading branch information
dwpeng committed Nov 14, 2024
1 parent 7ce6495 commit 9b4aa50
Show file tree
Hide file tree
Showing 44 changed files with 405 additions and 229 deletions.
13 changes: 6 additions & 7 deletions src/filterx/src/files/csv.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::args::{CsvCommand, ShareArgs};
use filterx_engine::vm::{Vm, VmSourceType};
use filterx_source::{detect_columns, Source};
use filterx_engine::vm::Vm;
use filterx_source::{detect_columns, DataframeSource, Source, SourceType};

use filterx_core::{util, FilterxResult};

Expand Down Expand Up @@ -48,12 +48,11 @@ pub fn filterx_csv(cmd: CsvCommand) -> FilterxResult<()> {
true,
)?;
let columns = detect_columns(lazy_df.clone())?;
let mut s = Source::new(lazy_df.clone());
let mut s = DataframeSource::new(lazy_df.clone());
s.set_has_header(header.unwrap());
s.set_init_column_names(&columns);
let mut vm = Vm::from_dataframe(s);
vm.source.set_has_header(header.unwrap());
vm.set_scope(VmSourceType::Csv);
let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Csv));
vm.source_mut().set_has_header(header.unwrap());
let expr = util::merge_expr(expr);
let writer = Box::new(writer);
vm.set_writer(writer);
Expand All @@ -62,7 +61,7 @@ pub fn filterx_csv(cmd: CsvCommand) -> FilterxResult<()> {
if vm.status.printed {
return Ok(());
}
let mut df = vm.source.into_df()?;
let mut df = vm.into_df()?;
if output.is_none() && table.unwrap_or(false) {
println!("{}", df);
return Ok(());
Expand Down
30 changes: 10 additions & 20 deletions src/filterx/src/files/fasta.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use polars::frame::DataFrame;
use polars::prelude::IntoLazy;

use crate::args::{FastaCommand, ShareArgs};
use std::io::Write;

use filterx_engine::vm::{Vm, VmSourceType};
use filterx_source::{FastaSource, Source};
use filterx_engine::vm::Vm;
use filterx_source::{FastaSource, Source, SourceType};

use filterx_core::{util, FilterxResult};

Expand Down Expand Up @@ -51,28 +48,21 @@ pub fn filterx_fasta(cmd: FastaCommand) -> FilterxResult<()> {
}
return Ok(());
}
let mut chunk_size = long.unwrap();
let mut vm = Vm::from_dataframe(Source::new(DataFrame::empty().lazy()));
vm.set_scope(VmSourceType::Fasta);
let chunk_size = long.unwrap();
let mut vm = Vm::from_source(Source::new(source.into(), SourceType::Fasta));
vm.source.df_source_mut().set_init_column_names(&names);
vm.set_writer(output);
vm.status.set_chunk_size(chunk_size);
'stop_parse: loop {
if vm.status.consume_rows >= vm.status.limit_rows {
break;
}
chunk_size = usize::min(chunk_size, vm.status.limit_rows - vm.status.consume_rows);
let df = source.into_dataframe(chunk_size)?;
if df.is_none() {
break;
let left = vm.next_batch()?;
if left.is_none() {
break 'stop_parse;
}
let mut dataframe_source = Source::new(df.unwrap().lazy());
dataframe_source.set_init_column_names(&names);
vm.source = dataframe_source;
vm.next_batch()?;
vm.eval_once(&expr)?;
vm.finish()?;
if !vm.status.printed {
let df = vm.into_df()?;
let writer = vm.writer.as_mut().unwrap();
let df = vm.source.into_df()?;
let cols = df.get_columns();
let seq_col = cols.iter().position(|x| x.name() == "seq");
let name_col = cols.iter().position(|x| x.name() == "name");
Expand Down
30 changes: 10 additions & 20 deletions src/filterx/src/files/fastq.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use polars::frame::DataFrame;
use polars::prelude::IntoLazy;

use crate::args::{FastqCommand, ShareArgs};
use std::io::Write;

use filterx_engine::vm::{Vm, VmSourceType};
use filterx_source::{FastqSource, Source};
use filterx_engine::vm::Vm;
use filterx_source::{FastqSource, Source, SourceType};

use filterx_core::{util, FilterxResult};
pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> {
Expand Down Expand Up @@ -63,28 +60,21 @@ pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> {
}
return Ok(());
}
let mut chunk_size = long.unwrap();
let mut vm = Vm::from_dataframe(Source::new(DataFrame::empty().lazy()));
vm.set_scope(VmSourceType::Fastq);
let chunk_size = long.unwrap();
let mut vm = Vm::from_source(Source::new(source.into(), SourceType::Fastq));
vm.set_writer(output);
vm.status.set_chunk_size(chunk_size);
vm.source_mut().set_init_column_names(&names);
'stop_parse: loop {
if vm.status.consume_rows >= vm.status.limit_rows {
break;
}
chunk_size = usize::min(chunk_size, vm.status.limit_rows - vm.status.consume_rows);
let df = source.into_dataframe(chunk_size)?;
if df.is_none() {
break;
let left = vm.next_batch()?;
if left.is_none() {
break 'stop_parse;
}
let mut dataframe_source = Source::new(df.unwrap().lazy());
dataframe_source.set_init_column_names(&names);
vm.source = dataframe_source;
vm.next_batch()?;
vm.eval_once(&expr)?;
vm.finish()?;
if !vm.status.printed {
let df = vm.into_df()?;
let writer = vm.writer.as_mut().unwrap();
let df = vm.source.into_df()?;
let cols = df.get_columns();
let name_col = cols.iter().position(|x| x.name() == "name");
let seq_col = cols.iter().position(|x| x.name() == "seq");
Expand Down
13 changes: 7 additions & 6 deletions src/filterx/src/files/gxf.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use filterx_source::Source;
use filterx_source::SourceType;
use polars::prelude::DataType;
use polars::prelude::SchemaRef;

use crate::args::{GFFCommand, ShareArgs};
use filterx_engine::vm::{Vm, VmSourceType};
use filterx_source::Source;
use filterx_engine::vm::Vm;
use filterx_source::DataframeSource;

use filterx_core::{util, FilterxResult};

Expand Down Expand Up @@ -51,10 +53,9 @@ pub fn filterx_gxf(cmd: GFFCommand) -> FilterxResult<()> {
Some(vec![".", "?"]),
true,
)?;
let mut s = Source::new(lazy_df.clone());
let mut s = DataframeSource::new(lazy_df.clone());
s.set_init_column_names(&names);
let mut vm = Vm::from_dataframe(s);
vm.set_scope(VmSourceType::Gxf);
let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Gxf));
let expr = util::merge_expr(expr);
let writer = Box::new(writer);
vm.set_writer(writer);
Expand All @@ -63,7 +64,7 @@ pub fn filterx_gxf(cmd: GFFCommand) -> FilterxResult<()> {
if vm.status.printed {
return Ok(());
}
let mut df = vm.source.into_df()?;
let mut df = vm.into_df()?;
if output.is_none() && table.unwrap_or(false) {
println!("{}", df);
return Ok(());
Expand Down
12 changes: 6 additions & 6 deletions src/filterx/src/files/sam.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use filterx_source::Source;
use polars::prelude::DataType;
use polars::prelude::SchemaRef;

use crate::args::{SamCommand, ShareArgs};
use filterx_engine::vm::{Vm, VmSourceType};
use filterx_source::Source;
use filterx_engine::vm::Vm;
use filterx_source::{DataframeSource, SourceType};

use filterx_core::{util, FilterxResult};

Expand Down Expand Up @@ -55,10 +56,9 @@ pub fn filterx_sam(cmd: SamCommand) -> FilterxResult<()> {
"tags",
];
let names = names.iter().map(|x| x.to_string()).collect::<Vec<String>>();
let mut s = Source::new(lazy_df.clone());
let mut s = DataframeSource::new(lazy_df.clone());
s.set_init_column_names(&names);
let mut vm = Vm::from_dataframe(s);
vm.set_scope(VmSourceType::Sam);
let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Sam));
let expr = util::merge_expr(expr);
let writer = Box::new(writer);
vm.set_writer(writer);
Expand All @@ -67,7 +67,7 @@ pub fn filterx_sam(cmd: SamCommand) -> FilterxResult<()> {
if vm.status.printed {
return Ok(());
}
let mut df = vm.source.into_df()?;
let mut df = vm.into_df()?;
if output.is_none() && table.unwrap_or(false) {
println!("{}", df);
return Ok(());
Expand Down
12 changes: 6 additions & 6 deletions src/filterx/src/files/vcf.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use filterx_source::Source;
use polars::prelude::DataType;
use polars::prelude::SchemaRef;

use crate::args::{ShareArgs, VcfCommand};
use filterx_engine::vm::{Vm, VmSourceType};
use filterx_source::Source;
use filterx_engine::vm::Vm;
use filterx_source::{DataframeSource, SourceType};

use filterx_core::{util, FilterxResult};

Expand Down Expand Up @@ -80,10 +81,9 @@ pub fn filterx_vcf(cmd: VcfCommand) -> FilterxResult<()> {
Some(vec!["."]),
true,
)?;
let mut s = Source::new(lazy_df.clone());
let mut s = DataframeSource::new(lazy_df.clone());
s.set_init_column_names(&names);
let mut vm = Vm::from_dataframe(s);
vm.set_scope(VmSourceType::Vcf);
let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Vcf));
let expr = util::merge_expr(expr);
let writer = Box::new(writer);
vm.set_writer(writer);
Expand All @@ -92,7 +92,7 @@ pub fn filterx_vcf(cmd: VcfCommand) -> FilterxResult<()> {
if vm.status.printed {
return Ok(());
}
let mut df = vm.source.into_df()?;
let mut df = vm.into_df()?;
if output.is_none() && table.unwrap_or(false) {
println!("{}", df);
return Ok(());
Expand Down
4 changes: 2 additions & 2 deletions src/filterx_engine/src/eval/assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ impl<'a> Eval<'a> for ast::StmtAssign {
BinOp
);
let exist = vm
.source
.source_mut()
.ret_column_names
.contains(&new_col_name.to_string());
let if_append = match exist {
true => None,
false => Some(new_col_name.into()),
};
vm.source
vm.source_mut()
.with_column(value.expr()?.alias(new_col_name), if_append);
Ok(Value::None)
}
Expand Down
4 changes: 2 additions & 2 deletions src/filterx_engine/src/eval/call/builtin/column/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn cast<'a>(
);
let name = col_name.column()?;
let e = col_name.expr()?;
vm.source.has_column(name);
vm.source_mut().has_column(name);
let new_type = match new_type.to_lowercase().as_str() {
"int" => DataType::Int32,
"float" => DataType::Float32,
Expand All @@ -42,7 +42,7 @@ pub fn cast<'a>(

let e = e.cast(new_type);
if inplace {
vm.source.with_column(e.alias(name), None);
vm.source_mut().with_column(e.alias(name), None);
return Ok(value::Value::None);
}

Expand Down
4 changes: 2 additions & 2 deletions src/filterx_engine/src/eval/call/builtin/column/col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn col(vm: &mut Vm, args: &Vec<ast::Expr>) -> FilterxResult<value::Value> {
"col only support column index, column name, or function which return a column name."
);
let c = match col_value {
value::Value::Int(i) => vm.source.index2column(i as usize),
value::Value::Int(i) => vm.source_mut().index2column(i as usize),
value::Value::Str(s) => s,
value::Value::Name(c) => c.name,
value::Value::Item(c) => c.col_name,
Expand All @@ -17,7 +17,7 @@ pub fn col(vm: &mut Vm, args: &Vec<ast::Expr>) -> FilterxResult<value::Value> {
h.white("col only support column index, column name, or function which return a column name.").print_and_exit();
}
};
vm.source.has_column(&c);
vm.source_mut().has_column(&c);
Ok(value::Value::Item(value::Item {
col_name: c,
data_type: None,
Expand Down
4 changes: 2 additions & 2 deletions src/filterx_engine/src/eval/call/builtin/column/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ pub fn drop<'a>(vm: &'a mut Vm, args: &Vec<ast::Expr>) -> FilterxResult<value::V
for arg in args {
let col = eval_col!(vm, arg, "drop: expected a column name as argument");
let col = col.column()?;
vm.source.has_column(col);
vm.source_mut().has_column(col);
drop_columns.push(col.to_string());
}

vm.source.drop(drop_columns);
vm.source_mut().drop(drop_columns);
Ok(value::Value::None)
}
4 changes: 2 additions & 2 deletions src/filterx_engine/src/eval/call/builtin/column/drop_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ pub fn drop_null<'a>(
"len: expected a column name as first argument"
);
let name = col_name.column()?;
vm.source.has_column(name);
vm.source_mut().has_column(name);
let e = col(name).drop_nulls();
if inplace {
vm.source.with_column(e.alias(name), None);
vm.source_mut().with_column(e.alias(name), None);
return Ok(value::Value::None);
}
Ok(value::Value::named_expr(None, e))
Expand Down
4 changes: 2 additions & 2 deletions src/filterx_engine/src/eval/call/builtin/column/dup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub fn dup<'a>(
for arg in args {
let col = eval_col!(vm, arg, "dup only support column name");
let col = col.column()?;
vm.source.has_column(col);
vm.source_mut().has_column(col);
select_dolumns.push(col.to_string());
}

Expand All @@ -23,7 +23,7 @@ pub fn dup<'a>(
.print_and_exit();
}

vm.source.unique(select_dolumns, unique_strategy);
vm.source_mut().unique(select_dolumns, unique_strategy);

Ok(value::Value::None)
}
6 changes: 3 additions & 3 deletions src/filterx_engine/src/eval/call/builtin/column/fill.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::super::*;
use polars::prelude::Literal;
use filterx_core::value::Value;
use polars::prelude::Literal;

pub fn fill<'a>(
vm: &'a mut Vm,
Expand All @@ -22,10 +22,10 @@ pub fn fill<'a>(
);
let name = col_name.column()?;
let e = col_name.expr()?;
vm.source.has_column(name);
vm.source_mut().has_column(name);
let e = e.fill_null(const_value.lit());
if inplace {
let lazy = &mut vm.source;
let lazy = vm.source_mut();
lazy.with_column(e.alias(name), None);
return Ok(Value::None);
}
Expand Down
2 changes: 1 addition & 1 deletion src/filterx_engine/src/eval/call/builtin/column/header.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::super::*;

pub fn header(vm: &mut Vm) -> FilterxResult<value::Value> {
let source = &vm.source;
let source = &vm.source_mut();
let schema = source.columns()?;
println!("index\tname\ttype");
for (index, (name, t)) in schema.iter().enumerate() {
Expand Down
6 changes: 3 additions & 3 deletions src/filterx_engine/src/eval/call/builtin/column/is_na.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ pub fn is_na<'a>(vm: &'a mut Vm, args: &Vec<ast::Expr>, not: bool) -> FilterxRes
&args[0],
"is_na: expected a column name as first argument"
);
vm.source.has_column(col_name.column()?);
vm.source_mut().has_column(col_name.column()?);
let col_expr = col_name.expr()?;
if not {
vm.source.filter(col_expr.is_not_nan());
vm.source_mut().filter(col_expr.is_not_nan());
} else {
vm.source.filter(col_expr.is_nan());
vm.source_mut().filter(col_expr.is_nan());
}
Ok(value::Value::None)
}
Loading

0 comments on commit 9b4aa50

Please sign in to comment.