From ce9cb99c1be39ca8913e4f6fd90645d4eb899ae9 Mon Sep 17 00:00:00 2001 From: dwpeng Date: Thu, 14 Nov 2024 22:32:53 +0800 Subject: [PATCH] feat: add memchr and clap dependencies, enhance filterx functions with new parameters and types --- Cargo.lock | 2 + Cargo.toml | 1 + src/filterx/src/args.rs | 19 ++- src/filterx/src/cli.rs | 6 +- src/filterx/src/files/fasta.rs | 9 +- src/filterx/src/files/fastq.rs | 10 +- src/filterx/src/files/gxf.rs | 20 ++- src/filterx_engine/src/engine_macro.rs | 15 ++ src/filterx_engine/src/eval/assign.rs | 3 +- .../src/eval/call/builtin/column/print.rs | 51 +++--- .../src/eval/call/builtin/mod.rs | 2 +- .../src/eval/call/builtin/sequence/gc.rs | 22 ++- .../src/eval/call/builtin/sequence/mod.rs | 4 +- .../src/eval/call/builtin/sequence/phred.rs | 15 ++ .../src/eval/call/builtin/sequence/qual.rs | 161 ++++++++++++++++++ .../src/eval/call/builtin/sequence/revcomp.rs | 68 +++++++- src/filterx_engine/src/eval/call/call.rs | 4 + src/filterx_engine/src/eval/ops.rs | 4 +- src/filterx_engine/src/vm.rs | 32 +++- src/filterx_source/Cargo.toml | 2 + src/filterx_source/src/block/fastx/fasta.rs | 97 +++++++++-- src/filterx_source/src/block/fastx/fastq.rs | 90 +++++++--- src/filterx_source/src/block/fastx/mod.rs | 8 - src/filterx_source/src/dataframe.rs | 1 + src/filterx_source/src/lib.rs | 4 +- src/filterx_source/src/source.rs | 72 +++++++- 26 files changed, 613 insertions(+), 109 deletions(-) create mode 100644 src/filterx_engine/src/eval/call/builtin/sequence/phred.rs create mode 100644 src/filterx_engine/src/eval/call/builtin/sequence/qual.rs diff --git a/Cargo.lock b/Cargo.lock index d7b96ae..82fbca2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -771,8 +771,10 @@ dependencies = [ name = "filterx_source" version = "0.2.8" dependencies = [ + "clap", "filterx_core", "flate2", + "memchr", "polars", "regex", ] diff --git a/Cargo.toml b/Cargo.toml index 6fef18f..c851911 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ flate2 = { version = "1.0.34", features = ["zlib-rs"] } regex = "1.11.1" colored = "2.1.0" lazy_static = "1.5.0" +memchr = "2.7.4" [profile.dev] diff --git a/src/filterx/src/args.rs b/src/filterx/src/args.rs index 62c779e..f2bb01d 100644 --- a/src/filterx/src/args.rs +++ b/src/filterx/src/args.rs @@ -1,4 +1,5 @@ use clap::{ArgAction, Args, Parser, Subcommand, ValueHint}; +use filterx_source::{FastaRecordType, QualityType}; static LONG_ABOUT: &'static str = include_str!("./long.txt"); @@ -55,7 +56,7 @@ pub struct ShareArgs { #[clap(short='o', long, value_hint=ValueHint::FilePath)] pub output: Option, - /// output as table format + /// output as table format, only output to stdout #[clap(short = 't', long, default_value = "false", action = ArgAction::SetTrue)] pub table: Option, } @@ -110,6 +111,14 @@ pub struct FastaCommand { /// limit sequence number, 0 means no limit #[clap(long, default_value = "0")] pub limit: Option, + + /// sequence type, default is DNA + #[clap(long, default_value = "auto")] + pub r#type: Option, + + /// detect sequence type by first N sequences + #[clap(long, default_value = "3")] + pub detect_size: Option, } #[derive(Debug, Clone, Parser)] @@ -132,6 +141,14 @@ pub struct FastqCommand { /// limit sequence number, 0 means no limit #[clap(long, default_value = "0")] pub limit: Option, + + /// quality type, phred33, phred64, auto, auto: will try to detect + #[clap(long, default_value = "auto")] + pub phred: Option, + + /// detect quality type by first N sequences + #[clap(long, default_value = "100")] + pub detect_size: Option, } #[derive(Debug, Clone, Parser)] diff --git a/src/filterx/src/cli.rs b/src/filterx/src/cli.rs index 7710270..db3a946 100644 --- a/src/filterx/src/cli.rs +++ b/src/filterx/src/cli.rs @@ -2,7 +2,7 @@ use crate::args::{Cli, Command}; use crate::files::csv::filterx_csv; use crate::files::fasta::filterx_fasta; use crate::files::fastq::filterx_fastq; -use crate::files::gxf::filterx_gxf; +use crate::files::gxf::{filterx_gxf, GxfType}; use crate::files::sam::filterx_sam; use crate::files::vcf::filterx_vcf; @@ -18,7 +18,7 @@ pub fn cli() -> FilterxResult<()> { Command::Fastq(cmd) => filterx_fastq(cmd), Command::Sam(cmd) => filterx_sam(cmd), Command::Vcf(cmd) => filterx_vcf(cmd), - Command::GFF(cmd) => filterx_gxf(cmd), - Command::GTF(cmd) => filterx_gxf(cmd), + Command::GFF(cmd) => filterx_gxf(cmd, GxfType::Gff), + Command::GTF(cmd) => filterx_gxf(cmd, GxfType::Gtf), } } diff --git a/src/filterx/src/files/fasta.rs b/src/filterx/src/files/fasta.rs index c90391d..57a88c1 100644 --- a/src/filterx/src/files/fasta.rs +++ b/src/filterx/src/files/fasta.rs @@ -17,7 +17,9 @@ pub fn filterx_fasta(cmd: FastaCommand) -> FilterxResult<()> { }, chunk: long, no_comment, + r#type, limit, + detect_size, } = cmd; let _limit = match limit { @@ -39,7 +41,12 @@ pub fn filterx_fasta(cmd: FastaCommand) -> FilterxResult<()> { let names = names.iter().map(|x| x.to_string()).collect(); let expr = util::merge_expr(expr); - let mut source = FastaSource::new(path.as_str(), !no_comment.unwrap())?; + let mut source = FastaSource::new( + path.as_str(), + !no_comment.unwrap(), + r#type.unwrap(), + detect_size.unwrap(), + )?; let output = util::create_buffer_writer(output)?; let mut output = Box::new(output); if expr.is_empty() { diff --git a/src/filterx/src/files/fastq.rs b/src/filterx/src/files/fastq.rs index 57c5299..b347e3f 100644 --- a/src/filterx/src/files/fastq.rs +++ b/src/filterx/src/files/fastq.rs @@ -17,7 +17,9 @@ pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> { chunk: long, no_comment, no_quality, + phred, limit, + detect_size, } = cmd; let _limit = match limit { @@ -51,7 +53,13 @@ pub fn filterx_fastq(cmd: FastqCommand) -> FilterxResult<()> { let names = names.iter().map(|x| x.to_string()).collect::>(); let expr = util::merge_expr(expr); - let mut source = FastqSource::new(path.as_str(), !no_comment.unwrap(), !no_quality.unwrap())?; + let mut source = FastqSource::new( + path.as_str(), + !no_comment.unwrap(), + !no_quality.unwrap(), + phred.unwrap(), + detect_size.unwrap(), + )?; let output = util::create_buffer_writer(output)?; let mut output = Box::new(output); if expr.is_empty() { diff --git a/src/filterx/src/files/gxf.rs b/src/filterx/src/files/gxf.rs index 3fd09bc..9ac4e15 100644 --- a/src/filterx/src/files/gxf.rs +++ b/src/filterx/src/files/gxf.rs @@ -23,7 +23,22 @@ fn init_gxf_schema() -> Option { util::create_schemas(files) } -pub fn filterx_gxf(cmd: GFFCommand) -> FilterxResult<()> { +#[derive(Debug, Clone, Copy, PartialEq, clap::ValueEnum)] +pub enum GxfType { + Gff, + Gtf, +} + +impl From for SourceType { + fn from(g: GxfType) -> Self { + match g { + GxfType::Gff => SourceType::Gff, + GxfType::Gtf => SourceType::Gtf, + } + } +} + +pub fn filterx_gxf(cmd: GFFCommand, gxf_type: GxfType) -> FilterxResult<()> { let GFFCommand { share_args: ShareArgs { @@ -33,7 +48,6 @@ pub fn filterx_gxf(cmd: GFFCommand) -> FilterxResult<()> { table, }, } = cmd; - let comment_prefix = "#"; let separator = "\t"; let writer = util::create_buffer_writer(output.clone())?; @@ -55,7 +69,7 @@ pub fn filterx_gxf(cmd: GFFCommand) -> FilterxResult<()> { )?; let mut s = DataframeSource::new(lazy_df.clone()); s.set_init_column_names(&names); - let mut vm = Vm::from_source(Source::new(s.into(), SourceType::Gxf)); + let mut vm = Vm::from_source(Source::new(s.into(), gxf_type.into())); let expr = util::merge_expr(expr); let writer = Box::new(writer); vm.set_writer(writer); diff --git a/src/filterx_engine/src/engine_macro.rs b/src/filterx_engine/src/engine_macro.rs index 2b9441e..e7c7c9d 100644 --- a/src/filterx_engine/src/engine_macro.rs +++ b/src/filterx_engine/src/engine_macro.rs @@ -50,3 +50,18 @@ macro_rules! builtin_function { )* }; } + +#[macro_export] +macro_rules! execuable { + ($vm:expr, $target: literal) => { + use crate::vm::VmMode; + if $vm.mode == VmMode::Printable { + let h = &mut $vm.hint; + h.white("Con't use ") + .red($target) + .white(" in builtin function") + .green(" `print`.") + .print_and_exit() + } + }; +} diff --git a/src/filterx_engine/src/eval/assign.rs b/src/filterx_engine/src/eval/assign.rs index 9f7fdf3..c546f4c 100644 --- a/src/filterx_engine/src/eval/assign.rs +++ b/src/filterx_engine/src/eval/assign.rs @@ -8,11 +8,12 @@ use filterx_core::FilterxResult; use crate::eval::Eval; use crate::vm::Vm; -use crate::eval; +use crate::{eval, execuable}; impl<'a> Eval<'a> for ast::StmtAssign { type Output = Value; fn eval(&self, vm: &'a mut Vm) -> FilterxResult { + execuable!(vm, "="); if self.targets.len() != 1 { let h = &mut vm.hint; h.white("Dosn't support unpacking multiple assignment expression") diff --git a/src/filterx_engine/src/eval/call/builtin/column/print.rs b/src/filterx_engine/src/eval/call/builtin/column/print.rs index 5ffcc2c..0e81d50 100644 --- a/src/filterx_engine/src/eval/call/builtin/column/print.rs +++ b/src/filterx_engine/src/eval/call/builtin/column/print.rs @@ -1,26 +1,19 @@ -use polars::{ - frame::DataFrame, - io::SerWriter, - prelude::{format_str, IntoLazy}, -}; +use polars::{io::SerWriter, prelude::format_str}; + +use crate::vm::VmMode; use super::super::*; -use filterx_source::Source; use polars::prelude::{col, Expr}; use regex::Regex; use lazy_static::lazy_static; lazy_static! { - static ref REGEX_PATTERN: Regex = Regex::new(r"\{([\(\)a-zA-Z0-9_\-+*\\ ]*)\}").unwrap(); + static ref REGEX_PATTERN: Regex = Regex::new(r"\{([\(\)a-zA-Z0-9_\-+/*\\ ]*)\}").unwrap(); static ref REGEX_VARNAME: Regex = Regex::new(r"^[_a-zA-Z]+[a-zA-Z_0-9]*$").unwrap(); } -fn parse_format_string( - source_type: SourceType, - valid_names: Option<&Vec>, - s: &str, -) -> FilterxResult<(String, Option>)> { +fn parse_format_string(s: &str, vm: &mut Vm) -> FilterxResult<(String, Option>)> { // value: "xxxxx" -> "xxxxx" // value: "xxx_{seq}" -> "xxx_{}" and col("seq") // value: "xxx_{seq}_{seq}" -> "xxx_{}_{}" and col("seq"), col("seq") @@ -38,11 +31,6 @@ fn parse_format_string( let re = ®EX_PATTERN; let fmt = re.replace_all(s, "{}").to_string(); let mut cols = Vec::new(); - let source = DataframeSource::new(DataFrame::empty().lazy()); - let mut vm = Vm::from_source(Source::new(source.into(), source_type)); - if let Some(valid_names) = valid_names { - vm.source_mut().set_init_column_names(valid_names); - } for cap in re.captures_iter(s) { let item = cap.get(1).unwrap().as_str(); if item.is_empty() { @@ -58,12 +46,15 @@ fn parse_format_string( } let ast = vm.ast(item)?; if !ast.is_expression() { - return Err(FilterxError::RuntimeError( - "Error format string, only support expression".to_string(), - )); + let h = &mut vm.hint; + h.white("Only support expression in ") + .cyan("print") + .white(", but got ") + .red(item) + .print_and_exit(); } let ast = ast.expression().unwrap(); - let ast = ast.eval(&mut vm)?; + let ast = ast.eval(vm)?; let value = ast.expr()?; cols.push(value); } @@ -73,9 +64,9 @@ fn parse_format_string( #[test] fn test_parse_format_string() { use polars::prelude::col; - + let mut vm = Vm::mock(SourceType::Fasta); let s = "xxx_{seq}"; - let (fmt, cols) = parse_format_string(SourceType::Fasta, None, s).unwrap(); + let (fmt, cols) = parse_format_string(s, &mut vm).unwrap(); assert_eq!(fmt, "xxx_{}"); assert!(cols.is_some()); let cols = cols.unwrap(); @@ -83,7 +74,7 @@ fn test_parse_format_string() { assert_eq!(cols[0], col("seq")); let s = "xxx_{seq}_{seq}"; - let (fmt, cols) = parse_format_string(SourceType::Fasta, None, s).unwrap(); + let (fmt, cols) = parse_format_string(s, &mut vm).unwrap(); assert_eq!(fmt, "xxx_{}_{}"); assert!(cols.is_some()); let cols = cols.unwrap(); @@ -92,12 +83,12 @@ fn test_parse_format_string() { assert_eq!(cols[1], col("seq")); let s = "xxx"; - let (fmt, cols) = parse_format_string(SourceType::Fasta, None, s).unwrap(); + let (fmt, cols) = parse_format_string(s, &mut vm).unwrap(); assert_eq!(fmt, "xxx"); assert!(cols.is_none()); let s = "xxx{len(seq)}"; - let (fmt, cols) = parse_format_string(SourceType::Fasta, None, s).unwrap(); + let (fmt, cols) = parse_format_string(s, &mut vm).unwrap(); assert_eq!(fmt, "xxx{}"); assert!(cols.is_some()); let cols = cols.unwrap(); @@ -123,11 +114,9 @@ pub fn print<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult PolarsResult> { pub fn gc<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult { expect_args_len(args, 1)?; + if vm.source.source_type.is_fasta() || vm.source.source_type.is_fastq() { + if vm.source.source_type.is_fasta() { + let fasta = vm.source.get_fasta()?; + match fasta.record_type { + FastaRecordType::Protein => { + let h = &mut vm.hint; + h.white("gc: protein sequence is not supported") + .print_and_exit(); + } + _ => {} + } + } + } let col_name = eval_col!(vm, &args[0], "gc: expected a column name as first argument"); - let col_name = col_name.column()?; - vm.source_mut().has_column(col_name); - let e = col(col_name).map(compute_gc, GetOutput::float_type()); + let name = col_name.column()?; + let e = col_name.expr()?; + vm.source_mut().has_column(name); + let e = e.map(compute_gc, GetOutput::float_type()); return Ok(value::Value::named_expr(None, e)); } diff --git a/src/filterx_engine/src/eval/call/builtin/sequence/mod.rs b/src/filterx_engine/src/eval/call/builtin/sequence/mod.rs index 90d6219..144604a 100644 --- a/src/filterx_engine/src/eval/call/builtin/sequence/mod.rs +++ b/src/filterx_engine/src/eval/call/builtin/sequence/mod.rs @@ -4,5 +4,7 @@ builtin_function! { gc, revcomp, to_fasta, - to_fastq + to_fastq, + qual, + phred } diff --git a/src/filterx_engine/src/eval/call/builtin/sequence/phred.rs b/src/filterx_engine/src/eval/call/builtin/sequence/phred.rs new file mode 100644 index 0000000..7d00f6c --- /dev/null +++ b/src/filterx_engine/src/eval/call/builtin/sequence/phred.rs @@ -0,0 +1,15 @@ +use super::super::*; +pub fn phred(vm: &mut Vm) -> FilterxResult { + if vm.source_type() == SourceType::Fastq { + let fastp = vm.source.get_fastq()?; + let h = &mut vm.hint; + h.white("phred: ") + .green(&format!("{}", fastp.quality_type)) + .print_and_exit(); + } + let h = &mut vm.hint; + h.white("phred: Only ") + .cyan("fastq") + .white(" format is supported for now.") + .print_and_exit(); +} diff --git a/src/filterx_engine/src/eval/call/builtin/sequence/qual.rs b/src/filterx_engine/src/eval/call/builtin/sequence/qual.rs new file mode 100644 index 0000000..d2ecd1e --- /dev/null +++ b/src/filterx_engine/src/eval/call/builtin/sequence/qual.rs @@ -0,0 +1,161 @@ +use super::super::*; +use filterx_source::QualityType; +use polars::prelude::*; + +use polars_arrow::{ + array::{ArrayRef, Float32Array, Utf8ViewArray}, + buffer::Buffer, + datatypes::ArrowDataType, +}; + +// copy from https://github.com/shenwei356/bio/blob/master/seq/seq.go + +static mut QUAL_MAP: [f32; 256] = [0.0; 256]; + +fn init_qual_map() { + for i in 0..256 { + unsafe { + // Q = -10 * log10(p) + // p: rate of base error, p = 10 ^ (-Q / 10) + // Q: quality score, use i representing 0-255 + // phred33: ASCII 33 + Q + // phred64: ASCII 64 + Q + // so, while computing the probability, we need to minus 33 or 64 to get original phred score as Q + QUAL_MAP[i] = f32::powf(10.0, i as f32 / -10.0); + } + } +} + +fn compute_qual_phred33_kernel(array: &Utf8ViewArray) -> ArrayRef { + let array = array + .values_iter() + .map(|seq| { + let length = seq.len(); + if length == 0 { + return 0.0; + } + let max = seq.bytes().max().unwrap(); + let min = seq.bytes().min().unwrap(); + if max > 127 || min < 33 { + return 0.0; + } + let sum_qual: f32 = seq + .bytes() + .map(|b| unsafe { + let b = b as usize - 33; + QUAL_MAP[b] + }) + .sum(); + if sum_qual == 0.0 { + return 0.0; + } + -10.0 * (sum_qual / seq.len() as f32).log10() + }) + .collect::>(); + let values: Buffer<_> = array.into(); + let array = Float32Array::new(ArrowDataType::Float32, values, None); + Box::new(array) +} + +fn compute_qual_phred64_kernel(array: &Utf8ViewArray) -> ArrayRef { + let array = array + .values_iter() + .map(|seq| { + let length = seq.len(); + if length == 0 { + return 0.0; + } + let max = seq.bytes().max().unwrap(); + let min = seq.bytes().min().unwrap(); + if max > 127 || min < 33 { + return 0.0; + } + let qual_sum: f32 = seq + .bytes() + .map(|c| unsafe { + let c = c as usize - 64; + QUAL_MAP[c] as f32 + }) + .sum(); + if qual_sum == 0.0 { + return 0.0; + } + -10.0 * (qual_sum / seq.len() as f32).log10() + }) + .collect::>(); + let values: Buffer<_> = array.into(); + let array = Float32Array::new(ArrowDataType::Float32, values, None); + Box::new(array) +} + +fn compute_qual_phred64(s: Column) -> PolarsResult> { + if !s.dtype().is_string() { + return Err(PolarsError::InvalidOperation( + format!( + "Computing quality needs a string column, got column `{}` with type `{}`", + s.name(), + s.dtype() + ) + .into(), + )); + } + init_qual_map(); + let s = s.as_materialized_series(); + let s = s.str()?.as_string(); + let c = s + .apply_kernel_cast::(&compute_qual_phred64_kernel) + .into_column(); + Ok(Some(c)) +} + +fn compute_qual_phred33(s: Column) -> PolarsResult> { + if !s.dtype().is_string() { + return Err(PolarsError::InvalidOperation( + format!( + "Computing quality needs a string column, got column `{}` with type `{}`", + s.name(), + s.dtype() + ) + .into(), + )); + } + init_qual_map(); + let s = s.as_materialized_series(); + let s = s.str()?.as_string(); + let c = s + .apply_kernel_cast::(&compute_qual_phred33_kernel) + .into_column(); + Ok(Some(c)) +} + +pub fn qual<'a>(vm: &'a mut Vm, args: &Vec) -> FilterxResult { + expect_args_len(args, 1)?; + if !vm.source.source_type.is_fastq() { + let h = &mut vm.hint; + h.white("qual: Only available on fastq source") + .print_and_exit(); + } + let col_name = eval_col!( + vm, + &args[0], + "qual: expected a column name as first argument" + ); + let qtype = vm.source.get_fastq()?; + let name = col_name.column()?; + let mut e = col_name.expr()?; + vm.source().has_column(name); + match qtype.quality_type { + QualityType::Phred33 => { + e = e.map(compute_qual_phred33, GetOutput::float_type()); + } + QualityType::Phred64 => { + e = e.map(compute_qual_phred64, GetOutput::float_type()); + } + QualityType::Auto => { + let h = &mut vm.hint; + h.white("qual: Unable to detect quality type") + .print_and_exit(); + } + }; + return Ok(value::Value::named_expr(None, e)); +} diff --git a/src/filterx_engine/src/eval/call/builtin/sequence/revcomp.rs b/src/filterx_engine/src/eval/call/builtin/sequence/revcomp.rs index 1964d53..605f1ad 100644 --- a/src/filterx_engine/src/eval/call/builtin/sequence/revcomp.rs +++ b/src/filterx_engine/src/eval/call/builtin/sequence/revcomp.rs @@ -2,9 +2,10 @@ use std::borrow::Cow; use super::super::*; +use filterx_source::FastaRecordType; use polars::prelude::*; -fn compute_revcomp(s: Column) -> PolarsResult> { +fn compute_revcomp_dna(s: Column) -> PolarsResult> { let ca = s.str()?; let ca = ca.apply_values(|s| { let s: String = s @@ -27,6 +28,29 @@ fn compute_revcomp(s: Column) -> PolarsResult> { Ok(Some(ca.into_column())) } +fn compute_revcomp_rna(s: Column) -> PolarsResult> { + let ca = s.str()?; + let ca = ca.apply_values(|s| { + let s: String = s + .chars() + .rev() + .map(|b| match b { + 'A' => 'U', + 'U' => 'A', + 'C' => 'G', + 'G' => 'C', + 'a' => 'u', + 'u' => 'a', + 'c' => 'g', + 'g' => 'c', + _ => b, + }) + .collect(); + Cow::Owned(s) + }); + Ok(Some(ca.into_column())) +} + pub fn revcomp<'a>( vm: &'a mut Vm, args: &Vec, @@ -39,13 +63,39 @@ pub fn revcomp<'a>( &args[0], "revcomp: expected a column name as first argument" ); - let name = col_name.column()?; - let e = col_name.expr()?; - vm.source_mut().has_column(name); - let e = e.map(compute_revcomp, GetOutput::same_type()); - if inplace { - vm.source_mut().with_column(e.clone().alias(name), None); - return Ok(value::Value::None); + if vm.source.source_type.is_fasta() || vm.source.source_type.is_fastq() { + let name = col_name.column()?; + let mut e = col_name.expr()?; + vm.source_mut().has_column(name); + if vm.source.source_type.is_fasta() { + let fasta = vm.source.get_fasta()?; + match fasta.record_type { + FastaRecordType::Dna => { + e = e.map(compute_revcomp_dna, GetOutput::same_type()); + } + FastaRecordType::Rna => { + e = e.map(compute_revcomp_rna, GetOutput::same_type()); + } + FastaRecordType::Protein => { + let h = &mut vm.hint; + h.white("revcomp: protein sequences are not supported") + .print_and_exit(); + } + FastaRecordType::Auto => { + let h = &mut vm.hint; + h.white("revcomp: unknown sequence type.").print_and_exit(); + } + } + } + + if inplace { + vm.source_mut().with_column(e.clone().alias(name), None); + return Ok(value::Value::None); + } + return Ok(value::Value::named_expr(Some(name.to_string()), e)); + } else { + let h = &mut vm.hint; + h.white("revcomp: Only fastq and fasta are supported.") + .print_and_exit() } - return Ok(value::Value::named_expr(Some(name.to_string()), e)); } diff --git a/src/filterx_engine/src/eval/call/call.rs b/src/filterx_engine/src/eval/call/call.rs index c332c74..9670658 100644 --- a/src/filterx_engine/src/eval/call/call.rs +++ b/src/filterx_engine/src/eval/call/call.rs @@ -43,6 +43,8 @@ impl<'a> Eval<'a> for ast::ExprCall { function_name = "cast".to_string(); } + // TODO check if the function can executable in this context + match function_name.as_str() { "alias" => call::alias(vm, &self.args), "drop" => call::drop(vm, &self.args), @@ -69,6 +71,8 @@ impl<'a> Eval<'a> for ast::ExprCall { "print" | "format" | "fmt" => call::print(vm, &self.args), "limit" => call::limit(vm, &self.args), "gc" => call::gc(vm, &self.args), + "qual" => call::qual(vm, &self.args), + "phred" => call::phred(vm), "rev" => call::rev(vm, &self.args, false), "rev_" => call::rev(vm, &self.args, true), "revcomp" => call::revcomp(vm, &self.args, false), diff --git a/src/filterx_engine/src/eval/ops.rs b/src/filterx_engine/src/eval/ops.rs index ab018cc..0a36d34 100644 --- a/src/filterx_engine/src/eval/ops.rs +++ b/src/filterx_engine/src/eval/ops.rs @@ -7,7 +7,7 @@ use super::super::ast; use crate::eval::Eval; use crate::vm::Vm; -pub use crate::{eval, eval_col}; +pub use crate::{eval, eval_col, execuable}; use filterx_core::{util, value::Value, FilterxError, FilterxResult, Hint}; impl<'a> Eval<'a> for ast::ExprUnaryOp { @@ -290,6 +290,7 @@ fn binop_for_dataframe(left: Value, right: Value, op: ast::Operator) -> FilterxR impl<'a> Eval<'a> for ast::ExprBoolOp { type Output = Value; fn eval(&self, vm: &'a mut Vm) -> FilterxResult { + execuable!(vm, "`and` or `or`"); let left = &self.values[0]; let vm_apply_lazy = vm.status.apply_lazy; vm.status.update_apply_lazy(false); @@ -362,6 +363,7 @@ fn boolop_in_dataframe<'a>( impl<'a> Eval<'a> for ast::ExprCompare { type Output = Value; fn eval(&self, vm: &'a mut Vm) -> FilterxResult { + execuable!(vm, "`>=`, `<=`, `==`, `!=`, `<`, `>`, `in`, `not in`"); if self.ops.len() > 1 { let h = &mut vm.hint; h.white("Only support one compare op, like a > 1. If you want to chain compare, use `and` or `or`") diff --git a/src/filterx_engine/src/vm.rs b/src/filterx_engine/src/vm.rs index 2a333cb..0c892e3 100644 --- a/src/filterx_engine/src/vm.rs +++ b/src/filterx_engine/src/vm.rs @@ -4,16 +4,19 @@ use polars::prelude::*; use filterx_core::{FilterxError, FilterxResult, Hint}; use filterx_source::source::SourceType; -use filterx_source::{DataframeSource, Source, SourceInner}; +use filterx_source::{ + DataframeSource, FastaRecordType, FastaSource, FastqSource, QualityType, Source, SourceInner, +}; use super::eval::Eval; use std::io::BufWriter; use std::io::Write; +#[derive(Debug, PartialEq)] pub enum VmMode { - Interactive, Expression, + Printable } #[derive(Debug)] @@ -77,6 +80,30 @@ pub struct Vm { } impl Vm { + pub fn mock(source_type: SourceType) -> Vm { + let innser: SourceInner = match source_type { + SourceType::Fasta => FastaSource::new("", false, FastaRecordType::Dna, 0) + .unwrap() + .into(), + SourceType::Fastq => FastqSource::new("", false, false, QualityType::Phred33, 0) + .unwrap() + .into(), + _ => DataframeSource::new(DataFrame::empty().lazy()).into(), + }; + + let vm = Vm { + eval_expr: "".to_string(), + parse_cache: HashMap::new(), + mode: VmMode::Expression, + source: Source::new(innser, source_type), + status: VmStatus::default(), + writer: None, + expr_cache: HashMap::new(), + hint: Hint::new(), + }; + vm + } + pub fn from_source(source: Source) -> Self { Self { eval_expr: String::new(), @@ -105,6 +132,7 @@ impl Vm { && !s.contains("!=") && !s.contains(">=") && !s.contains("<=") + && !s.contains("print(") { let expr = rustpython_parser::parse(s, rustpython_parser::Mode::Interactive, "")?; return Ok(expr); diff --git a/src/filterx_source/Cargo.toml b/src/filterx_source/Cargo.toml index d7a71f6..f3a1708 100644 --- a/src/filterx_source/Cargo.toml +++ b/src/filterx_source/Cargo.toml @@ -14,3 +14,5 @@ flate2 = { workspace = true } polars = { workspace = true } regex = { workspace = true } filterx_core = { workspace = true } +clap = { workspace = true } +memchr = { workspace = true } diff --git a/src/filterx_source/src/block/fastx/fasta.rs b/src/filterx_source/src/block/fastx/fasta.rs index 1c810ea..ea65863 100644 --- a/src/filterx_source/src/block/fastx/fasta.rs +++ b/src/filterx_source/src/block/fastx/fasta.rs @@ -1,9 +1,8 @@ -use polars::prelude::*; -use std::io::BufRead; - -use super::FastaRecordType; use crate::block::reader::TableLikeReader; use crate::dataframe::DataframeSource; +use polars::prelude::*; +use std::collections::HashSet; +use std::io::BufRead; use filterx_core::{FilterxResult, Hint}; @@ -35,8 +34,13 @@ impl Drop for FastaSource { } impl FastaSource { - pub fn new(path: &str, include_comment: bool) -> FilterxResult { - let fasta = Fasta::from_path(path)?; + pub fn new( + path: &str, + include_comment: bool, + record_type: FastaRecordType, + n_detect: usize, + ) -> FilterxResult { + let fasta = Fasta::from_path(path, record_type, n_detect)?; let opt = FastaParserOptions { include_comment }; let fasta = fasta.set_parser_options(opt); let records = vec![FastaRecord::default(); 4096]; @@ -102,6 +106,14 @@ pub struct Fasta { break_line_len: Option, } +#[derive(Clone, Copy, Debug, clap::ValueEnum, PartialEq)] +pub enum FastaRecordType { + Dna, + Rna, + Protein, + Auto, +} + #[derive(Clone, Debug)] pub struct FastaRecord { buffer: Vec, @@ -228,17 +240,77 @@ impl Clone for Fasta { } impl Fasta { - pub fn from_path(path: &str) -> FilterxResult { - Ok(Fasta { + pub fn from_path( + path: &str, + record_type: FastaRecordType, + n_detect: usize, + ) -> FilterxResult { + let mut fasta = Fasta { reader: TableLikeReader::new(path)?, prev_line: Vec::new(), read_end: false, path: path.to_string(), parser_options: FastaParserOptions::default(), record: FastaRecord::default(), - record_type: FastaRecordType::DNA, + record_type, break_line_len: None, - }) + }; + if record_type == FastaRecordType::Auto { + fasta.detect_record_type(n_detect)?; + } + Ok(fasta) + } + + pub fn detect_record_type(&mut self, n: usize) -> FilterxResult<()> { + let mut hashset = HashSet::new(); + for _ in 0..n { + let record = self.parse_next()?; + if record.is_none() { + break; + } + let seq = record.unwrap().seq(); + for c in seq.bytes() { + if c == b'n' || c == b'N' { + continue; + } + hashset.insert(c); + } + } + if hashset.len() < 4 { + let mut h = Hint::new(); + h.white("Too less sequences are used to detect alphabet. Try increase the number of sequences to detect alphabet.") + .print_and_exit(); + } + if hashset.len() > 4 { + self.record_type = FastaRecordType::Protein; + } + let contain_t = hashset.contains(&b'T') || hashset.contains(&b't'); + let contain_u = hashset.contains(&b'u') || hashset.contains(&b'U'); + if contain_t && contain_u { + let mut h = Hint::new(); + h.white("The fasta file contains both ") + .cyan("'T'") + .white(" and ") + .cyan("'U'") + .white(" nucleotides. Can not determine the record type.") + .print_and_exit(); + } + if !contain_t && !contain_u { + let mut h = Hint::new(); + h.white("The fasta file contains none of ") + .cyan("'T'") + .white(" and ") + .cyan("'U'") + .white(" nucleotides. Can not determine the record type.") + .print_and_exit(); + } + if contain_t { + self.record_type = FastaRecordType::Dna; + } else if contain_u { + self.record_type = FastaRecordType::Rna; + } + self.reset()?; + Ok(()) } pub fn set_parser_options(mut self, parser_options: FastaParserOptions) -> Self { @@ -249,6 +321,7 @@ impl Fasta { pub fn reset(&mut self) -> FilterxResult<()> { self.reader.reset()?; self.prev_line.clear(); + self.record.clear(); self.read_end = false; Ok(()) } @@ -396,7 +469,7 @@ pub mod test { #[test] fn test_open_plain_file() -> FilterxResult<()> { let path = "test_data/fasta/1.fa"; - let fasta = Fasta::from_path(path)?; + let fasta = Fasta::from_path(path, FastaRecordType::Auto, 3)?; let records = fasta.into_iter().collect::>(); assert!(records.len() == 2); Ok(()) @@ -405,7 +478,7 @@ pub mod test { #[test] fn test_open_gzip_file() -> FilterxResult<()> { let path = "test_data/fasta/1.fa.gz"; - let fasta = Fasta::from_path(path)?; + let fasta = Fasta::from_path(path, FastaRecordType::Auto, 3)?; let records = fasta.into_iter().collect::>(); assert!(records.len() == 2); Ok(()) diff --git a/src/filterx_source/src/block/fastx/fastq.rs b/src/filterx_source/src/block/fastx/fastq.rs index 90484db..ed55147 100644 --- a/src/filterx_source/src/block/fastx/fastq.rs +++ b/src/filterx_source/src/block/fastx/fastq.rs @@ -1,5 +1,5 @@ use polars::prelude::*; -use std::io::BufRead; +use std::{fmt::Display, io::BufRead}; use crate::block::reader::TableLikeReader; use crate::dataframe::DataframeSource; @@ -21,12 +21,20 @@ impl Drop for FastqSource { } impl FastqSource { - pub fn new(path: &str, include_comment: bool, include_qual: bool) -> FilterxResult { + pub fn new( + path: &str, + include_comment: bool, + include_qual: bool, + quality_type: QualityType, + detect_size: usize, + ) -> FilterxResult { let parser_option = FastqParserOption { include_comment, include_qual, + phred: quality_type, }; - let fastq = Fastq::from_path(path)?.set_parser_options(parser_option); + let fastq = + Fastq::from_path(path, quality_type, detect_size)?.set_parser_options(parser_option); let records = vec![FastqRecord::default(); 4096]; let dataframe = DataframeSource::new(DataFrame::empty().lazy()); Ok(FastqSource { @@ -85,6 +93,7 @@ impl FastqSource { pub struct FastqParserOption { pub include_qual: bool, pub include_comment: bool, + pub phred: QualityType, } impl Default for FastqParserOption { @@ -92,20 +101,31 @@ impl Default for FastqParserOption { FastqParserOption { include_qual: true, include_comment: true, + phred: QualityType::Phred33, } } } -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum QulityType { +#[derive(Debug, Clone, Copy, PartialEq, clap::ValueEnum)] +pub enum QualityType { Phred33, Phred64, - Unknown, + Auto, } -impl Default for QulityType { +impl Display for QualityType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + QualityType::Phred33 => write!(f, "phred33"), + QualityType::Phred64 => write!(f, "phred64"), + QualityType::Auto => write!(f, "unknown"), + } + } +} + +impl Default for QualityType { fn default() -> Self { - QulityType::Unknown + QualityType::Phred33 } } @@ -117,7 +137,7 @@ pub struct Fastq { record: FastqRecord, pub line_buffer: Vec, break_line_len: Option, - pub quality_type: QulityType, + pub quality_type: QualityType, } #[derive(Debug, Clone)] @@ -179,6 +199,14 @@ impl FastqRecord { pub fn format<'a>(&'a self) -> &'a str { unsafe { std::str::from_utf8_unchecked(&self.buffer) } } + + pub fn as_phred33(&mut self) -> () { + if self._qual.0 != self._qual.1 { + for i in self._qual.0..self._qual.1 { + self.buffer[i] = self.buffer[i] - 33; + } + } + } } impl FastqRecord { @@ -253,7 +281,11 @@ impl IntoIterator for Fastq { } impl Fastq { - pub fn from_path(path: &str) -> FilterxResult { + pub fn from_path( + path: &str, + quality_type: QualityType, + detect_size: usize, + ) -> FilterxResult { let mut fq = Fastq { reader: TableLikeReader::new(path)?, read_end: false, @@ -262,19 +294,21 @@ impl Fastq { record: FastqRecord::default(), line_buffer: Vec::with_capacity(512), break_line_len: None, - quality_type: QulityType::default(), + quality_type: quality_type, }; - fq.guess_quality_type()?; + if quality_type == QualityType::Auto { + fq.guess_quality_type(detect_size)?; + } Ok(fq) } - fn guess_quality_type(&mut self) -> FilterxResult<()> { + fn guess_quality_type(&mut self, detect_size: usize) -> FilterxResult<()> { if !self.parser_option.include_qual { return Ok(()); } - let mut qualitys: [QulityType; 100] = [QulityType::Unknown; 100]; + let mut qualitys = vec![QualityType::Auto; detect_size]; let mut count = 0; - for _ in 0..100 { + for _ in 0..detect_size { let record = self.parse_next()?; if let Some(record) = record { let qual = record.qual(); @@ -282,12 +316,17 @@ impl Fastq { let qual_u8 = qual.as_bytes(); let max = qual_u8.iter().max().unwrap(); let min = qual_u8.iter().min().unwrap(); - let new_quality_type = if *max >= 64 && *min >= 33 { - QulityType::Phred64 - } else if *max <= 64 && *min <= 33 { - QulityType::Phred33 + // Sanger: 0 - 40 +33 33 - 73 phred33 + // Solexa: -5 - 40 +64 59 - 124 phred64 not supported! + // Illumina 1.3: 0 - 40 +64 64 - 124 phred64 + // Illumina 1.5: 3 - 40 +64 67 - 104 phred64 0,1,2 are clipped + // Illumina 1.8+: 0 - 41 +33 33 - 73 phred33 + let new_quality_type = if *max >= 73 && *min >= 64 { + QualityType::Phred64 + } else if *max <= 73 && *min >= 33 { + QualityType::Phred33 } else { - QulityType::Unknown + QualityType::Auto }; qualitys[count] = new_quality_type; count += 1; @@ -295,12 +334,17 @@ impl Fastq { } } let t = if count == 0 { - QulityType::Unknown + QualityType::Auto } else { let mut t = qualitys[0]; for i in 1..count { if qualitys[i] != t { - t = QulityType::Unknown; + t = QualityType::Auto; + if t == QualityType::Auto { + return Err(filterx_core::FilterxError::FastqError( + "Fastq quality type is not consistent".to_string(), + )); + } break; } } @@ -499,6 +543,8 @@ impl Fastq { pub fn reset(&mut self) -> FilterxResult<()> { self.reader.reset()?; self.read_end = false; + self.line_buffer.clear(); + self.record.buffer.clear(); Ok(()) } } diff --git a/src/filterx_source/src/block/fastx/mod.rs b/src/filterx_source/src/block/fastx/mod.rs index f2a39a6..54685b4 100644 --- a/src/filterx_source/src/block/fastx/mod.rs +++ b/src/filterx_source/src/block/fastx/mod.rs @@ -1,10 +1,2 @@ pub mod fasta; pub mod fastq; - -#[derive(Clone, Debug)] -pub enum FastaRecordType { - DNA, - RNA, - PROTEIN, - UNKNOWN, -} diff --git a/src/filterx_source/src/dataframe.rs b/src/filterx_source/src/dataframe.rs index 725d9e5..6d3498a 100644 --- a/src/filterx_source/src/dataframe.rs +++ b/src/filterx_source/src/dataframe.rs @@ -3,6 +3,7 @@ use polars::prelude::*; use filterx_core::{FilterxResult, Hint}; use regex::Regex; +#[derive(Clone)] pub struct DataframeSource { pub lazy: LazyFrame, pub has_header: bool, diff --git a/src/filterx_source/src/lib.rs b/src/filterx_source/src/lib.rs index 25413db..60a43da 100644 --- a/src/filterx_source/src/lib.rs +++ b/src/filterx_source/src/lib.rs @@ -2,8 +2,8 @@ pub mod block; pub mod dataframe; pub mod source; -pub use block::fasta::FastaSource; -pub use block::fastq::FastqSource; +pub use block::fasta::{FastaRecordType, FastaSource}; +pub use block::fastq::{FastqSource, QualityType}; pub use dataframe::detect_columns; pub use dataframe::DataframeSource; pub use source::{Source, SourceInner, SourceType}; diff --git a/src/filterx_source/src/source.rs b/src/filterx_source/src/source.rs index e29b0b6..ba9a651 100644 --- a/src/filterx_source/src/source.rs +++ b/src/filterx_source/src/source.rs @@ -1,8 +1,8 @@ -use crate::block::fasta::FastaSource; -use crate::block::fastq::FastqSource; +use crate::block::fasta::{Fasta, FastaSource}; +use crate::block::fastq::{Fastq, FastqSource}; use crate::DataframeSource; -use filterx_core::FilterxResult; +use filterx_core::{FilterxError, FilterxResult}; use polars::prelude::*; #[derive(Debug, PartialEq, Clone, Copy)] @@ -12,7 +12,47 @@ pub enum SourceType { Fastq, Vcf, Sam, - Gxf, + Gff, + Gtf, +} + +impl SourceType { + pub fn is_fasta(&self) -> bool { + match self { + SourceType::Fasta => true, + _ => false, + } + } + pub fn is_fastq(&self) -> bool { + match self { + SourceType::Fastq => true, + _ => false, + } + } + pub fn is_vcf(&self) -> bool { + match self { + SourceType::Vcf => true, + _ => false, + } + } + pub fn is_gff(&self) -> bool { + match self { + SourceType::Gff => true, + _ => false, + } + } + pub fn is_gtf(&self) -> bool { + match self { + SourceType::Gtf => true, + _ => false, + } + } + pub fn is_sam(&self) -> bool { + match self { + SourceType::Sam => true, + _ => false, + } + } } impl Into<&str> for SourceType { @@ -23,7 +63,8 @@ impl Into<&str> for SourceType { SourceType::Fastq => "fastq", SourceType::Vcf => "vcf", SourceType::Sam => "sam", - SourceType::Gxf => "gxf", + SourceType::Gff => "gff", + SourceType::Gtf => "gtf", } } } @@ -36,7 +77,8 @@ impl From<&str> for SourceType { "fastq" => SourceType::Fastq, "vcf" => SourceType::Vcf, "sam" => SourceType::Sam, - "gxf" => SourceType::Gxf, + "gff" => SourceType::Gff, + "gtf" => SourceType::Gtf, _ => panic!("Invalid source type"), } } @@ -103,4 +145,22 @@ impl Source { let df = s.lazy(); Ok(df.collect()?) } + + pub fn get_fastq(&self) -> FilterxResult<&Fastq> { + match &self.inner { + SourceInner::Fastq(fastq) => Ok(&fastq.fastq), + _ => Err(FilterxError::RuntimeError( + "get_fastq only support Fastq source".into(), + )), + } + } + + pub fn get_fasta(&self) -> FilterxResult<&Fasta> { + match &self.inner { + SourceInner::Fasta(fasta) => Ok(&fasta.fasta), + _ => Err(FilterxError::RuntimeError( + "get_fasta only support Fasta source".into(), + )), + } + } }