Fixed parsing error when loading checks from a file (#165)
## Changes
Fixed a SQL expression parsing error when loading checks from a file.
Refactored tests to avoid code duplication and improve maintainability.

### Linked issues

Resolves #162 

### Tests

- [x] manually tested
- [x] added unit tests
- [x] added integration tests
mwojtyczka authored Feb 12, 2025
1 parent 3ca34dc commit 17613f3
Showing 18 changed files with 530 additions and 146 deletions.
13 changes: 13 additions & 0 deletions docs/dqx/docs/guide.mdx
@@ -60,6 +60,10 @@ print(dlt_expectations)
You can optionally install DQX in the workspace; see the [Installation Guide](/docs/installation#dqx-installation-in-a-databricks-workspace).
As part of the installation, a config, dashboards, and a profiler workflow are installed. The workflow can be run manually in the workspace UI or using the CLI as shown below.

DQX currently operates exclusively at the PySpark DataFrame level and does not interact directly with databases or storage systems.
DQX does not persist data after performing quality checks, so users must handle data storage themselves.
Since DQX does not manage the input location, output table, or quarantine table, it is the user's responsibility to store or persist the processed data as needed.
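
For example, a minimal sketch of handling persistence yourself (the `input_df`, `checks`, and table names below are placeholders, not part of DQX):

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

# DQX only returns dataframes; it does not write them anywhere
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

# persisting the results is up to you, e.g. writing to Delta tables of your choice
valid_df.write.mode("overwrite").saveAsTable("main.dq.valid_data")
quarantined_df.write.mode("overwrite").saveAsTable("main.dq.quarantined_data")
```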

Open the config to check available run configs and adjust the settings if needed:
```commandline
databricks labs dqx open-remote-config
@@ -164,6 +168,11 @@ Fields:

### Loading and execution methods

Checks can be loaded from a file in the installation folder, workspace, or local file system. If the checks file contains invalid JSON or YAML syntax, the engine raises an error.
The checks can be applied using the `apply_checks_by_metadata_and_split` or `apply_checks_by_metadata` method; checks are validated automatically as part of these methods.
If you want to split the checked data into valid and invalid (quarantined) dataframes, use `apply_checks_by_metadata_and_split`.
If you want to report issues as additional columns, use `apply_checks_by_metadata`.
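
For example, a minimal sketch combining loading and applying (`checks.yml` and `input_df` are placeholders; the methods below cover the installation folder, workspace, and local file variants in detail):

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

# load checks from a local json/yml file; raises an error if the file is missing or invalid
checks = dq_engine.load_checks_from_local_file("checks.yml")

# option 1: split the data into valid and quarantined dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

# option 2: keep a single dataframe with issue-reporting columns added
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```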

#### Method 1: Loading checks from a workspace file in the installation folder

If DQX is installed in the workspace, you can load checks based on the run configuration:
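
A minimal sketch, assuming a user installation and the `default` run config (`input_df` is a placeholder):

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

# resolves the checks file (default: checks.yml) from the run config in the installation folder
checks = dq_engine.load_checks_from_installation(run_config_name="default", assume_user=True)

valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
```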
@@ -229,6 +238,10 @@ valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)

### Quality rules defined as code

Checks can be defined in code and applied using the `apply_checks_and_split` or `apply_checks` method.
If you want to split the checked data into valid and invalid (quarantined) dataframes, use `apply_checks_and_split`.
If you want to report issues as additional columns, use `apply_checks`.

#### Method 1: Using DQX classes

```python
```
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -243,7 +243,7 @@ argument-naming-style = "snake_case"

# Regular expression matching correct argument names. Overrides argument-naming-
# style. If left empty, argument names will be checked with the set naming style.
argument-rgx = "[a-z_][a-z0-9_]{2,30}$"
argument-rgx = "[a-z_][a-z0-9_]{2,40}$"

# Naming style matching correct attribute names.
attr-naming-style = "snake_case"
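
For illustration, a quick sketch of what the widened bound permits (the example argument name is hypothetical):

```python
import re

old_rgx = r"[a-z_][a-z0-9_]{2,30}$"  # previous limit: at most 31 characters
new_rgx = r"[a-z_][a-z0-9_]{2,40}$"  # new limit: at most 41 characters

name = "expected_valid_and_quarantined_df"  # 33 characters, hypothetical test argument

assert re.match(old_rgx, name) is None  # rejected by the old rule
assert re.match(new_rgx, name)          # accepted by the new rule
```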
8 changes: 4 additions & 4 deletions src/databricks/labs/dqx/base.py
@@ -131,22 +131,22 @@ def get_valid(self, df: DataFrame) -> DataFrame:

@staticmethod
@abc.abstractmethod
def load_checks_from_local_file(path: str) -> list[dict]:
def load_checks_from_local_file(filepath: str) -> list[dict]:
"""
Load checks (dq rules) from a file (json or yml) in the local file system.
This does not require installation of DQX in the workspace.
The returning checks can be used as input for `apply_checks_by_metadata` function.
:param path: path to a file containing the checks.
:param filepath: path to a file containing the checks.
:return: list of dq rules
"""

@staticmethod
@abc.abstractmethod
def save_checks_in_local_file(checks: list[dict], path: str):
def save_checks_in_local_file(checks: list[dict], filepath: str):
"""
Save checks (dq rules) to yml file in the local file system.
:param checks: list of dq rules to save
:param path: path to a file containing the checks.
:param filepath: path to a file containing the checks.
"""
114 changes: 98 additions & 16 deletions src/databricks/labs/dqx/engine.py
@@ -116,27 +116,27 @@ def get_valid(self, df: DataFrame) -> DataFrame:
)

@staticmethod
def load_checks_from_local_file(path: str) -> list[dict]:
if not path:
raise ValueError("filename must be provided")
def load_checks_from_local_file(filepath: str) -> list[dict]:
if not filepath:
raise ValueError("filepath must be provided")

try:
checks = Installation.load_local(list[dict[str, str]], Path(path))
checks = Installation.load_local(list[dict[str, str]], Path(filepath))
return deserialize_dicts(checks)
except FileNotFoundError:
msg = f"Checks file {path} missing"
msg = f"Checks file {filepath} missing"
raise FileNotFoundError(msg) from None

@staticmethod
def save_checks_in_local_file(checks: list[dict], path: str):
if not path:
raise ValueError("filename must be provided")
def save_checks_in_local_file(checks: list[dict], filepath: str):
if not filepath:
raise ValueError("filepath must be provided")

try:
with open(path, 'w', encoding="utf-8") as file:
with open(filepath, 'w', encoding="utf-8") as file:
yaml.safe_dump(checks, file)
except FileNotFoundError:
msg = f"Checks file {path} missing"
msg = f"Checks file {filepath} missing"
raise FileNotFoundError(msg) from None

@staticmethod
@@ -391,49 +391,128 @@ def __init__(
self._engine = engine or DQEngineCore(workspace_client, extra_params)

def apply_checks(self, df: DataFrame, checks: list[DQRule]) -> DataFrame:
"""Applies data quality checks to a given dataframe.
:param df: dataframe to check
:param checks: list of checks to apply to the dataframe. Each check is an instance of DQRule class.
:return: dataframe with errors and warning reporting columns
"""
return self._engine.apply_checks(df, checks)

def apply_checks_and_split(self, df: DataFrame, checks: list[DQRule]) -> tuple[DataFrame, DataFrame]:
"""Applies data quality checks to a given dataframe and split it into two ("good" and "bad"),
according to the data quality checks.
:param df: dataframe to check
:param checks: list of checks to apply to the dataframe. Each check is an instance of DQRule class.
:return: two dataframes - "good" which includes warning rows but no reporting columns, and "bad" having
error and warning rows and corresponding reporting columns
"""
return self._engine.apply_checks_and_split(df, checks)

def apply_checks_by_metadata_and_split(
self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None
) -> tuple[DataFrame, DataFrame]:
"""Wrapper around `apply_checks_and_split` for use in the metadata-driven pipelines. The main difference
is how the checks are specified - instead of using functions directly, they are described as function name plus
arguments.
:param df: dataframe to check
:param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields:
* `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true -
it will be used as an error/warning message, or `null` if it's evaluated to `false`
* `name` - name that will be given to a resulting column. Autogenerated if not provided
* `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe),
and `warn` (data is going into both dataframes)
:param glbs: dictionary with functions mapping (eg. ``globals()`` of the calling module).
If not specified, then only built-in functions are used for the checks.
:return: two dataframes - "good" which includes warning rows but no reporting columns, and "bad" having
error and warning rows and corresponding reporting columns
"""
return self._engine.apply_checks_by_metadata_and_split(df, checks, glbs)

def apply_checks_by_metadata(
self, df: DataFrame, checks: list[dict], glbs: dict[str, Any] | None = None
) -> DataFrame:
"""Wrapper around `apply_checks` for use in the metadata-driven pipelines. The main difference
is how the checks are specified - instead of using functions directly, they are described as function name plus
arguments.
:param df: dataframe to check
:param checks: list of dictionaries describing checks. Each check is a dictionary consisting of following fields:
* `check` - Column expression to evaluate. This expression should return string value if it's evaluated to true -
it will be used as an error/warning message, or `null` if it's evaluated to `false`
* `name` - name that will be given to a resulting column. Autogenerated if not provided
* `criticality` (optional) - possible values are `error` (data going only into "bad" dataframe),
and `warn` (data is going into both dataframes)
:param glbs: dictionary with functions mapping (eg. ``globals()`` of calling module).
If not specified, then only built-in functions are used for the checks.
:return: dataframe with errors and warning reporting columns
"""
return self._engine.apply_checks_by_metadata(df, checks, glbs)

@staticmethod
def validate_checks(checks: list[dict], glbs: dict[str, Any] | None = None) -> ChecksValidationStatus:
"""
Validate the input dicts to ensure they conform to the expected structure and types.
Each check can be a dictionary. The function validates
the presence of required keys, the existence and callability of functions, and the types
of arguments passed to these functions.
:param checks: List of checks to apply to the dataframe. Each check should be a dictionary.
:param glbs: Optional dictionary of global functions that can be used in checks.
:return ValidationStatus: The validation status.
"""
return DQEngineCore.validate_checks(checks, glbs)

def get_invalid(self, df: DataFrame) -> DataFrame:
"""
Get records that violate data quality checks (records with warnings and errors).
@param df: input DataFrame.
@return: dataframe with error and warning rows and corresponding reporting columns.
"""
return self._engine.get_invalid(df)

def get_valid(self, df: DataFrame) -> DataFrame:
"""
Get records that don't violate data quality checks (records with warnings but no errors).
@param df: input DataFrame.
@return: dataframe with warning rows but no reporting columns.
"""
return self._engine.get_valid(df)

@staticmethod
def load_checks_from_local_file(path: str) -> list[dict]:
return DQEngineCore.load_checks_from_local_file(path)
def load_checks_from_local_file(filepath: str) -> list[dict]:
"""
Load checks (dq rules) from a file (json or yml) in the local filesystem.
:param filepath: path to the file containing the checks.
:return: list of dq rules or raise an error if checks file is missing or is invalid.
"""
parsed_checks = DQEngineCore.load_checks_from_local_file(filepath)
if not parsed_checks:
raise ValueError(f"Invalid or no checks in file: {filepath}")
return parsed_checks

def load_checks_from_workspace_file(self, workspace_path: str) -> list[dict]:
"""Load checks (dq rules) from a file (json or yml) in the workspace.
This does not require installation of DQX in the workspace.
The returning checks can be used as input for `apply_checks_by_metadata` function.
:param workspace_path: path to the file in the workspace.
:return: list of dq rules.
:return: list of dq rules or raise an error if checks file is missing or is invalid.
"""
workspace_dir = os.path.dirname(workspace_path)
filename = os.path.basename(workspace_path)
installation = Installation(self.ws, "dqx", install_folder=workspace_dir)

logger.info(f"Loading quality rules (checks) from {workspace_path} in the workspace.")
return self._load_checks_from_file(installation, filename)
parsed_checks = self._load_checks_from_file(installation, filename)
if not parsed_checks:
raise ValueError(f"Invalid or no checks in workspace file: {workspace_path}")
return parsed_checks

def load_checks_from_installation(
self, run_config_name: str | None = "default", product_name: str = "dqx", assume_user: bool = True
@@ -445,14 +524,17 @@ def load_checks_from_installation(
:param run_config_name: name of the run (config) to use
:param product_name: name of the product/installation directory
:param assume_user: if True, assume user installation
:return: list of dq rules
:return: list of dq rules or raise an error if checks file is missing or is invalid.
"""
installation = self._get_installation(assume_user, product_name)
run_config = self._load_run_config(installation, run_config_name)
filename = run_config.checks_file or "checks.yml"

logger.info(f"Loading quality rules (checks) from {installation.install_folder()}/{filename} in the workspace.")
return self._load_checks_from_file(installation, filename)
parsed_checks = self._load_checks_from_file(installation, filename)
if not parsed_checks:
raise ValueError(f"Invalid or no checks in workspace file: {installation.install_folder()}/{filename}")
return parsed_checks

@staticmethod
def save_checks_in_local_file(checks: list[dict], path: str):
23 changes: 16 additions & 7 deletions src/databricks/labs/dqx/utils.py
@@ -1,5 +1,5 @@
import re
import yaml
import ast
from pyspark.sql import Column
from pyspark.sql import SparkSession

@@ -49,12 +49,21 @@ def read_input_data(spark: SparkSession, input_location: str | None, input_forma

def deserialize_dicts(checks: list[dict[str, str]]) -> list[dict]:
"""
deserialize string fields instances containing dictionaries
Deserialize string fields instances containing dictionaries.
This is needed as nested dictionaries from installation files are loaded as strings.
@param checks: list of checks
@return:
"""
for item in checks:
for key, value in item.items():
if value.startswith("{") and value.endswith("}"):
item[key] = yaml.safe_load(value.replace("'", '"'))
return checks

def parse_nested_fields(obj):
"""Recursively parse all string representations of dictionaries."""
if isinstance(obj, str):
if obj.startswith("{") and obj.endswith("}"):
parsed_obj = ast.literal_eval(obj)
return parse_nested_fields(parsed_obj)
return obj
if isinstance(obj, dict):
return {k: parse_nested_fields(v) for k, v in obj.items()}
return obj

return [parse_nested_fields(check) for check in checks]
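
For illustration, a sketch of what the rewritten `deserialize_dicts` now handles: a check whose nested fields were serialized as Python-style dict strings containing single quotes (the `sql_expression` function name and its `expression` argument are used here only as an example):

```python
from databricks.labs.dqx.utils import deserialize_dicts

# Nested fields loaded from an installation file arrive as strings. The embedded SQL
# expression contains single quotes, which broke the previous approach of replacing
# ' with " before yaml.safe_load; ast.literal_eval parses the dict string as-is.
checks = [
    {
        "criticality": "error",
        "check": "{'function': 'sql_expression', 'arguments': {'expression': \"col1 like 'str%'\"}}",
    }
]

parsed = deserialize_dicts(checks)
assert parsed[0]["check"]["arguments"]["expression"] == "col1 like 'str%'"
```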
Empty file added tests/__init__.py
Empty file.
