Skip to content

Commit

Permalink
Renamed `_get_max_run_id` to `_get_column_value`, generalizing it to take a column name and aggregate function (defaults: `RunId`, `MAX`)
Browse files Browse the repository at this point in the history
  • Loading branch information
m-kovalsky committed Dec 23, 2024
1 parent 3c9f579 commit b468e00
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 17 deletions.
15 changes: 6 additions & 9 deletions src/sempy_labs/_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,12 +1188,12 @@ def generate_guid():
return str(uuid.uuid4())


def _get_max_run_id(lakehouse: str, table_name: str) -> int:
def _get_column_value(lakehouse: str, table_name: str, column_name: str = 'RunId', function: str = 'MAX') -> int:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
query = f"SELECT MAX(RunId) FROM {lakehouse}.{table_name}"
query = f"SELECT {function}({column_name}) FROM {lakehouse}.{table_name}"
dfSpark = spark.sql(query)
max_run_id = dfSpark.collect()[0][0] or 0

Expand All @@ -1209,20 +1209,17 @@ def _get_partition_map(
dataset: str, workspace: Optional[str | UUID] = None
) -> pd.DataFrame:

(workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
(dataset_name, dataset_id) = resolve_dataset_name_and_id(dataset, workspace_id)

partitions = fabric.evaluate_dax(
dataset=dataset_id,
workspace=workspace_id,
dataset=dataset,
workspace=workspace,
dax_string="""
select [ID] AS [PartitionID], [TableID], [Name] AS [PartitionName] from $system.tmschema_partitions
""",
)

tables = fabric.evaluate_dax(
dataset=dataset_id,
workspace=workspace_id,
dataset=dataset,
workspace=workspace,
dax_string="""
select [ID] AS [TableID], [Name] AS [TableName] from $system.tmschema_tables
""",
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/_model_bpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
resolve_workspace_capacity,
resolve_dataset_name_and_id,
get_language_codes,
_get_max_run_id,
_get_column_value,
resolve_workspace_name_and_id,
)
from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
Expand Down Expand Up @@ -383,7 +383,7 @@ def translate_using_spark(rule_file):
if len(lakeT_filt) == 0:
runId = 1
else:
max_run_id = _get_max_run_id(
max_run_id = _get_column_value(
lakehouse=lakehouse, table_name=delta_table_name
)
runId = max_run_id + 1
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/_model_bpa_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
save_as_delta_table,
resolve_workspace_capacity,
retry,
_get_max_run_id,
_get_column_value,
)
from sempy_labs.lakehouse import (
get_lakehouse_tables,
Expand Down Expand Up @@ -76,7 +76,7 @@ def run_model_bpa_bulk(
if len(lakeT_filt) == 0:
runId = 1
else:
max_run_id = _get_max_run_id(lakehouse=lakehouse, table_name=output_table)
max_run_id = _get_column_value(lakehouse=lakehouse, table_name=output_table)
runId = max_run_id + 1

if isinstance(workspace, str):
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/_vertipaq.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
resolve_lakehouse_name,
save_as_delta_table,
resolve_workspace_capacity,
_get_max_run_id,
_get_column_value,
resolve_workspace_name_and_id,
resolve_dataset_name_and_id,
)
Expand Down Expand Up @@ -519,7 +519,7 @@ def _style_columns_based_on_types(dataframe: pd.DataFrame, column_type_mapping):
if len(lakeT_filt) == 0:
runId = 1
else:
max_run_id = _get_max_run_id(lakehouse=lakehouse, table_name=lakeTName)
max_run_id = _get_column_value(lakehouse=lakehouse, table_name=lakeTName)
runId = max_run_id + 1

dfMap = {
Expand Down
4 changes: 2 additions & 2 deletions src/sempy_labs/report/_report_bpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
resolve_report_id,
resolve_lakehouse_name,
resolve_workspace_capacity,
_get_max_run_id,
_get_column_value,
resolve_workspace_name_and_id,
)
from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
Expand Down Expand Up @@ -217,7 +217,7 @@ def execute_rule(row):
if len(lakeT_filt) == 0:
runId = 1
else:
max_run_id = _get_max_run_id(
max_run_id = _get_column_value(
lakehouse=lakehouse, table_name=delta_table_name
)
runId = max_run_id + 1
Expand Down

0 comments on commit b468e00

Please sign in to comment.