From 8e7730cc692e3d6041edb23cafe3e347272dce9a Mon Sep 17 00:00:00 2001 From: dwpeng Date: Fri, 15 Nov 2024 21:32:12 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20breakline=20detection=20in?= =?UTF-8?q?=20TableLikeReader=20and=20update=20FastqSource=20to=20use=20ne?= =?UTF-8?q?w=20detection=20method.=20That's=20all.=20Happy=20birthday=20to?= =?UTF-8?q?=20me!=20=20=F0=9F=8E=89=F0=9F=8E=89=F0=9F=8E=89=F0=9F=8E=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/filterx_engine/src/vm.rs | 13 +- src/filterx_source/src/block/fastx/fasta.rs | 155 +++++++------------- src/filterx_source/src/block/fastx/fastq.rs | 34 +---- src/filterx_source/src/block/reader.rs | 24 +++ 4 files changed, 97 insertions(+), 129 deletions(-) diff --git a/src/filterx_engine/src/vm.rs b/src/filterx_engine/src/vm.rs index ac06fef..e24ceb6 100644 --- a/src/filterx_engine/src/vm.rs +++ b/src/filterx_engine/src/vm.rs @@ -239,15 +239,24 @@ impl Vm { self.status.printed = false; match self.source_type() { SourceType::Fasta | SourceType::Fastq => { + if self.status.stop { + return Ok(None); + } let left = self.status.limit_rows - self.status.consume_rows; let left = left.min(self.status.chunk_size); if left > 0 { match self.source.inner { SourceInner::Fasta(ref mut fasta) => { - fasta.into_dataframe(left)?; + let count = fasta.into_dataframe(left)?; + if count < left || count == 0 { + self.status.stop = true; + } } SourceInner::Fastq(ref mut fastq) => { - fastq.into_dataframe(left)?; + let count = fastq.into_dataframe(left)?; + if count < left || count == 0 { + self.status.stop = true; + } } _ => { unreachable!(); diff --git a/src/filterx_source/src/block/fastx/fasta.rs b/src/filterx_source/src/block/fastx/fasta.rs index ea65863..1caeb2f 100644 --- a/src/filterx_source/src/block/fastx/fasta.rs +++ b/src/filterx_source/src/block/fastx/fasta.rs @@ -1,11 +1,11 @@ -use crate::block::reader::TableLikeReader; +use crate::block::reader::{detect_breakline_len, TableLikeReader}; use crate::dataframe::DataframeSource; +use filterx_core::{FilterxResult, Hint}; +use memchr::memchr; use polars::prelude::*; use std::collections::HashSet; use std::io::BufRead; -use filterx_core::{FilterxResult, Hint}; - #[derive(Debug, Clone, Copy)] pub struct FastaParserOptions { pub include_comment: bool, @@ -52,7 +52,7 @@ impl FastaSource { }) } - pub fn into_dataframe(&mut self, n: usize) -> FilterxResult> { + pub fn into_dataframe(&mut self, n: usize) -> FilterxResult { let records = &mut self.records; if records.capacity() < n { unsafe { @@ -82,12 +82,11 @@ impl FastaSource { records.set_len(count); } if records.is_empty() { - return Ok(None); + return Ok(0); } - let df = Fasta::as_dataframe(&records, &self.fasta.parser_options)?; self.dataframe.update(df.lazy()); - Ok(Some(())) + Ok(count) } pub fn reset(&mut self) -> FilterxResult<()> { @@ -97,13 +96,13 @@ impl FastaSource { pub struct Fasta { reader: TableLikeReader, - prev_line: Vec, read_end: bool, pub path: String, pub parser_options: FastaParserOptions, record: FastaRecord, pub record_type: FastaRecordType, break_line_len: Option, + buffer_unprocess_size: usize, } #[derive(Clone, Copy, Debug, clap::ValueEnum, PartialEq)] @@ -184,43 +183,39 @@ impl FastaRecord { pub fn len(&self) -> usize { self._sequence.1 - self._sequence.0 + 1 } -} -impl std::fmt::Display for FastaRecord { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, ">{}", self.name())?; - if let Some(comment) = self.comment() { - write!(f, " {}", comment)?; + pub fn goto_next_record(&mut self, left: usize) { + if self.buffer.len() < self.buffer.capacity() && left > 0 { + unsafe { + std::ptr::copy( + self.buffer.as_ptr().add(self.buffer.len()), + self.buffer.as_mut_ptr(), + left, + ); + self.buffer.set_len(left); + } } - write!(f, "\n{}", self.seq()) + self._comment = (0, 0); + self._name = (0, 0); + self._sequence = (0, 0); } -} - -pub struct FastaRecordIter { - fasta: Fasta, -} - -impl Iterator for FastaRecordIter { - type Item = FastaRecord; - fn next(&mut self) -> Option { - match self.fasta.parse_next() { - Ok(Some(record)) => Some(record.clone()), - Ok(None) => None, - Err(e) => { - eprintln!("{}", e); - None + pub fn remove_breakline_from_buffer(&mut self, len: usize) { + if len > 0 { + unsafe { + self.buffer.set_len(self.buffer.len() - len); } } } } -impl IntoIterator for Fasta { - type Item = FastaRecord; - type IntoIter = FastaRecordIter; - - fn into_iter(self) -> Self::IntoIter { - FastaRecordIter { fasta: self } +impl std::fmt::Display for FastaRecord { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, ">{}", self.name())?; + if let Some(comment) = self.comment() { + write!(f, " {}", comment)?; + } + write!(f, "\n{}", self.seq()) } } @@ -230,11 +225,11 @@ impl Clone for Fasta { reader: self.reader.clone(), path: self.path.clone(), parser_options: self.parser_options.clone(), - prev_line: self.prev_line.clone(), read_end: false, record: self.record.clone(), record_type: self.record_type.clone(), break_line_len: self.break_line_len.clone(), + buffer_unprocess_size: self.buffer_unprocess_size, } } } @@ -247,14 +242,15 @@ impl Fasta { ) -> FilterxResult { let mut fasta = Fasta { reader: TableLikeReader::new(path)?, - prev_line: Vec::new(), read_end: false, path: path.to_string(), parser_options: FastaParserOptions::default(), record: FastaRecord::default(), record_type, break_line_len: None, + buffer_unprocess_size: 0, }; + fasta.break_line_len = detect_breakline_len(&mut fasta.reader)?; if record_type == FastaRecordType::Auto { fasta.detect_record_type(n_detect)?; } @@ -313,6 +309,11 @@ impl Fasta { Ok(()) } + pub fn goto_next_record(&mut self) { + self.record.goto_next_record(self.buffer_unprocess_size); + self.buffer_unprocess_size = 0; + } + pub fn set_parser_options(mut self, parser_options: FastaParserOptions) -> Self { self.parser_options = parser_options; self @@ -320,9 +321,9 @@ impl Fasta { pub fn reset(&mut self) -> FilterxResult<()> { self.reader.reset()?; - self.prev_line.clear(); self.record.clear(); self.read_end = false; + self.buffer_unprocess_size = 0; Ok(()) } @@ -330,12 +331,10 @@ impl Fasta { if self.read_end { return Ok(None); } + self.goto_next_record(); let record: &mut FastaRecord = &mut self.record; - record.clear(); // read name and comment - if !self.prev_line.is_empty() { - record.buffer.extend_from_slice(&self.prev_line); - } else { + if record.buffer.is_empty() { let bytes = self.reader.read_until(b'\n', &mut record.buffer)?; if bytes == 0 { self.read_end = true; @@ -361,36 +360,16 @@ impl Fasta { h.print_and_exit(); } + let break_line_len = self.break_line_len.unwrap(); + // fill name and comment record._name.0 = 1; - record._name.1 = record.buffer.len(); + record._name.1 = record.buffer.len() - break_line_len; - // remove \n or \r\n - let end = record.buffer.len(); - let break_line_len; - if self.break_line_len.is_some() { - break_line_len = self.break_line_len.unwrap(); - } else { - let name = &record.buffer[..]; - if name.ends_with(&[b'\n', b'\r']) { - break_line_len = 2; - } else { - break_line_len = 1; - } - self.break_line_len = Some(break_line_len); - } - record._name.1 = end - break_line_len; + let start = memchr(b' ', &record.buffer[1..record._name.1]); - let mut start = None; - - for i in 0..record._name.1 { - if record.buffer[i] == b' ' { - start = Some(i); - break; - } - } - - if let Some(start) = start { + if let Some(mut start) = start { + start += 1; record._name.1 = start; if self.parser_options.include_comment { record._comment.0 = start + 1; @@ -404,20 +383,22 @@ impl Fasta { } // fill sequence - self.prev_line.clear(); record._sequence.0 = record.buffer.len(); loop { - let bytes = self.reader.read_until(b'\n', &mut self.prev_line)?; + let buffer_offset = record.buffer.len(); + let bytes = self.reader.read_until(b'\n', &mut record.buffer)?; if bytes == 0 { self.read_end = true; break; } - if self.prev_line[0] == b'>' { + if record.buffer[buffer_offset] == b'>' { + unsafe { + record.buffer.set_len(buffer_offset); + } + self.buffer_unprocess_size = bytes; break; } - let line = &self.prev_line[..bytes - break_line_len]; - record.buffer.extend_from_slice(line); - self.prev_line.clear(); + record.remove_breakline_from_buffer(break_line_len); } if record.buffer.is_empty() { return Ok(None); @@ -460,27 +441,3 @@ impl Fasta { Ok(df) } } - -pub mod test { - - #[allow(unused)] - use super::*; - - #[test] - fn test_open_plain_file() -> FilterxResult<()> { - let path = "test_data/fasta/1.fa"; - let fasta = Fasta::from_path(path, FastaRecordType::Auto, 3)?; - let records = fasta.into_iter().collect::>(); - assert!(records.len() == 2); - Ok(()) - } - - #[test] - fn test_open_gzip_file() -> FilterxResult<()> { - let path = "test_data/fasta/1.fa.gz"; - let fasta = Fasta::from_path(path, FastaRecordType::Auto, 3)?; - let records = fasta.into_iter().collect::>(); - assert!(records.len() == 2); - Ok(()) - } -} diff --git a/src/filterx_source/src/block/fastx/fastq.rs b/src/filterx_source/src/block/fastx/fastq.rs index 263ab66..a812fae 100644 --- a/src/filterx_source/src/block/fastx/fastq.rs +++ b/src/filterx_source/src/block/fastx/fastq.rs @@ -1,8 +1,7 @@ -use memchr::memchr; use polars::prelude::*; use std::{fmt::Display, io::BufRead}; -use crate::block::reader::TableLikeReader; +use crate::block::reader::{TableLikeReader, detect_breakline_len}; use crate::dataframe::DataframeSource; use filterx_core::{FilterxError, FilterxResult, Hint}; @@ -45,7 +44,7 @@ impl FastqSource { }) } - pub fn into_dataframe(&mut self, n: usize) -> FilterxResult> { + pub fn into_dataframe(&mut self, n: usize) -> FilterxResult { let records = &mut self.records; if records.capacity() < n { @@ -77,11 +76,11 @@ impl FastqSource { records.set_len(count); } if records.is_empty() { - Ok(None) + Ok(0) } else { let df = Fastq::as_dataframe(&records, &self.fastq.parser_option)?; self.dataframe.update(df.lazy()); - Ok(Some(())) + Ok(count) } } @@ -291,37 +290,16 @@ impl Fastq { path: path.to_string(), record: FastqRecord::default(), break_line_len: None, - quality_type: quality_type, + quality_type, buffer_unprocess_size: 0, }; - fq.detect_breakline_len()?; + fq.break_line_len = detect_breakline_len(&mut fq.reader)?; if quality_type == QualityType::Auto { fq.guess_quality_type(detect_size)?; } Ok(fq) } - pub fn detect_breakline_len(&mut self) -> FilterxResult<()> { - loop { - let data = self.reader.fill_buf()?; - if data.is_empty() { - self.break_line_len = Some(0); - } - let offset = memchr(b'\n', data); - if offset.is_some() { - // test if endwith is \r\n - let offset = offset.unwrap(); - if offset > 0 && data[offset - 1] == b'\r' { - self.break_line_len = Some(2); - } else { - self.break_line_len = Some(1); - } - break; - } - } - self.reset()?; - Ok(()) - } fn guess_quality_type(&mut self, detect_size: usize) -> FilterxResult<()> { if !self.parser_option.include_qual { diff --git a/src/filterx_source/src/block/reader.rs b/src/filterx_source/src/block/reader.rs index b497485..d1c0390 100644 --- a/src/filterx_source/src/block/reader.rs +++ b/src/filterx_source/src/block/reader.rs @@ -4,6 +4,7 @@ use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; use std::ops::{Deref, DerefMut}; use filterx_core::FilterxResult; +use memchr::memchr; pub struct TableLikeReaderInner { _reader: R, @@ -120,3 +121,26 @@ impl BufRead for TableLikeReader { } } } + +pub fn detect_breakline_len(reader: &mut TableLikeReader) -> FilterxResult> { + let mut break_line_len = 0; + loop { + let data = reader.fill_buf()?; + if data.is_empty() { + break; + } + let offset = memchr(b'\n', data); + if offset.is_some() { + // test if endwith is \r\n + let offset = offset.unwrap(); + if offset > 0 && data[offset - 1] == b'\r' { + break_line_len = 2; + } else { + break_line_len = 1; + } + break; + } + } + reader.reset()?; + Ok(Some(break_line_len)) +}