diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py
index 64b33f45..1066bfee 100644
--- a/src/sempy_labs/_helper_functions.py
+++ b/src/sempy_labs/_helper_functions.py
@@ -1188,16 +1188,24 @@ def generate_guid():
     return str(uuid.uuid4())
 
 
-def _get_max_run_id(lakehouse: str, table_name: str) -> int:
+def _get_column_aggregate(
+    lakehouse: str,
+    table_name: str,
+    column_name: str = "RunId",
+    function: str = "max",
+    default_value: int = 0,
+) -> int:
 
     from pyspark.sql import SparkSession
 
     spark = SparkSession.builder.getOrCreate()
-    query = f"SELECT MAX(RunId) FROM {lakehouse}.{table_name}"
+    function = function.upper()
+    query = f"SELECT {function}({column_name}) FROM {lakehouse}.{table_name}"
+    if "COUNT" in function and "DISTINCT" in function:
+        query = f"SELECT COUNT(DISTINCT({column_name})) FROM {lakehouse}.{table_name}"
+
     dfSpark = spark.sql(query)
-    max_run_id = dfSpark.collect()[0][0] or 0
-
-    return max_run_id
+    return dfSpark.collect()[0][0] or default_value
 
 
 def _make_list_unique(my_list):
@@ -1209,20 +1217,17 @@ def _get_partition_map(
     dataset: str, workspace: Optional[str | UUID] = None
 ) -> pd.DataFrame:
 
-    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
-    (dataset_name, dataset_id) = resolve_dataset_name_and_id(dataset, workspace_id)
-
     partitions = fabric.evaluate_dax(
-        dataset=dataset_id,
-        workspace=workspace_id,
+        dataset=dataset,
+        workspace=workspace,
         dax_string="""
         select [ID] AS [PartitionID], [TableID], [Name] AS [PartitionName] from $system.tmschema_partitions
         """,
     )
 
     tables = fabric.evaluate_dax(
-        dataset=dataset_id,
-        workspace=workspace_id,
+        dataset=dataset,
+        workspace=workspace,
         dax_string="""
         select [ID] AS [TableID], [Name] AS [TableName] from $system.tmschema_tables
         """,
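A quick reviewer's note on the refactor above: `_get_max_run_id` was hard-wired to `MAX(RunId)`, while `_get_column_aggregate` keeps that behavior as its defaults and exposes the column, aggregate function, and fallback value as parameters. The sketch below shows how the new helper can be called, assuming a Fabric notebook session where sempy_labs is installed and a Spark session is available; the lakehouse and table names are placeholders, not values from this change.

from sempy_labs._helper_functions import _get_column_aggregate

# Defaults reproduce the old helper exactly:
# SELECT MAX(RunId) FROM MyLakehouse.modelbparesults
max_run_id = _get_column_aggregate(
    lakehouse="MyLakehouse", table_name="modelbparesults"
)

# A function string containing both COUNT and DISTINCT takes the
# special-cased branch:
# SELECT COUNT(DISTINCT(RunId)) FROM MyLakehouse.modelbparesults
distinct_runs = _get_column_aggregate(
    lakehouse="MyLakehouse",
    table_name="modelbparesults",
    function="count distinct",
)

# Any other aggregate passes straight through after upper-casing:
# SELECT MIN(RunId) FROM MyLakehouse.modelbparesults
first_run_id = _get_column_aggregate(
    lakehouse="MyLakehouse",
    table_name="modelbparesults",
    function="min",
)

Since `column_name` and `function` are interpolated into the SQL string unquoted, callers should pass valid Spark SQL identifiers and function names.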
diff --git a/src/sempy_labs/_model_bpa.py b/src/sempy_labs/_model_bpa.py
index d6cc808c..4b7f6094 100644
--- a/src/sempy_labs/_model_bpa.py
+++ b/src/sempy_labs/_model_bpa.py
@@ -12,7 +12,7 @@
     resolve_workspace_capacity,
     resolve_dataset_name_and_id,
     get_language_codes,
-    _get_max_run_id,
+    _get_column_aggregate,
     resolve_workspace_name_and_id,
 )
 from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
@@ -383,7 +383,7 @@ def translate_using_spark(rule_file):
         if len(lakeT_filt) == 0:
             runId = 1
         else:
-            max_run_id = _get_max_run_id(
+            max_run_id = _get_column_aggregate(
                 lakehouse=lakehouse, table_name=delta_table_name
             )
             runId = max_run_id + 1
diff --git a/src/sempy_labs/_model_bpa_bulk.py b/src/sempy_labs/_model_bpa_bulk.py
index 7e0262d5..5d2e8a4c 100644
--- a/src/sempy_labs/_model_bpa_bulk.py
+++ b/src/sempy_labs/_model_bpa_bulk.py
@@ -6,7 +6,7 @@
     save_as_delta_table,
     resolve_workspace_capacity,
     retry,
-    _get_max_run_id,
+    _get_column_aggregate,
 )
 from sempy_labs.lakehouse import (
     get_lakehouse_tables,
@@ -76,7 +76,7 @@ def run_model_bpa_bulk(
         if len(lakeT_filt) == 0:
             runId = 1
         else:
-            max_run_id = _get_max_run_id(lakehouse=lakehouse, table_name=output_table)
+            max_run_id = _get_column_aggregate(lakehouse=lakehouse, table_name=output_table)
             runId = max_run_id + 1
 
         if isinstance(workspace, str):
diff --git a/src/sempy_labs/_vertipaq.py b/src/sempy_labs/_vertipaq.py
index cb7078d6..4802fe4d 100644
--- a/src/sempy_labs/_vertipaq.py
+++ b/src/sempy_labs/_vertipaq.py
@@ -12,7 +12,7 @@
     resolve_lakehouse_name,
     save_as_delta_table,
     resolve_workspace_capacity,
-    _get_max_run_id,
+    _get_column_aggregate,
     resolve_workspace_name_and_id,
     resolve_dataset_name_and_id,
 )
@@ -519,7 +519,9 @@ def _style_columns_based_on_types(dataframe: pd.DataFrame, column_type_mapping):
         if len(lakeT_filt) == 0:
             runId = 1
         else:
-            max_run_id = _get_max_run_id(lakehouse=lakehouse, table_name=lakeTName)
+            max_run_id = _get_column_aggregate(
+                lakehouse=lakehouse, table_name=lakeTName
+            )
             runId = max_run_id + 1
 
     dfMap = {
diff --git a/src/sempy_labs/report/_report_bpa.py b/src/sempy_labs/report/_report_bpa.py
index a9ede3e8..6219dd7e 100644
--- a/src/sempy_labs/report/_report_bpa.py
+++ b/src/sempy_labs/report/_report_bpa.py
@@ -10,7 +10,7 @@
     resolve_report_id,
     resolve_lakehouse_name,
     resolve_workspace_capacity,
-    _get_max_run_id,
+    _get_column_aggregate,
     resolve_workspace_name_and_id,
 )
 from sempy_labs.lakehouse import get_lakehouse_tables, lakehouse_attached
@@ -217,7 +217,7 @@ def execute_rule(row):
         if len(lakeT_filt) == 0:
             runId = 1
         else:
-            max_run_id = _get_max_run_id(
+            max_run_id = _get_column_aggregate(
                 lakehouse=lakehouse, table_name=delta_table_name
             )
             runId = max_run_id + 1
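All four call sites keep the same run-id bookkeeping and lean on the new helper's defaults (`column_name="RunId"`, `function="max"`, `default_value=0`). A condensed sketch of that shared pattern, with `lakeT_filt`, `lakehouse`, and `output_table` standing in for each module's locals:

if len(lakeT_filt) == 0:
    # The output delta table does not exist yet, so this is the first run.
    runId = 1
else:
    # Continue numbering from the highest RunId already recorded.
    max_run_id = _get_column_aggregate(lakehouse=lakehouse, table_name=output_table)
    runId = max_run_id + 1

Because `default_value` is 0, an existing but empty table also resolves to `runId = 1`, matching the old `_get_max_run_id` fallback of `... or 0`.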