Skip to content

Commit

Permalink
Merge branch 'm-kovalsky/upstreamdataflows'
Browse files Browse the repository at this point in the history
  • Loading branch information
m-kovalsky committed Nov 11, 2024
2 parents fea8006 + 8a11ebc commit 91614a6
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/sempy_labs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
list_dataflow_storage_accounts,
assign_workspace_to_dataflow_storage,
list_dataflows,
list_upstream_dataflows,
)
from sempy_labs._connections import (
list_connections,
Expand Down Expand Up @@ -423,4 +424,5 @@
"delete_mirrored_database",
"update_mirrored_database_definition",
"get_tables_mirroring_status",
"list_upstream_dataflows",
]
115 changes: 114 additions & 1 deletion src/sempy_labs/_dataflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import pandas as pd
from sempy_labs._helper_functions import (
resolve_workspace_name_and_id,
_is_valid_uuid,
)
from typing import Optional
from typing import Optional, Tuple
import sempy_labs._icons as icons
from sempy.fabric.exceptions import FabricHTTPException
from uuid import UUID


def list_dataflows(workspace: Optional[str] = None):
Expand Down Expand Up @@ -132,3 +134,114 @@ def list_dataflow_storage_accounts() -> pd.DataFrame:
df["Enabled"] = df["Enabled"].astype(bool)

return df


def list_upstream_dataflows(
    dataflow: str | UUID, workspace: Optional[str] = None
) -> pd.DataFrame:
    """
    Shows a list of upstream dataflows for the specified dataflow.

    This is a wrapper function for the following API: `Dataflows - Get Upstream Dataflows In Group <https://learn.microsoft.com/rest/api/power-bi/dataflows/get-upstream-dataflows-in-group>`_.

    Parameters
    ----------
    dataflow : str | UUID
        Name or UUID of the dataflow.
    workspace : str, default=None
        The Fabric workspace name.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.

    Returns
    -------
    pandas.DataFrame
        A pandas dataframe showing a list of upstream dataflows for the specified dataflow.
    """

    workspace_name = fabric.resolve_workspace_name(workspace)
    workspace_id = fabric.resolve_workspace_id(workspace)
    # Resolve with the already-resolved workspace name to avoid a second
    # None-resolution inside the helper.
    (dataflow_name, dataflow_id) = _resolve_dataflow_name_and_id(
        dataflow=dataflow, workspace=workspace_name
    )
    client = fabric.PowerBIRestClient()

    df = pd.DataFrame(
        columns=[
            "Dataflow Name",
            "Dataflow Id",
            "Workspace Name",
            "Workspace Id",
            "Upstream Dataflow Name",
            "Upstream Dataflow Id",
            "Upstream Workspace Name",
            "Upstream Workspace Id",
        ]
    )

    # (workspace_id, dataflow_id) pairs already expanded. Guards against
    # circular upstream references, which would otherwise recurse forever,
    # and avoids re-listing a dataflow reachable via multiple paths.
    visited = set()

    def collect_upstreams(
        client, dataflow_id, dataflow_name, workspace_id, workspace_name
    ):
        # Skip dataflows whose upstreams were already collected (cycle guard).
        if (workspace_id, dataflow_id) in visited:
            return
        visited.add((workspace_id, dataflow_id))

        response = client.get(
            f"/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}/upstreamDataflows"
        )
        if response.status_code != 200:
            raise FabricHTTPException(response)

        values = response.json().get("value", [])
        for v in values:
            tgt_dataflow_id = v.get("targetDataflowId")
            tgt_workspace_id = v.get("groupId")
            tgt_workspace_name = fabric.resolve_workspace_name(tgt_workspace_id)
            (tgt_dataflow_name, _) = _resolve_dataflow_name_and_id(
                dataflow=tgt_dataflow_id, workspace=tgt_workspace_name
            )

            df.loc[len(df)] = {
                "Dataflow Name": dataflow_name,
                "Dataflow Id": dataflow_id,
                "Workspace Name": workspace_name,
                "Workspace Id": workspace_id,
                "Upstream Dataflow Name": tgt_dataflow_name,
                "Upstream Dataflow Id": tgt_dataflow_id,
                "Upstream Workspace Name": tgt_workspace_name,
                "Upstream Workspace Id": tgt_workspace_id,
            }

            # Recurse into the upstream dataflow to collect the full chain.
            collect_upstreams(
                client,
                tgt_dataflow_id,
                tgt_dataflow_name,
                tgt_workspace_id,
                tgt_workspace_name,
            )

    collect_upstreams(client, dataflow_id, dataflow_name, workspace_id, workspace_name)

    return df


def _resolve_dataflow_name_and_id(
    dataflow: str | UUID, workspace: Optional[str] = None
) -> Tuple[str, UUID]:
    """
    Resolves a dataflow name or id to its (name, id) pair within a workspace.

    Parameters
    ----------
    dataflow : str | UUID
        Name or UUID of the dataflow.
    workspace : str, default=None
        The Fabric workspace name.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.

    Returns
    -------
    Tuple[str, UUID]
        The dataflow name and dataflow id.

    Raises
    ------
    ValueError
        If the dataflow does not exist within the workspace.
    """

    if workspace is None:
        workspace = fabric.resolve_workspace_name(workspace)

    dfD = list_dataflows(workspace=workspace)

    if _is_valid_uuid(dataflow):
        # Compare as a string so that UUID objects (allowed by the signature)
        # match the string-typed "Dataflow Id" column.
        dfD_filt = dfD[dfD["Dataflow Id"] == str(dataflow)]
    else:
        dfD_filt = dfD[dfD["Dataflow Name"] == dataflow]

    if dfD_filt.empty:
        raise ValueError(
            f"{icons.red_dot} The '{dataflow}' dataflow does not exist within the '{workspace}' workspace."
        )

    dataflow_id = dfD_filt["Dataflow Id"].iloc[0]
    dataflow_name = dfD_filt["Dataflow Name"].iloc[0]

    return dataflow_name, dataflow_id
24 changes: 24 additions & 0 deletions src/sempy_labs/_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,3 +1317,27 @@ def _convert_data_type(input_data_type: str) -> str:
return "Decimal"
else:
return data_type_mapping.get(input_data_type)


def _is_valid_uuid(
guid: str,
):
"""
Validates if a string is a valid GUID in version 4
Parameters
----------
guid : str
GUID to be validated.
Returns
-------
bool
Boolean that indicates if the string is a GUID or not.
"""

try:
UUID(str(guid), version=4)
return True
except ValueError:
return False

0 comments on commit 91614a6

Please sign in to comment.