Skip to content

Commit

Permalink
fix: misidentify column name as lateral alias (#540)
Browse files Browse the repository at this point in the history
* fix: misidentify-column-name-as-alias (#539)

* add LATERAL_COLUMN_ALIAS_REFERENCE in SQLLineageConfig

* adjust import order

* add test_column_top_level_enable_lateral_ref_with_metadata_from_nested_subquery

* unknown

* refactor: rebase master and convert LATERAL_COLUMN_ALIAS_REFERENCE to bool type

* refactor: use as few condition as possible: SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE

* refactor: rebase master and resolve conflict

* refactor: move logic from to_source_columns to end_of_query_cleanup

* refactor: rebase master and fix black format

* docs: LATERAL_COLUMN_ALIAS_REFERENCE how-to guide

* docs: starting version for each config

---------

Co-authored-by: reata <reddevil.hjw@gmail.com>
  • Loading branch information
maoxingda and reata authored Jan 29, 2024
1 parent a93b894 commit d0ca3dc
Show file tree
Hide file tree
Showing 9 changed files with 540 additions and 62 deletions.
54 changes: 47 additions & 7 deletions docs/gear_up/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,65 @@ default schema name.

Default: ``""``

Since: 1.5.0

DIRECTORY
=========
Frontend app SQL directory. By default the frontend app is showing the data directory packaged with sqllineage,
which includes tpcds queries for demo purposes. User can customize with this key.

Default: ``sqllineage/data``

Since: 1.2.1

LATERAL_COLUMN_ALIAS_REFERENCE
==============================
Enable lateral column alias reference. This is a syntax feature supported by some SQL dialects. See:

- Amazon Redshift: `Amazon Redshift announces support for lateral column alias reference`_
- Spark (since 3.4): `Support "lateral column alias references" to allow column aliases to be used within SELECT clauses`_
- Databricks: `Introducing the Support of Lateral Column Alias`_

.. note::
Lateral column alias reference is a feature that must be used together with metadata for each column to be
correctly resolved. Take below example:

.. code-block:: sql
SELECT clicks / impressions as probability,
round(100 * probability, 1) as percentage
FROM raw_data
If table raw_data has a column named **probability**, **probability** in the second selected column is from table
raw_data. Otherwise, it's referencing alias **clicks / impressions as probability**.

That means with SQLLineage, besides making LATERAL_COLUMN_ALIAS_REFERENCE=TRUE, MetaDataProvider must also be
provided so we can query raw_data table to see if it has a column named **probability** and then check alias reference.
If not provided, we will fallback to default behavior to simply assume column **probability** is from table raw_data
even if LATERAL_COLUMN_ALIAS_REFERENCE is set to TRUE.

Default: ``False``

Since: 1.5.1

TSQL_NO_SEMICOLON
=================
Enable tsql no semicolon splitter mode.
Enable tsql no semicolon splitter mode. Transact-SQL offers this feature that even when SQL statements are not delimited
by semicolon, it can still be parsed and executed.

.. warning::
Transact-SQL offers this feature that even when SQL statements are not delimited by semicolon, it can still be
parsed and executed. But quoting `tsql_syntax_convention`_, "although the semicolon isn't required for most
statements in this version (v16) of SQL Server, it will be required in a future version". So this config key is
kept mostly for backward-compatible purposes. We may drop the support any time without warning. Bear this in mind
when using this feature with sqllineage.
Quoting `Transact-SQL syntax conventions (Transact-SQL)`_, "although the semicolon isn't required for most
statements in this version (v16) of SQL Server, it will be required in a future version".

So with SQLLineage, this config key is kept mostly for backward-compatible purposes. We may drop the support any
time without warning. Bear this in mind when using this feature.

Default: ``False``

Since: 1.4.8


.. _tsql_syntax_convention: https://learn.microsoft.com/en-us/sql/t-sql/language-elements/transact-sql-syntax-conventions-transact-sql?view=sql-server-ver16
.. _Amazon Redshift announces support for lateral column alias reference: https://aws.amazon.com/about-aws/whats-new/2018/08/amazon-redshift-announces-support-for-lateral-column-alias-reference/
.. _Support "lateral column alias references" to allow column aliases to be used within SELECT clauses: https://issues.apache.org/jira/browse/SPARK-27561
.. _Introducing the Support of Lateral Column Alias: https://www.databricks.com/blog/introducing-support-lateral-column-alias
.. _Transact-SQL syntax conventions (Transact-SQL): https://learn.microsoft.com/en-us/sql/t-sql/language-elements/transact-sql-syntax-conventions-transact-sql?view=sql-server-ver16
2 changes: 2 additions & 0 deletions sqllineage/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class _SQLLineageConfigLoader:
"DEFAULT_SCHEMA": (str, ""),
# to enable tsql no semicolon splitter mode
"TSQL_NO_SEMICOLON": (bool, False),
# lateral column alias reference supported by some dialect (redshift, spark 3.4+, etc)
"LATERAL_COLUMN_ALIAS_REFERENCE": (bool, False),
}
BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1")

Expand Down
6 changes: 4 additions & 2 deletions sqllineage/core/holders.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def _build_digraph(
for unresolved_col, tgt_col in unresolved_cols:
# check if there's only one parent candidate contains the column with same name
src_cols = []
# check if source column exists in graph
# check if source column exists in graph (either from subquery or from table created in prev statement)
for parent in unresolved_col.parent_candidates:
src_col = Column(unresolved_col.raw_name)
src_col.parent = parent
Expand All @@ -442,7 +442,9 @@ def _build_digraph(
# It incorrect for JOIN with ON, but sql without specifying an alias in this case will be invalid
for src_col in src_cols:
g.add_edge(src_col, tgt_col, type=EdgeType.LINEAGE)
g.remove_edge(unresolved_col, tgt_col)
if len(src_cols) > 0:
# only delete unresolved column when it's resolved
g.remove_edge(unresolved_col, tgt_col)

# when unresolved column got resolved, it will be orphan node, and we can remove it
for node in [n for n, deg in g.degree if deg == 0]:
Expand Down
1 change: 1 addition & 0 deletions sqllineage/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def __init__(self, name: str, **kwargs):
"source_columns", ((self.raw_name, None),)
)
]
self.from_alias = kwargs.pop("from_alias", False)

def __str__(self):
return (
Expand Down
78 changes: 48 additions & 30 deletions sqllineage/core/parser/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Tuple, Union
from typing import Dict, List, Tuple, Union

from sqllineage.config import SQLLineageConfig
from sqllineage.core.holders import SubQueryLineageHolder
from sqllineage.core.models import Column, Path, SubQuery, Table
from sqllineage.exceptions import SQLLineageException
Expand All @@ -23,41 +24,58 @@ def end_of_query_cleanup(self, holder: SubQueryLineageHolder) -> None:
if holder.write:
if len(holder.write) > 1:
raise SQLLineageException
tgt_tbl = list(holder.write)[0]
lateral_aliases = set()
for idx, tgt_col in enumerate(col_grp):
tgt_col.parent = tgt_tbl
for lateral_alias_ref in col_grp[idx + 1 :]: # noqa: E203
if any(
src_col[0] == tgt_col.raw_name
for src_col in lateral_alias_ref.source_columns
):
lateral_aliases.add(tgt_col.raw_name)
break
for src_col in tgt_col.to_source_columns(
holder.get_alias_mapping_from_table_group(tbl_grp)
tgt_tbl = next(iter(holder.write))
lateral_column_aliases: Dict[str, List[Column]] = {}
for idx, tgt_col_from_query in enumerate(col_grp):
tgt_col_from_query.parent = tgt_tbl
tgt_col_resolved = tgt_col_from_query
src_cols_resolved = []
for src_col in tgt_col_from_query.to_source_columns(
holder.get_alias_mapping_from_table_group(tbl_grp),
):
if len(write_columns := holder.write_columns) == len(col_grp):
# example query: create view test (col3) select col1 as col2 from tab
# without write_columns = [col3] information, by default src_col = col1 and tgt_col = col2
# when write_columns exist and length matches, we want tgt_col = col3 instead of col2
# for invalid query: create view test (col3, col4) select col1 as col2 from tab,
# when the length doesn't match, we fall back to default behavior
tgt_col = write_columns[idx]
is_lateral_alias_ref = False
for wc in holder.write_columns:
if wc.raw_name == "*":
continue
if (
src_col.raw_name == wc.raw_name
and src_col.raw_name in lateral_aliases
tgt_col_resolved = write_columns[idx]
# lateral column alias handling
lca_flag = False
if SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE:
if metadata_provider := getattr(
self, "metadata_provider", None
):
is_lateral_alias_ref = True
for lateral_alias_col in holder.get_source_columns(wc):
holder.add_column_lineage(
lateral_alias_col, tgt_col
from_dataset = False
for parent_candidate in src_col.parent_candidates:
if isinstance(
parent_candidate, Table
) and src_col in metadata_provider.get_table_columns(
parent_candidate
):
from_dataset = True
elif isinstance(
parent_candidate, SubQuery
) and src_col in holder.get_table_columns(
parent_candidate
):
from_dataset = True
if not from_dataset and (
lca_cols_resolved := lateral_column_aliases.get(
src_col.raw_name, []
)
break
if is_lateral_alias_ref:
continue
holder.add_column_lineage(src_col, tgt_col)
):
src_cols_resolved.extend(lca_cols_resolved)
lca_flag = True
if not lca_flag:
src_cols_resolved.append(src_col)

for src_col_resolved in src_cols_resolved:
holder.add_column_lineage(src_col_resolved, tgt_col_resolved)
if (
SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE
and tgt_col_from_query.from_alias
):
lateral_column_aliases[tgt_col_from_query.raw_name] = (
src_cols_resolved
)
11 changes: 8 additions & 3 deletions sqllineage/core/parser/sqlfluff/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from operator import add
from typing import List, Optional, Type, Union

import networkx as nx
from sqlfluff.core.parser import BaseSegment

from sqllineage.core.holders import SubQueryLineageHolder
Expand All @@ -16,6 +17,7 @@
list_join_clause,
list_subqueries,
)
from sqllineage.utils.constant import NodeTag
from sqllineage.utils.entities import AnalyzerContext, SubQueryTuple
from sqllineage.utils.helpers import escape_identifier_name

Expand Down Expand Up @@ -220,9 +222,12 @@ def extract_subquery(
if sq.query.get_child("with_compound_statement")
else SelectExtractor
)
holder |= extractor_cls(self.dialect, self.metadata_provider).extract(
sq.query, AnalyzerContext(cte=holder.cte, write={sq})
)
subquery_holder = extractor_cls(
self.dialect, self.metadata_provider
).extract(sq.query, AnalyzerContext(cte=holder.cte, write={sq}))
# remove WRITE tag from subquery so that the combined holder won't have multiple WRITE dataset
nx.set_node_attributes(subquery_holder.graph, {sq, False}, NodeTag.WRITE)
holder |= subquery_holder

@staticmethod
def _init_holder(context: AnalyzerContext) -> SubQueryLineageHolder:
Expand Down
16 changes: 12 additions & 4 deletions sqllineage/core/parser/sqlfluff/extractors/select.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ def extract(
# so that each handler don't have to worry about what's inside subquery
subqueries.append(sq)

if is_set_expression(segment):
for _, sub_segment in enumerate(
segment.get_children("select_statement", "bracketed")
):
for seg in list_child_segments(sub_segment):
for sq in self.list_subquery(seg):
subqueries.append(sq)

self.extract_subquery(subqueries, holder)

for segment in segments:
self._handle_swap_partition(segment, holder)
self._handle_select_into(segment, holder)
self.tables.extend(
Expand All @@ -64,17 +75,14 @@ def extract(
(len(self.columns), len(self.tables))
)
for seg in list_child_segments(sub_segment):
for sq in self.list_subquery(seg):
subqueries.append(sq)
self.tables.extend(
self._list_table_from_from_clause_or_join_clause(
seg, holder
)
)
self._handle_column(seg)
self.end_of_query_cleanup(holder)

self.extract_subquery(subqueries, holder)
self.end_of_query_cleanup(holder)

holder.expand_wildcard(self.metadata_provider)

Expand Down
10 changes: 2 additions & 8 deletions sqllineage/core/parser/sqlfluff/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ def of(column: BaseSegment, **kwargs) -> Column:
if column.type == "select_clause_element":
source_columns, alias = SqlFluffColumn._get_column_and_alias(column)
if alias:
return Column(
alias,
source_columns=source_columns,
)
return Column(alias, source_columns=source_columns, from_alias=True)
if source_columns:
column_name = None
for sub_segment in list_child_segments(column):
Expand Down Expand Up @@ -145,10 +142,7 @@ def of(column: BaseSegment, **kwargs) -> Column:

# Wildcard, Case, Function without alias (thus not recognized as an Identifier)
source_columns = SqlFluffColumn._extract_source_columns(column)
return Column(
column.raw,
source_columns=source_columns,
)
return Column(column.raw, source_columns=source_columns)

@staticmethod
def _extract_source_columns(segment: BaseSegment) -> List[ColumnQualifierTuple]:
Expand Down
Loading

0 comments on commit d0ca3dc

Please sign in to comment.