Skip to content

Commit

Permalink
feat: Allow multiple sources for column lineage (#561)
Browse files Browse the repository at this point in the history
  • Loading branch information
kkozhakin authored Jan 20, 2024
1 parent 8a02894 commit 4c6c65e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 34 deletions.
13 changes: 5 additions & 8 deletions sqllineage/core/holders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from sqllineage.core.metadata_provider import MetaDataProvider
from sqllineage.core.models import Column, Path, Schema, SubQuery, Table
from sqllineage.exceptions import InvalidSyntaxException
from sqllineage.utils.constant import EdgeTag, EdgeType, NodeTag

DATASET_CLASSES = (Path, Table)
Expand Down Expand Up @@ -439,14 +438,12 @@ def _build_digraph(
if unresolved_col.raw_name == src_col.raw_name:
src_cols.append(src_col)

if len(src_cols) > 1:
raise InvalidSyntaxException(
f"{unresolved_col.raw_name} is not allowed from more than one table or subquery"
)
# Multiple sources is a correct case for JOIN with USING
# It incorrect for JOIN with ON, but sql without specifying an alias in this case will be invalid
for src_col in src_cols:
g.add_edge(src_col, tgt_col, type=EdgeType.LINEAGE)
g.remove_edge(unresolved_col, tgt_col)

if len(src_cols) == 1:
g.add_edge(src_cols[0], tgt_col, type=EdgeType.LINEAGE)
g.remove_edge(unresolved_col, tgt_col)
# when unresolved column got resolved, it will be orphan node, and we can remove it
for node in [n for n, deg in g.degree if deg == 0]:
if isinstance(node, Column) and len(node.parent_candidates) > 1:
Expand Down
20 changes: 14 additions & 6 deletions tests/sql/column/test_column_select_from_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,22 @@ def test_select_column_from_table_join():


def test_select_column_without_table_qualifier_from_table_join():
sql = """INSERT INTO tab1
SELECT col1
FROM tab2 a
INNER JOIN tab3 b
ON a.id = b.id"""
sql = """INSERT INTO tab3
SELECT f1
FROM ( SELECT f1 FROM tab1)
LEFT JOIN ( SELECT f1 FROM tab2) USING (f1)"""
assert_column_lineage_equal(
sql,
[(ColumnQualifierTuple("col1", None), ColumnQualifierTuple("col1", "tab1"))],
[
(
ColumnQualifierTuple("f1", "tab1"),
ColumnQualifierTuple("f1", "tab3"),
),
(
ColumnQualifierTuple("f1", "tab2"),
ColumnQualifierTuple("f1", "tab3"),
),
],
)


Expand Down
20 changes: 0 additions & 20 deletions tests/sql/column/test_metadata_unqualified_column.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import pytest

from sqllineage.core.metadata_provider import MetaDataProvider
from sqllineage.exceptions import InvalidSyntaxException
from sqllineage.runner import LineageRunner
from sqllineage.utils.entities import ColumnQualifierTuple
from ...helpers import assert_column_lineage_equal, generate_metadata_providers

Expand Down Expand Up @@ -274,21 +272,3 @@ def test_select_column_from_tempview_view_subquery(provider: MetaDataProvider):
],
metadata_provider=provider,
)


@pytest.mark.parametrize("provider", providers)
def test_sqlparse_exception(provider: MetaDataProvider):
sql = """insert into db.tbl
select id
from db1.table1 t1
join db2.table2 t2 on t1.id = t2.id
"""

with pytest.raises(
InvalidSyntaxException,
match="id is not allowed from more than one table or subquery",
):
lr = LineageRunner(sql, metadata_provider=provider)
col_lineage = lr.get_column_lineage()
for e in col_lineage:
print(e)

0 comments on commit 4c6c65e

Please sign in to comment.