Skip to content

Commit

Permalink
M kovalsky/optimizecachewarming (#366)
Browse files Browse the repository at this point in the history
* optimize and centralize cache warming

* optimize and centralize cache warming

* fix resolve_workspace_name_and_id
  • Loading branch information
m-kovalsky authored Dec 18, 2024
1 parent 7907616 commit 06d1423
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 57 deletions.
26 changes: 8 additions & 18 deletions src/sempy_labs/_dax.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from sempy_labs._model_dependencies import get_model_calc_dependencies
from typing import Optional, List
from sempy._utils._log import log
from tqdm.auto import tqdm
from uuid import UUID
from sempy_labs.directlake._warm_cache import _put_columns_into_memory


@log
Expand Down Expand Up @@ -189,22 +189,12 @@ def get_dax_query_dependencies(
not_in_memory = dfC_filtered[dfC_filtered["Is Resident"] == False]

if len(not_in_memory) > 0:
tbls = not_in_memory["Table Name"].unique()

# Run basic query to get columns into memory; completed one table at a time (so as not to overload the capacity)
for table_name in (bar := tqdm(tbls)):
bar.set_description(f"Warming the '{table_name}' table...")
css = ", ".join(
not_in_memory[not_in_memory["Table Name"] == table_name][
"Full Object"
]
.astype(str)
.tolist()
)
dax = f"""EVALUATE TOPN(1,SUMMARIZECOLUMNS({css}))"""
fabric.evaluate_dax(
dataset=dataset_id, dax_string=dax, workspace=workspace_id
)
_put_columns_into_memory(
dataset=dataset,
workspace=workspace,
col_df=dfC_filtered,
return_dataframe=False,
)

# Get column stats again
dfC = fabric.list_columns(
Expand All @@ -214,7 +204,7 @@ def get_dax_query_dependencies(
dfC["Table Name"], dfC["Column Name"]
)

dfC_filtered = dfC[dfC["Full Object"].isin(df["Full Object"].values)][
dfC_filtered = dfC[dfC["Full Object"].isin(final_df["Full Object"].values)][
[
"Table Name",
"Column Name",
Expand Down
5 changes: 4 additions & 1 deletion src/sempy_labs/_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,12 @@ def resolve_workspace_name_and_id(
if workspace is None:
workspace_id = fabric.get_workspace_id()
workspace_name = fabric.resolve_workspace_name(workspace_id)
elif _is_valid_uuid(workspace):
workspace_id = workspace
workspace_name = fabric.resolve_workspace_name(workspace_id)
else:
workspace_id = fabric.resolve_workspace_id(workspace)
workspace_name = workspace
workspace_id = fabric.resolve_workspace_id(workspace_name)

return str(workspace_name), str(workspace_id)

Expand Down
89 changes: 51 additions & 38 deletions src/sempy_labs/directlake/_warm_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,26 +126,7 @@ def warm_direct_lake_cache_perspective(
df["Table Name"] = df["Table Name"].str[1:-1]
df["Column Name"] = df["Column Name"].str[0:-1]

tbls = list(set(value.split("[")[0] for value in merged_list_unique))

for tableName in (bar := tqdm(tbls)):
filtered_list = [
value for value in merged_list_unique if value.startswith(f"{tableName}[")
]
bar.set_description(f"Warming the '{tableName}' table...")
css = ",".join(map(str, filtered_list))
dax = """EVALUATE TOPN(1,SUMMARIZECOLUMNS(""" + css + "))" ""
fabric.evaluate_dax(dataset=dataset_id, dax_string=dax, workspace=workspace_id)

print(f"{icons.green_dot} The following columns have been put into memory:")

new_column_order = ["Table Name", "Column Name", "DAX Object Name"]
df = df.reindex(columns=new_column_order)
df = df[["Table Name", "Column Name"]].sort_values(
by=["Table Name", "Column Name"], ascending=True
)

return df
return _put_columns_into_memory(dataset=dataset, workspace=workspace, col_df=df)


@log
Expand Down Expand Up @@ -181,9 +162,6 @@ def warm_direct_lake_cache_isresident(

# Identify columns which are currently in memory (Is Resident = True)
dfC = fabric.list_columns(dataset=dataset_id, workspace=workspace_id, extended=True)
dfC["DAX Object Name"] = format_dax_object_name(
dfC["Table Name"], dfC["Column Name"]
)
dfC_filtered = dfC[dfC["Is Resident"] == True]

if len(dfC_filtered) == 0:
Expand All @@ -197,22 +175,57 @@ def warm_direct_lake_cache_isresident(
)
time.sleep(2)

# Run basic query to get columns into memory; completed one table at a time (so as not to overload the capacity)
tbls = dfC_filtered["Table Name"].unique()
return _put_columns_into_memory(
dataset=dataset, workspace=workspace, col_df=dfC_filtered
)


def _put_columns_into_memory(dataset, workspace, col_df, return_dataframe: bool = True):

row_limit = 1000000

dfT = fabric.list_tables(dataset=dataset, workspace=workspace, extended=True)
col_df = col_df.copy()

col_df["DAX Object"] = format_dax_object_name(
col_df["Table Name"], col_df["Column Name"]
)
tbls = col_df["Table Name"].unique()

for table_name in (bar := tqdm(tbls)):
bar.set_description(f"Warming the '{table_name}' table...")
css = ", ".join(
dfC_filtered[dfC_filtered["Table Name"] == table_name]["DAX Object Name"]
.astype(str)
.tolist()
dfT_filt = dfT[dfT["Name"] == table_name]
col_df_filt = col_df[col_df["Table Name"] == table_name]
if not dfT_filt.empty:
row_count = dfT_filt["Row Count"].iloc[0]
bar.set_description(f"Warming the '{table_name}' table...")
if row_count < row_limit:
columns = col_df_filt["DAX Object"].tolist()
css = ", ".join(columns)
dax = f"EVALUATE TOPN(1, SELECTCOLUMNS('{table_name}', {css}))"
fabric.evaluate_dax(
dataset=dataset, dax_string=dax, workspace=workspace
)
else:
for _, r in col_df_filt.iterrows():
dax_object = r["DAX Object"]
dax = f"""EVALUATE TOPN(1, SELECTCOLUMNS('{table_name}', {dax_object}))"""
fabric.evaluate_dax(
dataset=dataset, dax_string=dax, workspace=workspace
)

if return_dataframe:
print(
f"{icons.green_dot} The following columns have been put into memory. Temperature indicates the current column temperature."
)
dax = f"""EVALUATE TOPN(1,SUMMARIZECOLUMNS({css}))"""
fabric.evaluate_dax(dataset=dataset_id, dax_string=dax, workspace=workspace_id)

print(
f"{icons.green_dot} The following columns have been put into memory. Temperature indicates the column temperature prior to the semantic model refresh."
)
dfC = fabric.list_columns(dataset=dataset, workspace=workspace, extended=True)
dfC["DAX Object"] = format_dax_object_name(
dfC["Table Name"], dfC["Column Name"]
)
dfC_filt = dfC[dfC["DAX Object"].isin(col_df["DAX Object"].values)]

return dfC_filtered[
["Table Name", "Column Name", "Is Resident", "Temperature"]
].sort_values(by=["Table Name", "Column Name"], ascending=True)
return (
dfC_filt[["Table Name", "Column Name", "Is Resident", "Temperature"]]
.sort_values(by=["Table Name", "Column Name"], ascending=True)
.reset_index(drop=True)
)

0 comments on commit 06d1423

Please sign in to comment.