diff --git a/README.md b/README.md index 894d94e..358f1bc 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![PyPI version](https://badge.fury.io/py/qsv.svg)](https://badge.fury.io/py/qsv) ![PyPI - Downloads](https://img.shields.io/pypi/dm/qsv) -![quilter-csv](https://gist.githubusercontent.com/sumeshi/644af27c8960a9b6be6c7470fe4dca59/raw/4115bc2ccf9ab5fb40a455c34ac0be885b7f263d/quilter-csv.svg) +![quilter-csv](https://gist.githubusercontent.com/sumeshi/644af27c8960a9b6be6c7470fe4dca59/raw/00d774e6814a462eb48e68f29fc6226976238777/quilter-csv.svg) A tool that provides elastic and rapid filtering for efficient analysis of huge CSV files, such as eventlogs. @@ -107,7 +107,7 @@ Filters rows where the specified column matches the given regex. | -------- | ---------- | --------- | ------------- | -------------------------------------------------------------------- | | Argument | colname | str | | The name of the column to test against the regex pattern. | | Argument | pattern | str | | A regular expression pattern used for matching values in the column. | -| Argument | ignorecase | bool | False | If True, performs case-insensitive pattern matching. | +| Option | ignorecase | bool | False | If True, performs case-insensitive pattern matching. | ``` $ qsv load ./Security.csv - contains 'Date and Time' '10/6/2016' @@ -121,7 +121,7 @@ Replaces values using the specified regex. | Argument | colname | str | | The name of the column whose values will be modified. | | Argument | pattern | str | | A regular expression pattern identifying substrings to replace. | | Argument | replacement | str | | The text that replaces matched substrings. | -| Argument | ignorecase | bool | False | If True, the regex matching is performed in a case-insensitive manner. | +| Option | ignorecase | bool | False | If True, the regex matching is performed in a case-insensitive manner. | ``` $ qsv load ./Security.csv - sed 'Date and Time' '/' '-' @@ -134,7 +134,7 @@ This function is similar to running a grep command while preserving the header r | Category | Parameter | Data Type | Default Value | Description | | -------- | ---------- | --------- | ------------- | ------------------------------------------------------------------------------- | | Argument | pattern | str | | A regular expression pattern used to filter rows. Any row with a match is kept. | -| Argument | ignorecase | bool | False | If True, the regex match is case-insensitive. | +| Option | ignorecase | bool | False | If True, the regex match is case-insensitive. | ``` $ qsv load ./Security.csv - grep 'LogonType' @@ -191,12 +191,12 @@ Changes the timezone of the specified date column. The datetime format strings follow the same conventions as [Python](https://docs.python.org/3/library/datetime.html)'s datetime module (based on the C99 standard). -| Category | Parameter | Data Type | Default Value | Description | -| -------- | --------------- | --------- | ------------- | ---------------------------------------------------------------------------------------------- | -| Argument | colname | str | | The name of the date/time column to convert. | -| Option | timezone_from | str | "UTC" | The original timezone of the column's values. | -| Option | timezone_to | str | "UTC" | The target timezone to convert values into. | -| Option | datetime_format | str | AutoDetect | The datetime format for parsing values. If not provided, the format is automatically inferred. 
|
+| Category | Parameter | Data Type | Default Value | Description                                                                                     |
+| -------- | --------- | --------- | ------------- | ----------------------------------------------------------------------------------------------- |
+| Argument | colname   | str       |               | The name of the date/time column to convert.                                                    |
+| Option   | tz_from   | str       | "UTC"         | The original timezone of the column's values.                                                   |
+| Option   | tz_to     | str       | "UTC"         | The target timezone to convert values into.                                                     |
+| Option   | dt_format | str       | AutoDetect    | The datetime format for parsing values. If not provided, the format is automatically inferred.  |
 
 ```
-$ qsv load ./Security.csv - changetz 'Date and Time' --timezone_from=UTC --timezone_to=Asia/Tokyo --datetime_format="%m/%d/%Y %I:%M:%S %p"
+$ qsv load ./Security.csv - changetz 'Date and Time' --tz_from=UTC --tz_to=Asia/Tokyo --dt_format="%m/%d/%Y %I:%M:%S %p"
@@ -312,51 +312,143 @@ $ qsv load Security.csv - dump ./Security-qsv.csv
 
 ### Quilt
 
-Quilt is a command that allows you to predefine a series of Initializer, Chainable Functions, and Finalizer processes in a YAML configuration file, and then execute them all at once.
+Quilt is a subcommand that allows you to define a sequence of **Initializer**, **Chainable Function**, and **Finalizer** steps in a YAML configuration file and execute them as a single pipeline.
 
-| Category | Parameter | Data Type  | Default Value | Description                                                                                                       |
-| -------- | --------- | ---------- | ------------- | ----------------------------------------------------------------------------------------------------------------- |
-| Argument | config    | str        |               | The path to a YAML configuration file defining a set of initialization, transformation, and finalization steps.  |
-| Argument | path      | tuple[str] |               | One or more paths to CSV files to be processed according to the predefined rules in the configuration file.      |
-| Option   | debug     | bool       | False         | Enabling this option will output each rule and its intermediate processing results to the standard output.       |
+#### Usage
+| Category | Parameter | Data Type  | Default Value | Description                                                                                                           |
+| -------- | --------- | ---------- | ------------- | ----------------------------------------------------------------------------------------------------------------------- |
+| Argument | config    | str        |               | Path to a YAML configuration file or directory that defines the initializer, chainable-function, and finalizer steps.  |
+| Argument | path      | tuple[str] |               | One or more paths to CSV files to be processed according to the predefined rules in the configuration file.            |
+
+#### Command Example
+```bash
+$ qsv quilt rules/test.yaml ./Security.csv
+```
 
-```
-$ qsv quilt rules ./Security.csv
-```
-
-rules/test.yaml
+#### Configuration Example
+`rules/test.yaml`
+
+```yaml
+title: 'test'
+description: 'test processes'
+version: '0.1.0'
+author: 'John Doe'
+stages:
+  test_stage: # arbitrary stage name
+    type: process # operation type
+    steps:
+      load:
+      isin:
+        colname: EventId
+        values:
+          - 4624
+      head:
+        number: 5
+      select:
+        colnames:
+          - RecordNumber
+          - TimeCreated
+      changetz:
+        colname: TimeCreated
+        tz_from: UTC
+        tz_to: Asia/Tokyo
+        dt_format: "%Y-%m-%d %H:%M:%S%.f"
+      showtable:
+```
+
+The above configuration file defines the following sequence of operations:
+1. Load a CSV file.
+2. Keep only the rows whose `EventId` column matches the value `4624`.
+3. Retrieve the first 5 rows.
+4. Extract the `RecordNumber` and `TimeCreated` columns.
+5. Convert the time zone of the `TimeCreated` column from `UTC` to `Asia/Tokyo`.
+6. Display the processing results in a table format.
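+
+For reference, the same pipeline can also be run ad hoc as a single command chain, with no configuration file (a sketch: the quoted `'[4624]'` and `'[RecordNumber,TimeCreated]'` list literals assume the argument parsing of python-fire, which qsv uses for its CLI):
+
+```bash
+$ qsv load ./Security.csv \
+    - isin EventId '[4624]' \
+    - head 5 \
+    - select '[RecordNumber,TimeCreated]' \
+    - changetz TimeCreated --tz_from=UTC --tz_to=Asia/Tokyo --dt_format="%Y-%m-%d %H:%M:%S%.f" \
+    - showtable
+```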
+
+#### Pipeline Operations
+| Operation Type | Description                                                 | Parameters                                                                                                                                                              |
+| -------------- | ----------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| process        | Executes a series of operations on the dataset.             | `steps`: A dict of operations (e.g., `load`, `select`, `dump`) to apply.                                                                                                |
+| concat         | Concatenates multiple datasets vertically or horizontally.  | `sources`: List of stages to concat.<br>`params.how`: `vertical`, `vertical_relaxed`, `horizontal`, `diagonal`, `align`, etc.                                           |
+| join           | Joins multiple datasets using keys.                         | `sources`: List of stages to join.<br>`params.key`: Column(s) used for joining.<br>`params.how`: `inner`, `left`, `right`, `full`, `semi`, `anti`, `cross`.<br>`params.coalesce`: bool |
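+
+For example, a minimal `concat` stage that stacks the outputs of two earlier stages vertically might look like the sketch below (`stage_a` and `stage_b` are placeholder stage names; `params.how` falls back to `vertical` when omitted):
+
+```yaml
+concat_stage:
+  type: concat
+  sources:
+    - stage_a
+    - stage_b
+  params:
+    how: vertical
+```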
+
+#### Multi-Stage Example
+```yaml
+title: 'test'
+description: 'test pipelines'
+version: '0.1.0'
+author: 'John Doe'
+stages:
+  load_stage:
+    type: process
+    steps:
+      load:
+
+  stage_1:
+    type: process
+    source: load_stage
+    steps:
+      select:
+        colnames:
+          - TimeCreated
+          - PayloadData1
+
+  stage_2:
+    type: process
+    source: load_stage
+    steps:
+      select:
+        colnames:
+          - TimeCreated
+          - PayloadData2
+
+  merge_stage:
+    type: join
+    sources:
+      - stage_1
+      - stage_2
+    params:
+      how: full
+      key: TimeCreated
+      coalesce: true
+
+  stage_3:
+    type: process
+    source: merge_stage
+    steps:
+      showtable:
+```
+
+#### Note: Step Duplication
+Quilt supports YAML configurations with duplicate keys in `steps`.
+
+```yaml
+stages:
+  test_stage:
+    type: process
+    steps:
+      load:
+      renamecol: # duplicate key
+        colname: old_col1
+        new_colname: new_col1
+      renamecol: # duplicate key
+        colname: old_col2
+        new_colname: new_col2
+      renamecol: # duplicate key
+        colname: old_col3
+        new_colname: new_col3
+      show:
+```
+
+Internally, these keys are handled as:
+
 ```yaml
-title: test
-description: test filter
-version: 0.1.0
-author: John Doe
-rules:
-  load:
-  isin:
-    colname: EventId
-    values:
-      - 4624
-  head:
-    number: 5
-  select:
-    colnames:
-      - RecordNumber
-      - TimeCreated
-  changetz:
-    colname: TimeCreated
-    timezone_from: UTC
-    timezone_to: Asia/Tokyo
-    datetime_format: "%Y-%m-%d %H:%M:%S%.f"
-  showtable:
-```
-
-Note: While the standard YAML specification does not permit duplicate key names, Quilt rules allow for duplicate keys under the rules section. Specifically, even when multiple renamecol entries are listed, they are internally replaced and processed as renamecol, renamecol_, renamecol__, and so on. This approach enables each entry to be recognized and handled as distinct rules.
-
-## Planned Features:
-- CSV cache (.pkl, duckdb, etc.)
-- Logical condition-based filtering (e.g., OR, AND) for more complex queries.
-- Grouping for operations like count
-- Support for joining data with other tables.
+renamecol
+renamecol_
+renamecol__
+```
+
+This ensures that each step is treated as a distinct operation in the pipeline.
+
 ## Installation
 
 ### from PyPI
diff --git a/pyproject.toml b/pyproject.toml
index 04b62d0..963e293 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "qsv"
-version = "0.3.10"
+version = "0.4.0"
 description = "A tool that provides elastic and rapid filtering for efficient analysis of huge CSV files, such as eventlogs."
readme = "README.md" authors = [ diff --git a/src/qsv/__init__.py b/src/qsv/__init__.py index 2bace9b..15ba18e 100644 --- a/src/qsv/__init__.py +++ b/src/qsv/__init__.py @@ -1,19 +1,8 @@ -import logging import fire from qsv.controllers.DataFrameController import DataFrameController -logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s [%(levelname)s] %(message)s', - datefmt="%Y-%m-%dT%H:%M:%S%z", - handlers=[ - logging.StreamHandler() - ] -) - -# entrypoint def main(): fire.Fire(DataFrameController) if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/src/qsv/controllers/DataFrameController.py b/src/qsv/controllers/DataFrameController.py index ff01e7e..02a8900 100644 --- a/src/qsv/controllers/DataFrameController.py +++ b/src/qsv/controllers/DataFrameController.py @@ -1,259 +1,91 @@ -import re -import sys -import logging from typing import Union -from datetime import datetime -from pathlib import Path -from qsv.controllers.CsvController import CsvController -from qsv.controllers.QuiltController import QuiltController -from qsv.views.TableView import TableView - -import polars as pl - - -logger = logging.getLogger(__name__) -logger.disabled = True +from qsv.operations.initializers import * +from qsv.operations.chainables import * +from qsv.operations.finalizers import * +from qsv.operations.quilters import * class DataFrameController(object): def __init__(self): self.df = None - - # -- private methods -- - def __check_exists_path(self, path: tuple[str]) -> None: - for p in path: - if not Path(p).exists(): - print(f"[Error] File \"{p}\" does not exist.") - sys.exit(1) - - def __check_exists_colnames(self, colnames: list[str]) -> None: - columns = self.df.collect_schema().names() - for colname in colnames: - if colname not in columns: - print(f"[Error] Column \"{colname}\" does not exist in the dataframe.") - sys.exit(1) - - # -- quilter -- - def quilt(self, config: str, *path: tuple[str], debug: bool = False) -> None: - """[quilter] Loads the specified quilt batch files.""" - logger.debug(f"config: {config}") - logger.debug(f"{len(path)} files are loaded. [{', '.join(path)}]") - q = QuiltController() - configs = q.load_configs(config) - q.print_configs(configs) - - for c in configs: - for k, v in c.get('rules').items(): - # for allow duplicated rulenames. - k = k if not k.endswith('_') else re.sub(r'_+$', '', k) - - if debug: - print(f"{k}: {v}") - - if k == 'load': - self.load(*path) - elif v: - getattr(self, k)(**v) - else: - getattr(self, k)() - - if debug: - print(self.df.collect()) - print() - # -- initializer -- + # -- initializers -- def load(self, *path: tuple[str], separator: str = ',', low_memory: bool = False): - """[initializer] Loads the specified CSV files.""" - logger.debug(f"{len(path)} files are loaded. 
[{', '.join(path)}]") - self.__check_exists_path(path) - self.df = CsvController(path=path).get_dataframe(separator=separator, low_memory=low_memory) + self.df = load(path=path, separator=separator, low_memory=low_memory) return self - # -- chainable -- + # -- chainables -- def select(self, colnames: Union[str, tuple[str]]): - """[chainable] Selects only the specified columns.""" - def parse_columns(headers: list[str], colnames: tuple[str]): - parsed_columns = list() - for col in colnames: - if '-' in col: - # parse 'startcol-endcol' string - flag_extract = False - start, end = col.split('-') - for h in headers: - if h == start: - flag_extract = True - if flag_extract: - parsed_columns.append(h) - if h == end: - flag_extract = False - else: - parsed_columns.append(col) - return parsed_columns - - # prevent type guessing - colnames: tuple[str] - if type(colnames) is list: - colnames = tuple(colnames) - elif type(colnames) is str: - colnames = (colnames, ) - self.__check_exists_colnames(colnames) - selected_columns = parse_columns(headers=self.df.collect_schema().names(), colnames=colnames) - logger.debug(f"{len(selected_columns)} columns are selected. {', '.join(selected_columns)}") - self.df = self.df.select(selected_columns) + self.df = select(df=self.df, colnames=colnames) return self def isin(self, colname: str, values: list): - """[chainable] Filters rows containing the specified values.""" - logger.debug(f"filter condition: {values} in {colname}") - self.__check_exists_colnames([colname]) - self.df = self.df.filter(pl.col(colname).is_in(values)) + self.df = isin(df=self.df, colname=colname, values=values) return self def contains(self, colname: str, pattern: str, ignorecase: bool = False): - """[chainable] Filters rows where the specified column matches the given regex.""" - logger.debug(f"filter condition: {pattern} contains {colname}") - self.__check_exists_colnames([colname]) - pattern = pattern if type(pattern) is str else str(pattern) - self.df = self.df.filter( - pl.col(colname).str.contains(f"(?i){pattern}") if ignorecase else pl.col(colname).str.contains(pattern) - ) + self.df = contains(df=self.df, colname=colname, pattern=pattern, ignorecase=ignorecase) return self def sed(self, colname: str, pattern: str, replacement: str, ignorecase: bool = False): - """[chainable] Replaces values using the specified regex.""" - logger.debug(f"sed condition: {pattern} on {colname}") - self.__check_exists_colnames([colname]) - pattern = pattern if type(pattern) is str else str(pattern) - self.df = self.df.with_columns( - pl.col(colname).cast(pl.String).str.replace(f"(?i){pattern}", replacement) if ignorecase else pl.col(colname).cast(pl.String).str.replace(pattern, replacement) - ) + self.df = sed(df=self.df, colname=colname, pattern=pattern, replacement=replacement, ignorecase=ignorecase) return self def grep(self, pattern: str, ignorecase: bool = False): - """[chainable] Treats all columns as strings and filters rows where any column matches the specified regex.""" - self.df = self.df.with_columns( - pl.concat_str( - [pl.col(colname).cast(pl.String).fill_null("") for colname in self.df.collect_schema().names()], - separator="," - ).alias('___combined') - ) - self.df = self.df.filter( - pl.col('___combined').str.contains(f"(?i){pattern}") if ignorecase else pl.col('___combined').str.contains(pattern) - ) - self.df = self.df.drop(['___combined']) + self.df = grep(df=self.df, pattern=pattern, ignorecase=ignorecase) return self def head(self, number: int = 5): - """[chainable] Selects only 
the first N lines.""" - logger.debug(f"heading {number} lines.") - self.df = self.df.head(number) + self.df = head(df=self.df, number=number) return self def tail(self, number: int = 5): - """[chainable] Selects only the last N lines.""" - logger.debug(f"tailing {number} lines.") - self.df = self.df.tail(number) + self.df = tail(df=self.df, number=number) return self def sort(self, colnames: Union[str, tuple[str], list[str]], desc: bool = False): - """[chainable] Sorts all rows by the specified column values.""" - logger.debug(f"sort by {colnames} ({'desc' if desc else 'asc'}).") - # prevent type guessing - colnames: tuple[str] - if type(colnames) is list: - colnames = tuple(colnames) - elif type(colnames) is str: - colnames = (colnames, ) - self.__check_exists_colnames(colnames) - self.df = self.df.sort(colnames, descending=desc) + self.df = sort(df=self.df, colnames=colnames, desc=desc) return self def uniq(self, colnames: Union[str, tuple[str], list[str]]): - """[chainable] Remove duplicate rows based on the specified column names.""" - logger.debug(f"unique by {colnames}.") - # prevent type guessing - colnames: tuple[str] - if type(colnames) is list: - colnames = tuple(colnames) - elif type(colnames) is str: - colnames = (colnames, ) - self.__check_exists_colnames(colnames) - self.df = self.df.unique(subset=colnames) + self.df = uniq(df=self.df, colnames=colnames) return self - def changetz( - self, - colname: str, - timezone_from: str = "UTC", - timezone_to: str = "UTC", - datetime_format: str = None - ): - """[chainable] Changes the timezone of the specified date column.""" - logger.debug(f"change {colname} timezone {timezone_from} to {timezone_to}.") - self.__check_exists_colnames([colname]) - - # convert string to datetime - if self.df.select(colname).collect_schema().dtypes()[0] != pl.Datetime: - if datetime_format: - self.df = self.df.with_columns(pl.col(colname).str.to_datetime(datetime_format)) - else: - self.df = self.df.with_columns(pl.col(colname).str.to_datetime()) - - # setup and change timezone - self.df = self.df.with_columns(pl.col(colname).dt.replace_time_zone(timezone_from)) - self.df = self.df.with_columns(pl.col(colname).dt.convert_time_zone(timezone_to)) + def changetz(self, colname: str, tz_from: str = "UTC", tz_to: str = "UTC", dt_format: str = None): + self.df = changetz(df=self.df, colname=colname, tz_from=tz_from, tz_to=tz_to, dt_format=dt_format) return self def renamecol(self, colname: str, new_colname: str): - """[chainable] Renames the specified column.""" - self.__check_exists_colnames([colname]) - self.df = self.df.rename({colname: new_colname}) + self.df = renamecol(df=self.df, colname=colname, new_colname=new_colname) + return self + + def drop(self): + self.df = None return self # -- finalizer -- def headers(self, plain: bool = False) -> None: - """[finalizer] Displays the column names of the data.""" - if plain: - print(",".join([f"\"{c}\"" for c in self.df.collect_schema().names()])) - else: - digits = len(str(len(self.df.collect_schema().names()))) - TableView.print( - headers=[f"{''.join([' ' for _ in range(0, digits-1)])}#", "Column Name"], - values=[[str(i).zfill(digits), c] for i, c in enumerate(self.df.collect_schema().names())] - ) + headers(df=self.df, plain=plain) def stats(self) -> None: - """[finalizer] Displays the statistical information of the data.""" - print(self.df.describe()) + stats(df=self.df) def showquery(self) -> None: - """[finalizer] Displays the data processing query.""" - print(self.df) + showquery(df=self.df) def 
show(self) -> None:
-        """[finalizer] Displays the processing results in a table format to standard output."""
-        self.df.collect().write_csv(sys.stdout)
+        show(df=self.df)
 
     def showtable(self) -> None:
-        """[finalizer] Outputs the processing results table to the standard output."""
-        print(self.df.collect())
+        showtable(df=self.df)
 
     def dump(self, path: str = None) -> None:
-        """[finalizer] Outputs the processing results to a CSV file."""
-        def autoname():
-            now = datetime.now().strftime('%Y%m%d-%H%M%S')
-            query = self.df.explain(optimized=False).splitlines()[0]
-            temp = re.sub(r'[^\w\s]', '-', query)
-            temp = re.sub(r'-+', '-', temp)
-            temp = temp.strip('-')
-            temp = temp.replace(' ', '')
-            temp = temp.lower()
-            return f"{now}_{temp}.csv"
+        dump(df=self.df, path=path)
 
-        path = path if path else autoname()
-        self.df.collect().write_csv(path)
-        logger.info(f"csv dump successfully: {path}")
+    # -- quilter --
+    def quilt(self, config: str, *path: tuple[str]) -> None:
+        quilt(dfc=self, config=config, path=path)
 
     def __str__(self):
         if self.df is not None:
diff --git a/src/qsv/controllers/LogController.py b/src/qsv/controllers/LogController.py
new file mode 100644
index 0000000..0f9b340
--- /dev/null
+++ b/src/qsv/controllers/LogController.py
@@ -0,0 +1,29 @@
+import logging
+
+class LogController(object):
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s [%(levelname)s] %(message)s',
+        datefmt="%Y-%m-%dT%H:%M:%S%z",
+        handlers=[
+            logging.StreamHandler()
+        ]
+    )
+    logger = logging.getLogger(__name__)
+    logger.disabled = True
+
+    @staticmethod
+    def info(msg: str) -> None:
+        LogController.logger.info(msg)
+
+    @staticmethod
+    def debug(msg: str) -> None:
+        LogController.logger.debug(msg)
+
+    @staticmethod
+    def warning(msg: str) -> None:
+        LogController.logger.warning(msg)
+
+    @staticmethod
+    def error(msg: str) -> None:
+        LogController.logger.error(msg)
diff --git a/src/qsv/controllers/QuiltController.py b/src/qsv/controllers/QuiltController.py
index f8c7cf9..e267a64 100644
--- a/src/qsv/controllers/QuiltController.py
+++ b/src/qsv/controllers/QuiltController.py
@@ -1,8 +1,6 @@
 from pathlib import Path
-import polars as pl
-
-from qsv.controllers.YamlController import YamlController
 from qsv.views.TableView import TableView
+from qsv.controllers.YamlController import YamlController
 
 class QuiltController(object):
     def __init__(self):
@@ -22,13 +20,23 @@ def load_configs(self, config) -> list[dict]:
         return configs
 
     def print_configs(self, configs: list[dict]):
-        print('Loaded Rules')
+        print(f"Loaded {len(configs)} Rules")
+        digits = len(str(len(configs)))
         TableView.print(
-            headers=['title', 'description', 'version', 'author'],
-            values=[[
-                c.get('title'),
-                c.get('description'),
-                c.get('version'),
-                c.get('author')
-            ] for c in configs]
+            headers=[
+                f"{''.join([' ' for _ in range(0, digits-1)])}#",
+                'title',
+                'description',
+                'version',
+                'author'
+            ],
+            values=[
+                [
+                    str(i).zfill(digits),
+                    c.get('title'),
+                    c.get('description'),
+                    c.get('version'),
+                    c.get('author')
+                ] for i, c in enumerate(configs, 1)
+            ]
         )
diff --git a/src/qsv/operations/chainables/__init__.py b/src/qsv/operations/chainables/__init__.py
new file mode 100644
index 0000000..fad1b32
--- /dev/null
+++ b/src/qsv/operations/chainables/__init__.py
@@ -0,0 +1,25 @@
+from qsv.operations.chainables.select import select
+from qsv.operations.chainables.isin import isin
+from qsv.operations.chainables.contains import contains
+from qsv.operations.chainables.sed import sed
+from qsv.operations.chainables.grep import grep
+from qsv.operations.chainables.head import head
+from qsv.operations.chainables.tail import tail
+from qsv.operations.chainables.sort import sort
+from qsv.operations.chainables.uniq import uniq
+from qsv.operations.chainables.changetz import changetz
+from qsv.operations.chainables.renamecol import renamecol
+
+__all__ = [
+    'select',
+    'isin',
+    'contains',
+    'sed',
+    'grep',
+    'head',
+    'tail',
+    'sort',
+    'uniq',
+    'changetz',
+    'renamecol',
+]
\ No newline at end of file
diff --git a/src/qsv/operations/chainables/changetz.py b/src/qsv/operations/chainables/changetz.py
new file mode 100644
index 0000000..5071abc
--- /dev/null
+++ b/src/qsv/operations/chainables/changetz.py
@@ -0,0 +1,23 @@
+import sys
+import polars as pl
+from qsv.utils.DataFrameUtils import exists_colname
+from qsv.controllers.LogController import LogController
+
+def changetz(df: pl.LazyFrame, colname: str, tz_from: str = "UTC", tz_to: str = "UTC", dt_format: str = None) -> pl.LazyFrame:
+    """[chainable] Changes the timezone of the specified date column."""
+    # check the column exists before inspecting its dtype
+    if not exists_colname(df=df, colnames=[colname]):
+        sys.exit(1)
+
+    LogController.debug(f"change {colname} timezone {tz_from} to {tz_to}.")
+
+    # convert string to datetime
+    if df.select(colname).collect_schema().dtypes()[0] != pl.Datetime:
+        if dt_format:
+            df = df.with_columns(pl.col(colname).str.to_datetime(dt_format))
+        else:
+            df = df.with_columns(pl.col(colname).str.to_datetime())
+
+    # setup and change timezone
+    df = df.with_columns(pl.col(colname).dt.replace_time_zone(tz_from))
+    df = df.with_columns(pl.col(colname).dt.convert_time_zone(tz_to))
+    return df
diff --git a/src/qsv/operations/chainables/contains.py b/src/qsv/operations/chainables/contains.py
new file mode 100644
index 0000000..79ca186
--- /dev/null
+++ b/src/qsv/operations/chainables/contains.py
@@ -0,0 +1,15 @@
+import sys
+from qsv.utils.DataFrameUtils import exists_colname
+from qsv.controllers.LogController import LogController
+import polars as pl
+
+def contains(df: pl.LazyFrame, colname: str, pattern: str, ignorecase: bool = False) -> pl.LazyFrame:
+    """[chainable] Filters rows where the specified column matches the given regex."""
+    if not exists_colname(df=df, colnames=[colname]):
+        sys.exit(1)
+
+    LogController.debug(f"filter condition: {pattern} contains {colname}")
+    pattern = pattern if type(pattern) is str else str(pattern)
+    return df.filter(
+        pl.col(colname).str.contains(f"(?i){pattern}") if ignorecase else pl.col(colname).str.contains(pattern)
+    )
diff --git a/src/qsv/operations/chainables/grep.py b/src/qsv/operations/chainables/grep.py
new file mode 100644
index 0000000..5ddb8e6
--- /dev/null
+++ b/src/qsv/operations/chainables/grep.py
@@ -0,0 +1,15 @@
+import polars as pl
+
+def grep(df: pl.LazyFrame, pattern: str, ignorecase: bool = False) -> pl.LazyFrame:
+    """[chainable] Treats all columns as strings and filters rows where any column matches the specified regex."""
+    df = df.with_columns(
+        pl.concat_str(
+            [pl.col(colname).cast(pl.String).fill_null("") for colname in df.collect_schema().names()],
+            separator=","
+        ).alias('___combined')
+    )
+    df = df.filter(
+        pl.col('___combined').str.contains(f"(?i){pattern}") if ignorecase else pl.col('___combined').str.contains(pattern)
+    )
+    df = df.drop(['___combined'])
+    return df
diff --git a/src/qsv/operations/chainables/head.py b/src/qsv/operations/chainables/head.py
new file mode 100644
index 0000000..a6f0ff9
--- /dev/null
+++ b/src/qsv/operations/chainables/head.py
@@ -0,0 +1,7 @@
+import polars as pl
+from qsv.controllers.LogController import LogController
+
+def head(df: 
pl.LazyFrame, number: int = 5) -> pl.LazyFrame: + """[chainable] Selects only the first N lines.""" + LogController.debug(f"heading {number} lines.") + return df.head(number) diff --git a/src/qsv/operations/chainables/isin.py b/src/qsv/operations/chainables/isin.py new file mode 100644 index 0000000..5c9e8ef --- /dev/null +++ b/src/qsv/operations/chainables/isin.py @@ -0,0 +1,12 @@ +import sys +import polars as pl +from qsv.utils.DataFrameUtils import exists_colname +from qsv.controllers.LogController import LogController + +def isin(df: pl.LazyFrame, colname: str, values: list) -> pl.LazyFrame: + """[chainable] Filters rows containing the specified values.""" + LogController.debug(f"filter condition: {values} in {colname}") + if not exists_colname(df=df, colnames=[colname]): + sys.exit(1) + + return df.filter(pl.col(colname).is_in(values)) diff --git a/src/qsv/operations/chainables/renamecol.py b/src/qsv/operations/chainables/renamecol.py new file mode 100644 index 0000000..67bce0d --- /dev/null +++ b/src/qsv/operations/chainables/renamecol.py @@ -0,0 +1,10 @@ +import sys +from qsv.utils.DataFrameUtils import exists_colname +import polars as pl + +def renamecol(df: pl.LazyFrame, colname: str, new_colname: str) -> pl.LazyFrame: + """[chainable] Renames the specified column.""" + if not exists_colname(df=df, colnames=[colname]): + sys.exit(1) + + return df.rename({colname: new_colname}) diff --git a/src/qsv/operations/chainables/sed.py b/src/qsv/operations/chainables/sed.py new file mode 100644 index 0000000..ac74a25 --- /dev/null +++ b/src/qsv/operations/chainables/sed.py @@ -0,0 +1,15 @@ +import sys +import polars as pl +from qsv.utils.DataFrameUtils import exists_colname +from qsv.controllers.LogController import LogController + +def sed(df: pl.LazyFrame, colname: str, pattern: str, replacement: str, ignorecase: bool = False) -> pl.LazyFrame: + """[chainable] Replaces values using the specified regex.""" + if not exists_colname(df=df, colnames=[colname]): + sys.exit(1) + + LogController.debug(f"sed condition: {pattern} on {colname}") + pattern = pattern if type(pattern) is str else str(pattern) + return df.with_columns( + pl.col(colname).cast(pl.String).str.replace(f"(?i){pattern}", replacement) if ignorecase else pl.col(colname).cast(pl.String).str.replace(pattern, replacement) + ) diff --git a/src/qsv/operations/chainables/select.py b/src/qsv/operations/chainables/select.py new file mode 100644 index 0000000..a7f8790 --- /dev/null +++ b/src/qsv/operations/chainables/select.py @@ -0,0 +1,41 @@ +import sys +from typing import Union +from qsv.utils.DataFrameUtils import exists_colname +from qsv.controllers.LogController import LogController +import polars as pl + +def parse_columns(headers: list[str], colnames: tuple[str]) -> list[str]: + parsed_columns = list() + for col in colnames: + if '-' in col: + # parse 'startcol-endcol' string + flag_extract = False + start, end = col.split('-') + for h in headers: + if h == start: + flag_extract = True + if flag_extract: + parsed_columns.append(h) + if h == end: + flag_extract = False + else: + parsed_columns.append(col) + return parsed_columns + + +def select(df: pl.LazyFrame, colnames: Union[str, tuple[str]]) -> pl.LazyFrame: + """[chainable] Selects only the specified columns.""" + + # prevent type guessing + colnames: tuple[str] + if type(colnames) is list: + colnames = tuple(colnames) + elif type(colnames) is str: + colnames = (colnames, ) + + if not exists_colname(df=df, colnames=colnames): + sys.exit(1) + + selected_columns = 
parse_columns(headers=df.collect_schema().names(), colnames=colnames) + LogController.debug(msg=f"{len(selected_columns)} columns are selected. {', '.join(selected_columns)}") + return df.select(selected_columns) diff --git a/src/qsv/operations/chainables/sort.py b/src/qsv/operations/chainables/sort.py new file mode 100644 index 0000000..5015e75 --- /dev/null +++ b/src/qsv/operations/chainables/sort.py @@ -0,0 +1,20 @@ +import sys +import polars as pl +from typing import Union +from qsv.utils.DataFrameUtils import exists_colname +from qsv.controllers.LogController import LogController + +def sort(df: pl.LazyFrame, colnames: Union[str, tuple[str], list[str]], desc: bool = False) -> pl.LazyFrame: + """[chainable] Sorts all rows by the specified column values.""" + # prevent type guessing + colnames: tuple[str] + if type(colnames) is list: + colnames = tuple(colnames) + elif type(colnames) is str: + colnames = (colnames, ) + + if not exists_colname(df=df, colnames=colnames): + sys.exit(1) + + LogController.debug(f"sort by {colnames} ({'desc' if desc else 'asc'}).") + return df.sort(colnames, descending=desc) diff --git a/src/qsv/operations/chainables/tail.py b/src/qsv/operations/chainables/tail.py new file mode 100644 index 0000000..da9e77f --- /dev/null +++ b/src/qsv/operations/chainables/tail.py @@ -0,0 +1,7 @@ +import polars as pl +from qsv.controllers.LogController import LogController + +def tail(df: pl.LazyFrame, number: int = 5) -> pl.LazyFrame: + """[chainable] Selects only the last N lines.""" + LogController.debug(f"tailing {number} lines.") + return df.tail(number) diff --git a/src/qsv/operations/chainables/uniq.py b/src/qsv/operations/chainables/uniq.py new file mode 100644 index 0000000..3bfd2f7 --- /dev/null +++ b/src/qsv/operations/chainables/uniq.py @@ -0,0 +1,20 @@ +import sys +import polars as pl +from typing import Union +from qsv.utils.DataFrameUtils import exists_colname +from qsv.controllers.LogController import LogController + +def uniq(df: pl.LazyFrame, colnames: Union[str, tuple[str], list[str]]) -> pl.LazyFrame: + """[chainable] Remove duplicate rows based on the specified column names.""" + # prevent type guessing + colnames: tuple[str] + if type(colnames) is list: + colnames = tuple(colnames) + elif type(colnames) is str: + colnames = (colnames, ) + + if not exists_colname(df=df, colnames=colnames): + sys.exit(1) + + LogController.debug(f"unique by {colnames}.") + return df.unique(subset=colnames) diff --git a/src/qsv/operations/finalizers/__init__.py b/src/qsv/operations/finalizers/__init__.py new file mode 100644 index 0000000..3367cfc --- /dev/null +++ b/src/qsv/operations/finalizers/__init__.py @@ -0,0 +1,15 @@ +from qsv.operations.finalizers.headers import headers +from qsv.operations.finalizers.stats import stats +from qsv.operations.finalizers.showquery import showquery +from qsv.operations.finalizers.show import show +from qsv.operations.finalizers.showtable import showtable +from qsv.operations.finalizers.dump import dump + +__all__ = [ + 'headers', + 'stats', + 'showquery', + 'show', + 'showtable', + 'dump', +] \ No newline at end of file diff --git a/src/qsv/operations/finalizers/dump.py b/src/qsv/operations/finalizers/dump.py new file mode 100644 index 0000000..8a310b5 --- /dev/null +++ b/src/qsv/operations/finalizers/dump.py @@ -0,0 +1,21 @@ +import re +from datetime import datetime +from qsv.controllers.LogController import LogController +import polars as pl + +def autoname(df: pl.LazyFrame) -> str: + now = datetime.now().strftime('%Y%m%d-%H%M%S') + 
query = df.explain(optimized=False).splitlines()[0]
+    temp = re.sub(r'[^\w\s]', '-', query)
+    temp = re.sub(r'-+', '-', temp)
+    temp = temp.strip('-')
+    temp = temp.replace(' ', '')
+    temp = temp.lower()
+    return f"{now}_{temp}.csv"
+
+
+def dump(df: pl.LazyFrame, path: str = None) -> None:
+    """[finalizer] Outputs the processing results to a CSV file."""
+    path = path if path else autoname(df=df)
+    df.collect().write_csv(path)
+    LogController.info(f"CSV dumped successfully: {path}")
diff --git a/src/qsv/operations/finalizers/headers.py b/src/qsv/operations/finalizers/headers.py
new file mode 100644
index 0000000..c46712c
--- /dev/null
+++ b/src/qsv/operations/finalizers/headers.py
@@ -0,0 +1,13 @@
+from qsv.views.TableView import TableView
+import polars as pl
+
+def headers(df: pl.LazyFrame, plain: bool = False) -> None:
+    """[finalizer] Displays the column names of the data."""
+    if plain:
+        print(",".join([f"\"{c}\"" for c in df.collect_schema().names()]))
+    else:
+        digits = len(str(len(df.collect_schema().names())))
+        TableView.print(
+            headers=[f"{''.join([' ' for _ in range(0, digits-1)])}#", "Column Name"],
+            values=[[str(i).zfill(digits), c] for i, c in enumerate(df.collect_schema().names())]
+        )
diff --git a/src/qsv/operations/finalizers/show.py b/src/qsv/operations/finalizers/show.py
new file mode 100644
index 0000000..d2570b2
--- /dev/null
+++ b/src/qsv/operations/finalizers/show.py
@@ -0,0 +1,6 @@
+import sys
+import polars as pl
+
+def show(df: pl.LazyFrame) -> None:
+    """[finalizer] Displays the processing results in CSV format to standard output."""
+    df.collect().write_csv(sys.stdout)
diff --git a/src/qsv/operations/finalizers/showquery.py b/src/qsv/operations/finalizers/showquery.py
new file mode 100644
index 0000000..208e2ea
--- /dev/null
+++ b/src/qsv/operations/finalizers/showquery.py
@@ -0,0 +1,5 @@
+import polars as pl
+
+def showquery(df: pl.LazyFrame) -> None:
+    """[finalizer] Displays the data processing query."""
+    print(df)
diff --git a/src/qsv/operations/finalizers/showtable.py b/src/qsv/operations/finalizers/showtable.py
new file mode 100644
index 0000000..6e9f764
--- /dev/null
+++ b/src/qsv/operations/finalizers/showtable.py
@@ -0,0 +1,5 @@
+import polars as pl
+
+def showtable(df: pl.LazyFrame) -> None:
+    """[finalizer] Outputs the processing results table to the standard output."""
+    print(df.collect())
diff --git a/src/qsv/operations/finalizers/stats.py b/src/qsv/operations/finalizers/stats.py
new file mode 100644
index 0000000..0ebc98e
--- /dev/null
+++ b/src/qsv/operations/finalizers/stats.py
@@ -0,0 +1,5 @@
+import polars as pl
+
+def stats(df: pl.LazyFrame) -> None:
+    """[finalizer] Displays the statistical information of the data."""
+    print(df.describe())
diff --git a/src/qsv/operations/initializers/__init__.py b/src/qsv/operations/initializers/__init__.py
new file mode 100644
index 0000000..e26696e
--- /dev/null
+++ b/src/qsv/operations/initializers/__init__.py
@@ -0,0 +1,5 @@
+from qsv.operations.initializers.load import load
+
+__all__ = [
+    'load',
+]
\ No newline at end of file
diff --git a/src/qsv/operations/initializers/load.py b/src/qsv/operations/initializers/load.py
new file mode 100644
index 0000000..b8c4ed5
--- /dev/null
+++ b/src/qsv/operations/initializers/load.py
@@ -0,0 +1,12 @@
+import sys
+from qsv.utils.FileUtils import exists_path
+from qsv.controllers.CsvController import CsvController
+from qsv.controllers.LogController import LogController
+
+def load(path: tuple[str], separator: str = ',', low_memory: bool = False):
+    """[initializer] Loads the specified CSV files."""
+    if not exists_path(path=path):
+        sys.exit(1)
+
+    LogController.debug(msg=f"{len(path)} files are loaded. [{', '.join(path)}]")
+    return CsvController(path=path).get_dataframe(separator=separator, low_memory=low_memory)
diff --git a/src/qsv/operations/quilters/__init__.py b/src/qsv/operations/quilters/__init__.py
new file mode 100644
index 0000000..322babd
--- /dev/null
+++ b/src/qsv/operations/quilters/__init__.py
@@ -0,0 +1,5 @@
+from qsv.operations.quilters.quilt import quilt
+
+__all__ = [
+    'quilt',
+]
\ No newline at end of file
diff --git a/src/qsv/operations/quilters/quilt.py b/src/qsv/operations/quilters/quilt.py
new file mode 100644
index 0000000..6851e88
--- /dev/null
+++ b/src/qsv/operations/quilters/quilt.py
@@ -0,0 +1,100 @@
+import re
+from typing import Union, Optional, Literal
+from qsv.controllers.LogController import LogController
+from qsv.controllers.QuiltController import QuiltController
+import polars as pl
+
+def process(dfc, steps: dict, path: tuple[str], source: Optional[pl.LazyFrame] = None) -> pl.LazyFrame:
+    # drop the previous dataframe before each stage
+    dfc.drop()
+
+    # pipeline: start from the source stage's dataframe when one is given
+    if source is not None:
+        dfc.df = source
+
+    for k, v in steps:
+        # allow duplicated step names: strip the trailing-underscore suffixes added by the YAML loader
+        k = k if not k.endswith('_') else re.sub(r'_+$', '', k)
+
+        if k == 'load':
+            dfc.load(*path)
+        elif v:
+            getattr(dfc, k)(**v)
+        else:
+            getattr(dfc, k)()
+
+    return dfc.df
+
+
+def concat(
+        df_list: list[pl.LazyFrame],
+        how: Literal['vertical', 'vertical_relaxed', 'horizontal', 'diagonal', 'align']
+    ) -> pl.LazyFrame:
+    return pl.concat(df_list, how=how)
+
+
+def join(
+        df_list: list[pl.LazyFrame],
+        how: Literal['inner', 'left', 'right', 'full', 'semi', 'anti', 'cross'],
+        key: Union[str, list[str]],
+        coalesce: bool
+    ) -> pl.LazyFrame:
+    # note: joins are pairwise; only the first two source stages are joined
+    return df_list[0].join(df_list[1], how=how, on=key, coalesce=coalesce)
+
+
+def quilt(dfc, config: str, path: tuple[str]) -> None:
+    """[quilter] Loads the specified quilt batch files."""
+    LogController.debug(f"config: {config}")
+    LogController.debug(f"{len(path)} files are loaded. [{', '.join(path)}]")
+
+    q = QuiltController()
+    configs = q.load_configs(config)
+    q.print_configs(configs)
+
+    # per config file
+    for config in configs:
+        df_dict = dict()
+
+        # per stage
+        for stage_key, stage_values in config.get('stages').items():
+            stage_type = stage_values.get('type')
+
+            if stage_type == "process":
+                if 'source' in stage_values:
+                    # pipeline
+                    df_dict[stage_key] = process(
+                        dfc=dfc,
+                        steps=stage_values.get('steps').items(),
+                        path=path,
+                        source=df_dict[stage_values.get('source')],
+                    )
+                else:
+                    # load from file
+                    df_dict[stage_key] = process(
+                        dfc=dfc,
+                        steps=stage_values.get('steps').items(),
+                        path=path
+                    )
+
+            elif stage_type == "concat":
+                sources = stage_values.get('sources')
+                how = stage_values.get('params', dict()).get('how', 'vertical')
+                df_dict[stage_key] = concat(
+                    df_list=[df_dict.get(source) for source in sources],
+                    how=how
+                )
+
+            elif stage_type == "join":
+                sources = stage_values.get('sources')
+                how = stage_values.get('params', dict()).get('how', 'full')
+                key = stage_values.get('params', dict()).get('key')
+                coalesce = stage_values.get('params', dict()).get('coalesce', True)
+
+                df_dict[stage_key] = join(
+                    df_list=[df_dict.get(source) for source in sources],
+                    how=how,
+                    key=key,
+                    coalesce=coalesce
+                )
diff --git a/src/qsv/utils/DataFrameUtils.py b/src/qsv/utils/DataFrameUtils.py
new file mode 100644
index 0000000..5494f22
--- /dev/null
+++ b/src/qsv/utils/DataFrameUtils.py
@@ -0,0 +1,11 @@
+import polars as pl
+from qsv.controllers.LogController import LogController
+
+def exists_colname(df: pl.LazyFrame, colnames: list[str]) -> bool:
+    columns = df.collect_schema().names()
+    for colname in colnames:
+        if colname not in columns:
+            LogController.error(f"Column \"{colname}\" does not exist in the dataframe.")
+            return False
+    else:
+        return True
diff --git a/src/qsv/utils/FileUtils.py b/src/qsv/utils/FileUtils.py
new file mode 100644
index 0000000..ff92e25
--- /dev/null
+++ b/src/qsv/utils/FileUtils.py
@@ -0,0 +1,12 @@
+import sys
+from pathlib import Path
+
+from qsv.controllers.LogController import LogController
+
+def exists_path(path: tuple[str]) -> bool:
+    for p in path:
+        if not Path(p).exists():
+            LogController.error(f"File \"{p}\" does not exist.")
+            return False
+    else:
+        return True
diff --git a/uv.lock b/uv.lock
index 8ea07f7..991e0f4 100644
--- a/uv.lock
+++ b/uv.lock
@@ -216,7 +216,7 @@ wheels = [
 
 [[package]]
 name = "qsv"
-version = "0.3.10"
+version = "0.4.0"
 source = { editable = "." }
 dependencies = [
     { name = "fire" },