diff --git a/src/filterx/src/files/csv.rs b/src/filterx/src/files/csv.rs index 976c4a4..4c19d94 100644 --- a/src/filterx/src/files/csv.rs +++ b/src/filterx/src/files/csv.rs @@ -1,6 +1,6 @@ use crate::args::{CsvCommand, ShareArgs}; -use filterx_engine::vm::{Vm, VmSourceType}; -use filterx_source::{detect_columns, Source}; +use filterx_engine::vm::Vm; +use filterx_source::{detect_columns, DataframeSource, Source, SourceType}; use filterx_core::{util, FilterxResult}; @@ -48,12 +48,11 @@ pub fn filterx_csv(cmd: CsvCommand) -> FilterxResult<()> { true, )?; let columns = detect_columns(lazy_df.clone())?; - let mut s = Source::new(lazy_df.clone()); + let mut s = DataframeSource::new(lazy_df.clone()); s.set_has_header(header.unwrap()); s.set_init_column_names(&columns); - let mut vm = Vm::from_dataframe(s); - vm.source.set_has_header(header.unwrap()); - vm.set_scope(VmSourceType::Csv); + let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Csv)); + vm.source_mut().set_has_header(header.unwrap()); let expr = util::merge_expr(expr); let writer = Box::new(writer); vm.set_writer(writer); @@ -62,7 +61,7 @@ pub fn filterx_csv(cmd: CsvCommand) -> FilterxResult<()> { if vm.status.printed { return Ok(()); } - let mut df = vm.source.into_df()?; + let mut df = vm.into_df()?; if output.is_none() && table.unwrap_or(false) { println!("{}", df); return Ok(()); diff --git a/src/filterx/src/files/fasta.rs b/src/filterx/src/files/fasta.rs index 471305a..c90391d 100644 --- a/src/filterx/src/files/fasta.rs +++ b/src/filterx/src/files/fasta.rs @@ -1,11 +1,8 @@ -use polars::frame::DataFrame; -use polars::prelude::IntoLazy; - use crate::args::{FastaCommand, ShareArgs}; use std::io::Write; -use filterx_engine::vm::{Vm, VmSourceType}; -use filterx_source::{FastaSource, Source}; +use filterx_engine::vm::Vm; +use filterx_source::{FastaSource, Source, SourceType}; use filterx_core::{util, FilterxResult}; @@ -51,28 +48,21 @@ pub fn filterx_fasta(cmd: FastaCommand) -> FilterxResult<()> { } return Ok(()); } - let mut chunk_size = long.unwrap(); - let mut vm = Vm::from_dataframe(Source::new(DataFrame::empty().lazy())); - vm.set_scope(VmSourceType::Fasta); + let chunk_size = long.unwrap(); + let mut vm = Vm::from_source(Source::new(source.into(), SourceType::Fasta)); + vm.source.df_source_mut().set_init_column_names(&names); vm.set_writer(output); + vm.status.set_chunk_size(chunk_size); 'stop_parse: loop { - if vm.status.consume_rows >= vm.status.limit_rows { - break; - } - chunk_size = usize::min(chunk_size, vm.status.limit_rows - vm.status.consume_rows); - let df = source.into_dataframe(chunk_size)?; - if df.is_none() { - break; + let left = vm.next_batch()?; + if left.is_none() { + break 'stop_parse; } - let mut dataframe_source = Source::new(df.unwrap().lazy()); - dataframe_source.set_init_column_names(&names); - vm.source = dataframe_source; - vm.next_batch()?; vm.eval_once(&expr)?; vm.finish()?; if !vm.status.printed { + let df = vm.into_df()?; let writer = vm.writer.as_mut().unwrap(); - let df = vm.source.into_df()?; let cols = df.get_columns(); let seq_col = cols.iter().position(|x| x.name() == "seq"); let name_col = cols.iter().position(|x| x.name() == "name"); diff --git a/src/filterx/src/files/fastq.rs b/src/filterx/src/files/fastq.rs index 1796ede..57c5299 100644 --- a/src/filterx/src/files/fastq.rs +++ b/src/filterx/src/files/fastq.rs @@ -1,11 +1,8 @@ -use polars::frame::DataFrame; -use polars::prelude::IntoLazy; - use crate::args::{FastqCommand, ShareArgs}; use std::io::Write; -use filterx_engine::vm::{Vm, VmSourceType}; -use filterx_source::{FastqSource, Source}; +use filterx_engine::vm::Vm; +use filterx_source::{FastqSource, Source, SourceType}; use filterx_core::{util, FilterxResult}; pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> { @@ -63,28 +60,21 @@ pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> { } return Ok(()); } - let mut chunk_size = long.unwrap(); - let mut vm = Vm::from_dataframe(Source::new(DataFrame::empty().lazy())); - vm.set_scope(VmSourceType::Fastq); + let chunk_size = long.unwrap(); + let mut vm = Vm::from_source(Source::new(source.into(), SourceType::Fastq)); vm.set_writer(output); + vm.status.set_chunk_size(chunk_size); + vm.source_mut().set_init_column_names(&names); 'stop_parse: loop { - if vm.status.consume_rows >= vm.status.limit_rows { - break; - } - chunk_size = usize::min(chunk_size, vm.status.limit_rows - vm.status.consume_rows); - let df = source.into_dataframe(chunk_size)?; - if df.is_none() { - break; + let left = vm.next_batch()?; + if left.is_none() { + break 'stop_parse; } - let mut dataframe_source = Source::new(df.unwrap().lazy()); - dataframe_source.set_init_column_names(&names); - vm.source = dataframe_source; - vm.next_batch()?; vm.eval_once(&expr)?; vm.finish()?; if !vm.status.printed { + let df = vm.into_df()?; let writer = vm.writer.as_mut().unwrap(); - let df = vm.source.into_df()?; let cols = df.get_columns(); let name_col = cols.iter().position(|x| x.name() == "name"); let seq_col = cols.iter().position(|x| x.name() == "seq"); diff --git a/src/filterx/src/files/gxf.rs b/src/filterx/src/files/gxf.rs index 240bd07..3fd09bc 100644 --- a/src/filterx/src/files/gxf.rs +++ b/src/filterx/src/files/gxf.rs @@ -1,9 +1,11 @@ +use filterx_source::Source; +use filterx_source::SourceType; use polars::prelude::DataType; use polars::prelude::SchemaRef; use crate::args::{GFFCommand, ShareArgs}; -use filterx_engine::vm::{Vm, VmSourceType}; -use filterx_source::Source; +use filterx_engine::vm::Vm; +use filterx_source::DataframeSource; use filterx_core::{util, FilterxResult}; @@ -51,10 +53,9 @@ pub fn filterx_gxf(cmd: GFFCommand) -> FilterxResult<()> { Some(vec![".", "?"]), true, )?; - let mut s = Source::new(lazy_df.clone()); + let mut s = DataframeSource::new(lazy_df.clone()); s.set_init_column_names(&names); - let mut vm = Vm::from_dataframe(s); - vm.set_scope(VmSourceType::Gxf); + let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Gxf)); let expr = util::merge_expr(expr); let writer = Box::new(writer); vm.set_writer(writer); @@ -63,7 +64,7 @@ pub fn filterx_gxf(cmd: GFFCommand) -> FilterxResult<()> { if vm.status.printed { return Ok(()); } - let mut df = vm.source.into_df()?; + let mut df = vm.into_df()?; if output.is_none() && table.unwrap_or(false) { println!("{}", df); return Ok(()); diff --git a/src/filterx/src/files/sam.rs b/src/filterx/src/files/sam.rs index 1198973..fbddfdd 100644 --- a/src/filterx/src/files/sam.rs +++ b/src/filterx/src/files/sam.rs @@ -1,9 +1,10 @@ +use filterx_source::Source; use polars::prelude::DataType; use polars::prelude::SchemaRef; use crate::args::{SamCommand, ShareArgs}; -use filterx_engine::vm::{Vm, VmSourceType}; -use filterx_source::Source; +use filterx_engine::vm::Vm; +use filterx_source::{DataframeSource, SourceType}; use filterx_core::{util, FilterxResult}; @@ -55,10 +56,9 @@ pub fn filterx_sam(cmd: SamCommand) -> FilterxResult<()> { "tags", ]; let names = names.iter().map(|x| x.to_string()).collect::>(); - let mut s = Source::new(lazy_df.clone()); + let mut s = DataframeSource::new(lazy_df.clone()); s.set_init_column_names(&names); - let mut vm = Vm::from_dataframe(s); - vm.set_scope(VmSourceType::Sam); + let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Sam)); let expr = util::merge_expr(expr); let writer = Box::new(writer); vm.set_writer(writer); @@ -67,7 +67,7 @@ pub fn filterx_sam(cmd: SamCommand) -> FilterxResult<()> { if vm.status.printed { return Ok(()); } - let mut df = vm.source.into_df()?; + let mut df = vm.into_df()?; if output.is_none() && table.unwrap_or(false) { println!("{}", df); return Ok(()); diff --git a/src/filterx/src/files/vcf.rs b/src/filterx/src/files/vcf.rs index eec3d31..fa85ec5 100644 --- a/src/filterx/src/files/vcf.rs +++ b/src/filterx/src/files/vcf.rs @@ -1,9 +1,10 @@ +use filterx_source::Source; use polars::prelude::DataType; use polars::prelude::SchemaRef; use crate::args::{ShareArgs, VcfCommand}; -use filterx_engine::vm::{Vm, VmSourceType}; -use filterx_source::Source; +use filterx_engine::vm::Vm; +use filterx_source::{DataframeSource, SourceType}; use filterx_core::{util, FilterxResult}; @@ -80,10 +81,9 @@ pub fn filterx_vcf(cmd: VcfCommand) -> FilterxResult<()> { Some(vec!["."]), true, )?; - let mut s = Source::new(lazy_df.clone()); + let mut s = DataframeSource::new(lazy_df.clone()); s.set_init_column_names(&names); - let mut vm = Vm::from_dataframe(s); - vm.set_scope(VmSourceType::Vcf); + let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Vcf)); let expr = util::merge_expr(expr); let writer = Box::new(writer); vm.set_writer(writer); @@ -92,7 +92,7 @@ pub fn filterx_vcf(cmd: VcfCommand) -> FilterxResult<()> { if vm.status.printed { return Ok(()); } - let mut df = vm.source.into_df()?; + let mut df = vm.into_df()?; if output.is_none() && table.unwrap_or(false) { println!("{}", df); return Ok(()); diff --git a/src/filterx_engine/src/eval/assign.rs b/src/filterx_engine/src/eval/assign.rs index 46c0ea7..9f7fdf3 100644 --- a/src/filterx_engine/src/eval/assign.rs +++ b/src/filterx_engine/src/eval/assign.rs @@ -82,14 +82,14 @@ impl<'a> Eval<'a> for ast::StmtAssign { BinOp ); let exist = vm - .source + .source_mut() .ret_column_names .contains(&new_col_name.to_string()); let if_append = match exist { true => None, false => Some(new_col_name.into()), }; - vm.source + vm.source_mut() .with_column(value.expr()?.alias(new_col_name), if_append); Ok(Value::None) } diff --git a/src/filterx_engine/src/eval/call/builtin/column/cast.rs b/src/filterx_engine/src/eval/call/builtin/column/cast.rs index d6d7f30..72e7beb 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/cast.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/cast.rs @@ -15,7 +15,7 @@ pub fn cast<'a>( ); let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); let new_type = match new_type.to_lowercase().as_str() { "int" => DataType::Int32, "float" => DataType::Float32, @@ -42,7 +42,7 @@ pub fn cast<'a>( let e = e.cast(new_type); if inplace { - vm.source.with_column(e.alias(name), None); + vm.source_mut().with_column(e.alias(name), None); return Ok(value::Value::None); } diff --git a/src/filterx_engine/src/eval/call/builtin/column/col.rs b/src/filterx_engine/src/eval/call/builtin/column/col.rs index 8923291..37f86a1 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/col.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/col.rs @@ -8,7 +8,7 @@ pub fn col(vm: &mut Vm, args: &Vec) -> FilterxResult { "col only support column index, column name, or function which return a column name." ); let c = match col_value { - value::Value::Int(i) => vm.source.index2column(i as usize), + value::Value::Int(i) => vm.source_mut().index2column(i as usize), value::Value::Str(s) => s, value::Value::Name(c) => c.name, value::Value::Item(c) => c.col_name, @@ -17,7 +17,7 @@ pub fn col(vm: &mut Vm, args: &Vec) -> FilterxResult { h.white("col only support column index, column name, or function which return a column name.").print_and_exit(); } }; - vm.source.has_column(&c); + vm.source_mut().has_column(&c); Ok(value::Value::Item(value::Item { col_name: c, data_type: None, diff --git a/src/filterx_engine/src/eval/call/builtin/column/drop.rs b/src/filterx_engine/src/eval/call/builtin/column/drop.rs index 2ce819a..477552a 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/drop.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/drop.rs @@ -6,10 +6,10 @@ pub fn drop<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult( "len: expected a column name as first argument" ); let name = col_name.column()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); let e = col(name).drop_nulls(); if inplace { - vm.source.with_column(e.alias(name), None); + vm.source_mut().with_column(e.alias(name), None); return Ok(value::Value::None); } Ok(value::Value::named_expr(None, e)) diff --git a/src/filterx_engine/src/eval/call/builtin/column/dup.rs b/src/filterx_engine/src/eval/call/builtin/column/dup.rs index cb69412..4d61634 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/dup.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/dup.rs @@ -12,7 +12,7 @@ pub fn dup<'a>( for arg in args { let col = eval_col!(vm, arg, "dup only support column name"); let col = col.column()?; - vm.source.has_column(col); + vm.source_mut().has_column(col); select_dolumns.push(col.to_string()); } @@ -23,7 +23,7 @@ pub fn dup<'a>( .print_and_exit(); } - vm.source.unique(select_dolumns, unique_strategy); + vm.source_mut().unique(select_dolumns, unique_strategy); Ok(value::Value::None) } diff --git a/src/filterx_engine/src/eval/call/builtin/column/fill.rs b/src/filterx_engine/src/eval/call/builtin/column/fill.rs index e07e92a..00c4237 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/fill.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/fill.rs @@ -1,6 +1,6 @@ use super::super::*; -use polars::prelude::Literal; use filterx_core::value::Value; +use polars::prelude::Literal; pub fn fill<'a>( vm: &'a mut Vm, @@ -22,10 +22,10 @@ pub fn fill<'a>( ); let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); let e = e.fill_null(const_value.lit()); if inplace { - let lazy = &mut vm.source; + let lazy = vm.source_mut(); lazy.with_column(e.alias(name), None); return Ok(Value::None); } diff --git a/src/filterx_engine/src/eval/call/builtin/column/header.rs b/src/filterx_engine/src/eval/call/builtin/column/header.rs index 60407f5..77bf840 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/header.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/header.rs @@ -1,7 +1,7 @@ use super::super::*; pub fn header(vm: &mut Vm) -> FilterxResult { - let source = &vm.source; + let source = &vm.source_mut(); let schema = source.columns()?; println!("index\tname\ttype"); for (index, (name, t)) in schema.iter().enumerate() { diff --git a/src/filterx_engine/src/eval/call/builtin/column/is_na.rs b/src/filterx_engine/src/eval/call/builtin/column/is_na.rs index a89fd66..a1ce6d0 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/is_na.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/is_na.rs @@ -7,12 +7,12 @@ pub fn is_na<'a>(vm: &'a mut Vm, args: &Vec, not: bool) -> FilterxRes &args[0], "is_na: expected a column name as first argument" ); - vm.source.has_column(col_name.column()?); + vm.source_mut().has_column(col_name.column()?); let col_expr = col_name.expr()?; if not { - vm.source.filter(col_expr.is_not_nan()); + vm.source_mut().filter(col_expr.is_not_nan()); } else { - vm.source.filter(col_expr.is_nan()); + vm.source_mut().filter(col_expr.is_nan()); } Ok(value::Value::None) } diff --git a/src/filterx_engine/src/eval/call/builtin/column/is_null.rs b/src/filterx_engine/src/eval/call/builtin/column/is_null.rs index b04514b..5443464 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/is_null.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/is_null.rs @@ -7,12 +7,12 @@ pub fn is_null<'a>( ) -> FilterxResult { expect_args_len(args, 1)?; let col_name = eval_col!(vm, &args[0], "is_null: expected a column name as first argument"); - vm.source.has_column(col_name.column()?); + vm.source_mut().has_column(col_name.column()?); let col_expr = col_name.expr()?; if not { - vm.source.filter(col_expr.is_not_null()); + vm.source_mut().filter(col_expr.is_not_null()); } else { - vm.source.filter(col_expr.is_null()); + vm.source_mut().filter(col_expr.is_null()); } Ok(value::Value::None) } diff --git a/src/filterx_engine/src/eval/call/builtin/column/print.rs b/src/filterx_engine/src/eval/call/builtin/column/print.rs index 06a0af5..5ffcc2c 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/print.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/print.rs @@ -5,8 +5,8 @@ use polars::{ }; use super::super::*; +use filterx_source::Source; use polars::prelude::{col, Expr}; - use regex::Regex; use lazy_static::lazy_static; @@ -17,6 +17,7 @@ lazy_static! { } fn parse_format_string( + source_type: SourceType, valid_names: Option<&Vec>, s: &str, ) -> FilterxResult<(String, Option>)> { @@ -37,9 +38,10 @@ fn parse_format_string( let re = ®EX_PATTERN; let fmt = re.replace_all(s, "{}").to_string(); let mut cols = Vec::new(); - let mut vm = Vm::from_dataframe(Source::new(DataFrame::empty().lazy())); + let source = DataframeSource::new(DataFrame::empty().lazy()); + let mut vm = Vm::from_source(Source::new(source.into(), source_type)); if let Some(valid_names) = valid_names { - vm.source.set_init_column_names(valid_names); + vm.source_mut().set_init_column_names(valid_names); } for cap in re.captures_iter(s) { let item = cap.get(1).unwrap().as_str(); @@ -50,7 +52,7 @@ fn parse_format_string( } if REGEX_VARNAME.is_match(item) { // recheck columns name - vm.source.has_column(item); + vm.source_mut().has_column(item); cols.push(col(item)); continue; } @@ -73,7 +75,7 @@ fn test_parse_format_string() { use polars::prelude::col; let s = "xxx_{seq}"; - let (fmt, cols) = parse_format_string(None, s).unwrap(); + let (fmt, cols) = parse_format_string(SourceType::Fasta, None, s).unwrap(); assert_eq!(fmt, "xxx_{}"); assert!(cols.is_some()); let cols = cols.unwrap(); @@ -81,7 +83,7 @@ fn test_parse_format_string() { assert_eq!(cols[0], col("seq")); let s = "xxx_{seq}_{seq}"; - let (fmt, cols) = parse_format_string(None, s).unwrap(); + let (fmt, cols) = parse_format_string(SourceType::Fasta, None, s).unwrap(); assert_eq!(fmt, "xxx_{}_{}"); assert!(cols.is_some()); let cols = cols.unwrap(); @@ -90,12 +92,12 @@ fn test_parse_format_string() { assert_eq!(cols[1], col("seq")); let s = "xxx"; - let (fmt, cols) = parse_format_string(None, s).unwrap(); + let (fmt, cols) = parse_format_string(SourceType::Fasta, None, s).unwrap(); assert_eq!(fmt, "xxx"); assert!(cols.is_none()); let s = "xxx{len(seq)}"; - let (fmt, cols) = parse_format_string(None, s).unwrap(); + let (fmt, cols) = parse_format_string(SourceType::Fasta, None, s).unwrap(); assert_eq!(fmt, "xxx{}"); assert!(cols.is_some()); let cols = cols.unwrap(); @@ -118,22 +120,22 @@ pub fn print<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult) -> FilterxResult "rename: expected a column name as first argument" ); let old_col = old_col.column()?; - vm.source.has_column(old_col); + vm.source_mut().has_column(old_col); let new_col_value = eval_col!( vm, &args[1], "rename: expected a column name as second argument" ); if vm - .source + .source_mut() .ret_column_names .contains(&new_col_value.column()?.to_string()) { return Ok(value::Value::None); } let new_col = new_col_value.column()?; - vm.source.rename([old_col], [new_col]); + vm.source_mut().rename([old_col], [new_col]); Ok(value::Value::None) } diff --git a/src/filterx_engine/src/eval/call/builtin/column/select.rs b/src/filterx_engine/src/eval/call/builtin/column/select.rs index d7f113a..5ba06e9 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/select.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/select.rs @@ -8,7 +8,7 @@ pub fn select<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult(vm: &'a mut Vm, args: &Vec) -> FilterxResult, incr: bool) -> FilterxResult, incr: bool) -> FilterxResult, len: usize) -> FilterxResult<()> { if args.len() != len { diff --git a/src/filterx_engine/src/eval/call/builtin/number/abs.rs b/src/filterx_engine/src/eval/call/builtin/number/abs.rs index f19762e..93044b5 100644 --- a/src/filterx_engine/src/eval/call/builtin/number/abs.rs +++ b/src/filterx_engine/src/eval/call/builtin/number/abs.rs @@ -13,10 +13,10 @@ pub fn abs<'a>( "abs: expected a column name as first argument" ); let col_name = col_name.column()?; - vm.source.has_column(col_name); + vm.source_mut().has_column(col_name); let e = col(col_name).abs(); if inplace { - vm.source.with_column(e.alias(col_name), None); + vm.source_mut().with_column(e.alias(col_name), None); return Ok(value::Value::None); } Ok(value::Value::named_expr(None, e)) diff --git a/src/filterx_engine/src/eval/call/builtin/row/head.rs b/src/filterx_engine/src/eval/call/builtin/row/head.rs index 892dfd2..7925248 100644 --- a/src/filterx_engine/src/eval/call/builtin/row/head.rs +++ b/src/filterx_engine/src/eval/call/builtin/row/head.rs @@ -25,7 +25,7 @@ pub fn head(vm: &mut Vm, args: &Vec) -> FilterxResult { } }; - vm.source.slice(0, nrow); + vm.source_mut().slice(0, nrow); vm.status.limit_rows = nrow; Ok(value::Value::None) } diff --git a/src/filterx_engine/src/eval/call/builtin/row/limit.rs b/src/filterx_engine/src/eval/call/builtin/row/limit.rs index 59ae8e4..e68f3e5 100644 --- a/src/filterx_engine/src/eval/call/builtin/row/limit.rs +++ b/src/filterx_engine/src/eval/call/builtin/row/limit.rs @@ -1,5 +1,3 @@ -use crate::vm::VmSourceType; - use super::super::*; pub fn limit<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult { @@ -31,15 +29,15 @@ pub fn limit<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult { + match vm.source_type() { + SourceType::Fasta | SourceType::Fastq => { vm.status.limit_rows = nrow; return Ok(value::Value::None); } _ => {} } - vm.source.slice(0, nrow); + vm.source_mut().slice(0, nrow); Ok(value::Value::None) } diff --git a/src/filterx_engine/src/eval/call/builtin/row/tail.rs b/src/filterx_engine/src/eval/call/builtin/row/tail.rs index 39c9478..ebe68de 100644 --- a/src/filterx_engine/src/eval/call/builtin/row/tail.rs +++ b/src/filterx_engine/src/eval/call/builtin/row/tail.rs @@ -28,7 +28,7 @@ pub fn tail(vm: &mut Vm, args: &Vec) -> FilterxResult { } }; - let lazy = vm.source.lazy().tail(nrow as u32); - vm.source.update(lazy); + let lazy = vm.source_mut().lazy().tail(nrow as u32); + vm.source_mut().update(lazy); Ok(value::Value::None) } diff --git a/src/filterx_engine/src/eval/call/builtin/sequence/gc.rs b/src/filterx_engine/src/eval/call/builtin/sequence/gc.rs index 1b71148..d42d9bc 100644 --- a/src/filterx_engine/src/eval/call/builtin/sequence/gc.rs +++ b/src/filterx_engine/src/eval/call/builtin/sequence/gc.rs @@ -54,7 +54,7 @@ pub fn gc<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult( ); let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); let e = e.map(compute_revcomp, GetOutput::same_type()); if inplace { - vm.source.with_column(e.clone().alias(name), None); + vm.source_mut().with_column(e.clone().alias(name), None); return Ok(value::Value::None); } return Ok(value::Value::named_expr(Some(name.to_string()), e)); diff --git a/src/filterx_engine/src/eval/call/builtin/sequence/to_fasta.rs b/src/filterx_engine/src/eval/call/builtin/sequence/to_fasta.rs index d05cb0b..40d960f 100644 --- a/src/filterx_engine/src/eval/call/builtin/sequence/to_fasta.rs +++ b/src/filterx_engine/src/eval/call/builtin/sequence/to_fasta.rs @@ -1,11 +1,10 @@ use super::super::*; -use crate::vm::VmSourceType; use std::io::Write; fn print_fasta(vm: &mut Vm) -> FilterxResult { vm.status.printed = true; - let name_index = vm.source.ret_column_names.iter().position(|x| x == "name"); - let seq_index = vm.source.ret_column_names.iter().position(|x| x == "seq"); + let name_index = vm.source_mut().ret_column_names.iter().position(|x| x == "name"); + let seq_index = vm.source_mut().ret_column_names.iter().position(|x| x == "seq"); if name_index.is_none() { let h = &mut vm.hint; @@ -24,7 +23,7 @@ fn print_fasta(vm: &mut Vm) -> FilterxResult { } let name_index = name_index.unwrap(); let seq_index = seq_index.unwrap(); - let df = vm.source.lazy().collect()?; + let df = vm.source_mut().lazy().collect()?; let columns = df.get_columns(); let name_col = &columns[name_index]; let seq_col = &columns[seq_index]; @@ -53,7 +52,7 @@ fn print_fasta(vm: &mut Vm) -> FilterxResult { } pub fn to_fasta(vm: &mut Vm) -> FilterxResult { - if vm.source_type == VmSourceType::Fasta || vm.source_type == VmSourceType::Fastq { + if vm.source_type() == SourceType::Fasta || vm.source_type() == SourceType::Fastq { return print_fasta(vm); } let h = &mut vm.hint; diff --git a/src/filterx_engine/src/eval/call/builtin/sequence/to_fastq.rs b/src/filterx_engine/src/eval/call/builtin/sequence/to_fastq.rs index 1d79b0d..c477fac 100644 --- a/src/filterx_engine/src/eval/call/builtin/sequence/to_fastq.rs +++ b/src/filterx_engine/src/eval/call/builtin/sequence/to_fastq.rs @@ -3,9 +3,9 @@ use std::io::Write; fn print_fastq(vm: &mut Vm) -> FilterxResult { vm.status.printed = true; - let name_index = vm.source.ret_column_names.iter().position(|x| x == "name"); - let seq_index = vm.source.ret_column_names.iter().position(|x| x == "seq"); - let qual_index = vm.source.ret_column_names.iter().position(|x| x == "qual"); + let name_index = vm.source_mut().ret_column_names.iter().position(|x| x == "name"); + let seq_index = vm.source_mut().ret_column_names.iter().position(|x| x == "seq"); + let qual_index = vm.source_mut().ret_column_names.iter().position(|x| x == "qual"); if name_index.is_none() { let h = &mut vm.hint; h.white("Lost ") @@ -22,7 +22,7 @@ fn print_fastq(vm: &mut Vm) -> FilterxResult { } let name_index = name_index.unwrap(); let seq_index = seq_index.unwrap(); - let df = vm.source.lazy().collect()?; + let df = vm.source_mut().lazy().collect()?; let columns = df.get_columns(); let name_col = &columns[name_index]; let seq_col = &columns[seq_index]; @@ -82,7 +82,7 @@ fn print_fastq(vm: &mut Vm) -> FilterxResult { } pub fn to_fastq(vm: &mut Vm) -> FilterxResult { - if vm.source_type == VmSourceType::Fasta || vm.source_type == VmSourceType::Fastq { + if vm.source_type() == SourceType::Fasta || vm.source_type() == SourceType::Fastq { return print_fastq(vm); } let h = &mut vm.hint; diff --git a/src/filterx_engine/src/eval/call/builtin/string/len.rs b/src/filterx_engine/src/eval/call/builtin/string/len.rs index 280265c..0b0db24 100644 --- a/src/filterx_engine/src/eval/call/builtin/string/len.rs +++ b/src/filterx_engine/src/eval/call/builtin/string/len.rs @@ -1,4 +1,5 @@ use super::super::*; +use filterx_source::source::SourceType; pub fn len<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult { expect_args_len(args, 1)?; @@ -9,12 +10,15 @@ pub fn len<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult { + e = e.str().len_chars(); + } + _ => { + e = e.str().len_bytes(); + } } Ok(value::Value::named_expr(None, e)) } diff --git a/src/filterx_engine/src/eval/call/builtin/string/lower.rs b/src/filterx_engine/src/eval/call/builtin/string/lower.rs index 4dd85ed..99ae91d 100644 --- a/src/filterx_engine/src/eval/call/builtin/string/lower.rs +++ b/src/filterx_engine/src/eval/call/builtin/string/lower.rs @@ -15,9 +15,9 @@ pub fn lower<'a>( ); let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); if inplace { - vm.source + vm.source_mut() .with_column(e.str().to_lowercase().alias(name), None); return Ok(value::Value::None); } diff --git a/src/filterx_engine/src/eval/call/builtin/string/replace.rs b/src/filterx_engine/src/eval/call/builtin/string/replace.rs index 787fd20..8a480ba 100644 --- a/src/filterx_engine/src/eval/call/builtin/string/replace.rs +++ b/src/filterx_engine/src/eval/call/builtin/string/replace.rs @@ -17,7 +17,7 @@ pub fn replace<'a>( ); let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); let patt = eval!( vm, @@ -38,7 +38,7 @@ pub fn replace<'a>( let repl = lit(repl.as_str()); if inplace { - vm.source.with_column( + vm.source_mut().with_column( match many { true => e.str().replace_all(patt, repl, true).alias(name), false => e.str().replace(patt, repl, true).alias(name), diff --git a/src/filterx_engine/src/eval/call/builtin/string/rev.rs b/src/filterx_engine/src/eval/call/builtin/string/rev.rs index 227d796..f59f8a1 100644 --- a/src/filterx_engine/src/eval/call/builtin/string/rev.rs +++ b/src/filterx_engine/src/eval/call/builtin/string/rev.rs @@ -14,10 +14,10 @@ pub fn rev<'a>( ); let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); let e = e.str().reverse(); if inplace { - vm.source.with_column(e.clone().alias(name), None); + vm.source_mut().with_column(e.clone().alias(name), None); return Ok(value::Value::None); } return Ok(value::Value::named_expr(Some(name.to_string()), e)); diff --git a/src/filterx_engine/src/eval/call/builtin/string/slice.rs b/src/filterx_engine/src/eval/call/builtin/string/slice.rs index 9a3baba..6f107aa 100644 --- a/src/filterx_engine/src/eval/call/builtin/string/slice.rs +++ b/src/filterx_engine/src/eval/call/builtin/string/slice.rs @@ -41,7 +41,7 @@ pub fn slice<'a>( ); let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); let length; let mut start = 0; if args.len() == 2 { @@ -54,7 +54,7 @@ pub fn slice<'a>( let e = e.str().slice(lit(start), lit(length)); if inplace { - vm.source.with_column(e.alias(name), None); + vm.source_mut().with_column(e.alias(name), None); return Ok(value::Value::None); } diff --git a/src/filterx_engine/src/eval/call/builtin/string/strip.rs b/src/filterx_engine/src/eval/call/builtin/string/strip.rs index 491e24e..3498c89 100644 --- a/src/filterx_engine/src/eval/call/builtin/string/strip.rs +++ b/src/filterx_engine/src/eval/call/builtin/string/strip.rs @@ -19,7 +19,7 @@ pub fn strip<'a>( let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(&name); + vm.source_mut().has_column(&name); let patt = eval!( vm, @@ -38,7 +38,7 @@ pub fn strip<'a>( }; if inplace { - vm.source.with_column(e.alias(name), None); + vm.source_mut().with_column(e.alias(name), None); return Ok(value::Value::None); } diff --git a/src/filterx_engine/src/eval/call/builtin/string/upper.rs b/src/filterx_engine/src/eval/call/builtin/string/upper.rs index 47fba8f..c1112d7 100644 --- a/src/filterx_engine/src/eval/call/builtin/string/upper.rs +++ b/src/filterx_engine/src/eval/call/builtin/string/upper.rs @@ -15,9 +15,9 @@ pub fn upper<'a>( ); let name = col_name.column()?; let e = col_name.expr()?; - vm.source.has_column(name); + vm.source_mut().has_column(name); if inplace { - vm.source + vm.source_mut() .with_column(e.str().to_uppercase().alias(name), None); return Ok(value::Value::None); } diff --git a/src/filterx_engine/src/eval/call/call.rs b/src/filterx_engine/src/eval/call/call.rs index e069a3e..c332c74 100644 --- a/src/filterx_engine/src/eval/call/call.rs +++ b/src/filterx_engine/src/eval/call/call.rs @@ -5,8 +5,8 @@ use polars::frame::UniqueKeepStrategy; use super::super::ast; use crate::vm::Vm; -use crate::vm::VmSourceType; use filterx_core::{value, FilterxResult}; +use filterx_source::source::SourceType; use crate::eval::call::builtin as call; use crate::eval::Eval; @@ -49,10 +49,11 @@ impl<'a> Eval<'a> for ast::ExprCall { "select" => call::select(vm, &self.args), "col" | "c" => call::col(vm, &self.args), "rename" => { - if vm.source_type == VmSourceType::Fasta || vm.source_type == VmSourceType::Fastq { + if vm.source_type() == SourceType::Fasta || vm.source_type() == SourceType::Fastq { + let source_type = vm.source_type(); let h = &mut vm.hint; h.white("Function `rename` does not be supported in source `") - .cyan(&format!("{:?}", vm.source_type)) + .cyan(&format!("{:?}", source_type)) .white("`.") .print_and_exit(); } else { diff --git a/src/filterx_engine/src/eval/ops.rs b/src/filterx_engine/src/eval/ops.rs index fdcfc11..ab018cc 100644 --- a/src/filterx_engine/src/eval/ops.rs +++ b/src/filterx_engine/src/eval/ops.rs @@ -333,12 +333,12 @@ fn boolop_in_dataframe<'a>( match op { ast::BoolOp::And => match (l, r) { (_, _) => { - vm.source.filter(l.expr()?.and(r.clone().expr()?)); + vm.source_mut().filter(l.expr()?.and(r.clone().expr()?)); } }, ast::BoolOp::Or => match (l, r) { (_, _) => { - vm.source.filter(l.expr()?.or(r.expr()?)); + vm.source_mut().filter(l.expr()?.or(r.expr()?)); } }, } @@ -459,7 +459,7 @@ fn str_in_col<'a>(vm: &'a mut Vm, left: Value, right: Value, op: &CmpOp) -> Filt .print_and_exit(); } }; - vm.source.filter(e); + vm.source_mut().filter(e); Ok(Value::None) } @@ -486,7 +486,7 @@ fn compare_in_and_not_in_dataframe<'a>( .print_and_exit(); } }; - let df_root = vm.source.lazy(); + let df_root = vm.source_mut().lazy(); let left_df = df_root.collect()?; let left_col_type = left_df.column(&left.to_string())?.dtype(); let right_col = match &right { @@ -563,11 +563,11 @@ fn compare_in_and_not_in_dataframe<'a>( match op { CmpOp::In => { let df = left_df.join(&right_df, left_on, right_on, JoinArgs::new(JoinType::Semi))?; - vm.source.update(df.lazy()); + vm.source_mut().update(df.lazy()); } CmpOp::NotIn => { let df = left_df.join(&right_df, left_on, right_on, JoinArgs::new(JoinType::Anti))?; - vm.source.update(df.lazy()); + vm.source_mut().update(df.lazy()); } _ => unreachable!(), } @@ -584,7 +584,7 @@ fn compare_cond_expr_in_dataframe<'a>( let left_expr = left.expr()?; let right_expr = right.expr()?; let e = cond_expr_build(vm, left_expr, right_expr, op.clone())?; - vm.source.filter(e); + vm.source_mut().filter(e); Ok(Value::None) } diff --git a/src/filterx_engine/src/vm.rs b/src/filterx_engine/src/vm.rs index f5cb1de..2a333cb 100644 --- a/src/filterx_engine/src/vm.rs +++ b/src/filterx_engine/src/vm.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; -use filterx_core::{FilterxError, FilterxResult, Hint}; +use polars::prelude::*; -use filterx_source::Source; +use filterx_core::{FilterxError, FilterxResult, Hint}; +use filterx_source::source::SourceType; +use filterx_source::{DataframeSource, Source, SourceInner}; use super::eval::Eval; @@ -23,6 +25,7 @@ pub struct VmStatus { pub offset: usize, pub printed: bool, pub consume_rows: usize, + pub chunk_size: usize, } impl VmStatus { @@ -35,6 +38,7 @@ impl VmStatus { offset: 0, printed: false, consume_rows: 0, + chunk_size: 10000, } } } @@ -43,48 +47,18 @@ impl VmStatus { pub fn update_apply_lazy(&mut self, apply_lazy: bool) { self.apply_lazy = apply_lazy; } -} -impl Default for VmStatus { - fn default() -> Self { - Self::new() + pub fn set_limit_rows(&mut self, limit_rows: usize) { + self.limit_rows = limit_rows; } -} - -#[derive(Debug, Clone, Copy, clap::ValueEnum, PartialEq)] -pub enum VmSourceType { - Csv, - Fasta, - Fastq, - Vcf, - Sam, - Gxf, -} - -impl Into<&str> for VmSourceType { - fn into(self) -> &'static str { - match self { - VmSourceType::Csv => "csv", - VmSourceType::Fasta => "fasta", - VmSourceType::Fastq => "fastq", - VmSourceType::Vcf => "vcf", - VmSourceType::Sam => "sam", - VmSourceType::Gxf => "gxf", - } + pub fn set_chunk_size(&mut self, chunk_size: usize) { + self.chunk_size = chunk_size; } } -impl From<&str> for VmSourceType { - fn from(s: &str) -> Self { - match s { - "csv" => VmSourceType::Csv, - "fasta" => VmSourceType::Fasta, - "fastq" => VmSourceType::Fastq, - "vcf" => VmSourceType::Vcf, - "sam" => VmSourceType::Sam, - "gxf" => VmSourceType::Gxf, - _ => panic!("Invalid source type"), - } +impl Default for VmStatus { + fn default() -> Self { + Self::new() } } @@ -97,21 +71,19 @@ pub struct Vm { /// source pub source: Source, pub status: VmStatus, - pub source_type: VmSourceType, pub writer: Option>>>, pub expr_cache: HashMap)>, pub hint: Hint, } impl Vm { - pub fn from_dataframe(dataframe: Source) -> Self { + pub fn from_source(source: Source) -> Self { Self { eval_expr: String::new(), parse_cache: HashMap::new(), mode: VmMode::Expression, - source: dataframe, + source, status: VmStatus::default(), - source_type: VmSourceType::Csv, writer: None, expr_cache: HashMap::new(), hint: Hint::new(), @@ -122,10 +94,6 @@ impl Vm { self.mode = mode; } - pub fn set_scope(&mut self, scope: VmSourceType) { - self.source_type = scope; - } - pub fn set_writer(&mut self, writer: Box>>) { self.writer = Some(writer); } @@ -231,13 +199,51 @@ impl Vm { Ok(()) } - pub fn next_batch(&mut self) -> FilterxResult<()> { + pub fn next_batch(&mut self) -> FilterxResult> { self.status.printed = false; - Ok(()) + match self.source_type() { + SourceType::Fasta | SourceType::Fastq => { + let left = self.status.limit_rows - self.status.consume_rows; + let left = left.min(self.status.chunk_size); + if left > 0 { + match self.source.inner { + SourceInner::Fasta(ref mut fasta) => { + fasta.into_dataframe(left)?; + } + SourceInner::Fastq(ref mut fastq) => { + fastq.into_dataframe(left)?; + } + _ => { + unreachable!(); + } + } + return Ok(Some(())); + } + Ok(None) + } + _ => Ok(Some(())), + } + } + + pub fn source_mut(&mut self) -> &mut DataframeSource { + self.source.df_source_mut() + } + + pub fn source(&self) -> &DataframeSource { + self.source.df_source() + } + + pub fn into_df(&self) -> FilterxResult { + self.source.into_df() + } + + pub fn source_type(&self) -> SourceType { + self.source.source_type } pub fn finish(&mut self) -> FilterxResult<()> { - self.source.finish() + let s = self.source.df_source_mut(); + s.finish() } } @@ -245,11 +251,14 @@ impl Vm { mod test { use super::*; use filterx_core::util; + use filterx_source::source::{SourceInner, SourceType}; + use filterx_source::DataframeSource; #[test] fn test_vm() { - let frame = Source::new(util::mock_lazy_df()); - let mut vm = Vm::from_dataframe(frame); + let frame = DataframeSource::new(util::mock_lazy_df()); + let source = Source::new(frame.into(), SourceType::Csv); + let mut vm = Vm::from_source(source); let expr = "alias(c) = a + b"; let ret = vm.eval_once(expr); println!("{:?}", ret); diff --git a/src/filterx_source/src/block/fastx/fasta.rs b/src/filterx_source/src/block/fastx/fasta.rs index 08d2599..1c810ea 100644 --- a/src/filterx_source/src/block/fastx/fasta.rs +++ b/src/filterx_source/src/block/fastx/fasta.rs @@ -3,6 +3,7 @@ use std::io::BufRead; use super::FastaRecordType; use crate::block::reader::TableLikeReader; +use crate::dataframe::DataframeSource; use filterx_core::{FilterxResult, Hint}; @@ -22,6 +23,7 @@ impl Default for FastaParserOptions { pub struct FastaSource { pub fasta: Fasta, pub records: Vec, + pub dataframe: DataframeSource, } impl Drop for FastaSource { @@ -38,10 +40,15 @@ impl FastaSource { let opt = FastaParserOptions { include_comment }; let fasta = fasta.set_parser_options(opt); let records = vec![FastaRecord::default(); 4096]; - Ok(FastaSource { fasta, records }) + let dataframe = DataframeSource::new(DataFrame::empty().lazy()); + Ok(FastaSource { + fasta, + records, + dataframe, + }) } - pub fn into_dataframe(&mut self, n: usize) -> FilterxResult> { + pub fn into_dataframe(&mut self, n: usize) -> FilterxResult> { let records = &mut self.records; if records.capacity() < n { unsafe { @@ -75,7 +82,8 @@ impl FastaSource { } let df = Fasta::as_dataframe(&records, &self.fasta.parser_options)?; - Ok(Some(df)) + self.dataframe.update(df.lazy()); + Ok(Some(())) } pub fn reset(&mut self) -> FilterxResult<()> { diff --git a/src/filterx_source/src/block/fastx/fastq.rs b/src/filterx_source/src/block/fastx/fastq.rs index df1a768..90484db 100644 --- a/src/filterx_source/src/block/fastx/fastq.rs +++ b/src/filterx_source/src/block/fastx/fastq.rs @@ -2,12 +2,14 @@ use polars::prelude::*; use std::io::BufRead; use crate::block::reader::TableLikeReader; +use crate::dataframe::DataframeSource; use filterx_core::{FilterxError, FilterxResult, Hint}; pub struct FastqSource { pub fastq: Fastq, pub records: Vec, + pub dataframe: DataframeSource, } impl Drop for FastqSource { @@ -26,10 +28,15 @@ impl FastqSource { }; let fastq = Fastq::from_path(path)?.set_parser_options(parser_option); let records = vec![FastqRecord::default(); 4096]; - Ok(FastqSource { fastq, records }) + let dataframe = DataframeSource::new(DataFrame::empty().lazy()); + Ok(FastqSource { + fastq, + records, + dataframe, + }) } - pub fn into_dataframe(&mut self, n: usize) -> FilterxResult> { + pub fn into_dataframe(&mut self, n: usize) -> FilterxResult> { let records = &mut self.records; if records.capacity() < n { @@ -64,7 +71,8 @@ impl FastqSource { Ok(None) } else { let df = Fastq::as_dataframe(&records, &self.fastq.parser_option)?; - Ok(Some(df)) + self.dataframe.update(df.lazy()); + Ok(Some(())) } } @@ -88,6 +96,19 @@ impl Default for FastqParserOption { } } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum QulityType { + Phred33, + Phred64, + Unknown, +} + +impl Default for QulityType { + fn default() -> Self { + QulityType::Unknown + } +} + pub struct Fastq { reader: TableLikeReader, read_end: bool, @@ -96,6 +117,7 @@ pub struct Fastq { record: FastqRecord, pub line_buffer: Vec, break_line_len: Option, + pub quality_type: QulityType, } #[derive(Debug, Clone)] @@ -232,7 +254,7 @@ impl IntoIterator for Fastq { impl Fastq { pub fn from_path(path: &str) -> FilterxResult { - Ok(Fastq { + let mut fq = Fastq { reader: TableLikeReader::new(path)?, read_end: false, parser_option: FastqParserOption::default(), @@ -240,7 +262,53 @@ impl Fastq { record: FastqRecord::default(), line_buffer: Vec::with_capacity(512), break_line_len: None, - }) + quality_type: QulityType::default(), + }; + fq.guess_quality_type()?; + Ok(fq) + } + + fn guess_quality_type(&mut self) -> FilterxResult<()> { + if !self.parser_option.include_qual { + return Ok(()); + } + let mut qualitys: [QulityType; 100] = [QulityType::Unknown; 100]; + let mut count = 0; + for _ in 0..100 { + let record = self.parse_next()?; + if let Some(record) = record { + let qual = record.qual(); + if let Some(qual) = qual { + let qual_u8 = qual.as_bytes(); + let max = qual_u8.iter().max().unwrap(); + let min = qual_u8.iter().min().unwrap(); + let new_quality_type = if *max >= 64 && *min >= 33 { + QulityType::Phred64 + } else if *max <= 64 && *min <= 33 { + QulityType::Phred33 + } else { + QulityType::Unknown + }; + qualitys[count] = new_quality_type; + count += 1; + } + } + } + let t = if count == 0 { + QulityType::Unknown + } else { + let mut t = qualitys[0]; + for i in 1..count { + if qualitys[i] != t { + t = QulityType::Unknown; + break; + } + } + t + }; + self.quality_type = t; + self.reset()?; + Ok(()) } /// parse fastq format based paper: https://academic.oup.com/nar/article/38/6/1767/3112533 diff --git a/src/filterx_source/src/dataframe.rs b/src/filterx_source/src/dataframe.rs index f76ff89..725d9e5 100644 --- a/src/filterx_source/src/dataframe.rs +++ b/src/filterx_source/src/dataframe.rs @@ -3,7 +3,7 @@ use polars::prelude::*; use filterx_core::{FilterxResult, Hint}; use regex::Regex; -pub struct Source { +pub struct DataframeSource { pub lazy: LazyFrame, pub has_header: bool, pub init_column_names: Vec, @@ -16,7 +16,7 @@ pub fn detect_columns(df: LazyFrame) -> FilterxResult> { Ok(schema.iter().map(|x| x.to_string()).collect()) } -impl Source { +impl DataframeSource { pub fn new(lazy: LazyFrame) -> Self { let lazy = lazy.with_streaming(true); Self { @@ -28,7 +28,7 @@ impl Source { } } -impl Source { +impl DataframeSource { pub fn reset(&mut self) { self.has_header = true; self.init_column_names.clear(); diff --git a/src/filterx_source/src/lib.rs b/src/filterx_source/src/lib.rs index ce34fc9..25413db 100644 --- a/src/filterx_source/src/lib.rs +++ b/src/filterx_source/src/lib.rs @@ -1,7 +1,9 @@ pub mod block; pub mod dataframe; +pub mod source; pub use block::fasta::FastaSource; pub use block::fastq::FastqSource; pub use dataframe::detect_columns; -pub use dataframe::Source; +pub use dataframe::DataframeSource; +pub use source::{Source, SourceInner, SourceType}; diff --git a/src/filterx_source/src/source.rs b/src/filterx_source/src/source.rs new file mode 100644 index 0000000..e29b0b6 --- /dev/null +++ b/src/filterx_source/src/source.rs @@ -0,0 +1,106 @@ +use crate::block::fasta::FastaSource; +use crate::block::fastq::FastqSource; +use crate::DataframeSource; + +use filterx_core::FilterxResult; +use polars::prelude::*; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum SourceType { + Csv, + Fasta, + Fastq, + Vcf, + Sam, + Gxf, +} + +impl Into<&str> for SourceType { + fn into(self) -> &'static str { + match self { + SourceType::Csv => "csv", + SourceType::Fasta => "fasta", + SourceType::Fastq => "fastq", + SourceType::Vcf => "vcf", + SourceType::Sam => "sam", + SourceType::Gxf => "gxf", + } + } +} + +impl From<&str> for SourceType { + fn from(s: &str) -> Self { + match s { + "csv" => SourceType::Csv, + "fasta" => SourceType::Fasta, + "fastq" => SourceType::Fastq, + "vcf" => SourceType::Vcf, + "sam" => SourceType::Sam, + "gxf" => SourceType::Gxf, + _ => panic!("Invalid source type"), + } + } +} + +pub enum SourceInner { + DataFrame(DataframeSource), + Fasta(FastaSource), + Fastq(FastqSource), +} + +impl From for SourceInner { + fn from(df: DataframeSource) -> Self { + SourceInner::DataFrame(df) + } +} + +impl From for SourceInner { + fn from(fasta: FastaSource) -> Self { + SourceInner::Fasta(fasta) + } +} + +impl From for SourceInner { + fn from(fastq: FastqSource) -> Self { + SourceInner::Fastq(fastq) + } +} + +pub struct Source { + pub source_type: SourceType, + pub inner: SourceInner, +} + +impl Source { + pub fn new(inner: SourceInner, source_type: SourceType) -> Self { + Source { source_type, inner } + } +} + +impl Source { + pub fn df_source_mut(&mut self) -> &mut DataframeSource { + match &mut self.inner { + SourceInner::DataFrame(df) => df, + SourceInner::Fasta(fasta) => &mut fasta.dataframe, + SourceInner::Fastq(fastq) => &mut fastq.dataframe, + } + } + + pub fn df_source(&self) -> &DataframeSource { + match &self.inner { + SourceInner::DataFrame(df) => df, + SourceInner::Fasta(fasta) => &fasta.dataframe, + SourceInner::Fastq(fastq) => &fastq.dataframe, + } + } + + pub fn into_df(&self) -> FilterxResult { + let s = match &self.inner { + SourceInner::DataFrame(df) => df, + SourceInner::Fasta(ref fasta) => &fasta.dataframe, + SourceInner::Fastq(ref fastq) => &fastq.dataframe, + }; + let df = s.lazy(); + Ok(df.collect()?) + } +}