From 0aa86be59831a739145fd4578f1a6a71a199c2cd Mon Sep 17 00:00:00 2001 From: Danilo Lessa Bernardineli Date: Thu, 14 Dec 2023 20:38:50 -0300 Subject: [PATCH] Add switch for toggling deepcopy off (#316) * fix tests + rm simulations/ folder * add types.py * single run / multi mc is ok * fix for single run / single param * add support for single proc runs * add switch for using deepcopy + fix bug on additional_objs * bug fix * bug fix --------- Co-authored-by: Emanuel Lima --- cadCAD/engine/__init__.py | 6 +- cadCAD/engine/execution.py | 17 +-- cadCAD/engine/simulation.py | 50 +++++---- cadCAD/types.py | 4 +- testing/test_additional_objs.py | 187 ++++++++++++++++++++++++++++++++ testing/test_arg_count.py | 51 +++++++++ 6 files changed, 286 insertions(+), 29 deletions(-) create mode 100644 testing/test_additional_objs.py create mode 100644 testing/test_arg_count.py diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index c154e72f..59e49d6b 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -1,5 +1,5 @@ from time import time -from typing import Callable, Dict, List, Any, Tuple +from typing import Callable, Dict, List, Any, Tuple, Union from cadCAD.utils import flatten from cadCAD.utils.execution import print_exec_info @@ -39,6 +39,7 @@ def auto_mode_switcher(config_amt: int): class ExecutionContext: def __init__(self, context=ExecutionMode.local_mode, method=None, additional_objs=None) -> None: self.name = context + self.additional_objs = additional_objs if context == 'local_proc': self.method = local_simulations elif context == 'single_proc': @@ -74,6 +75,7 @@ def __init__(self, self.SimExecutor = SimExecutor self.exec_method = exec_context.method self.exec_context = exec_context.name + self.additional_objs = exec_context.additional_objs self.configs = configs self.empty_return = empty_return @@ -174,7 +176,7 @@ def get_final_results(simulations: List[StateHistory], print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs, - ExpIDs, SubsetIDs, SubsetWindows, original_N + ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs ) final_result = get_final_results( diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index fdf0452c..febafffe 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -22,7 +22,8 @@ def single_proc_exec( ExpIDs: List[int], SubsetIDs: List[SubsetID], SubsetWindows: List[SubsetWindow], - configured_n: List[N_Runs] + configured_n: List[N_Runs], + additional_objs=None ): # HACK for making it run with N_Runs=1 @@ -38,7 +39,7 @@ def single_proc_exec( map(lambda x: x.pop(), raw_params) ) result = simulation_exec( - var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n + var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs ) return flatten(result) @@ -58,7 +59,8 @@ def parallelize_simulations( ExpIDs: List[int], SubsetIDs: List[SubsetID], SubsetWindows: List[SubsetWindow], - configured_n: List[N_Runs] + configured_n: List[N_Runs], + additional_objs=None ): print(f'Execution Mode: parallelized') @@ -96,7 +98,7 @@ def process_executor(params): if len_configs_structs > 1: pp = PPool(processes=len_configs_structs) results = pp.map( - lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params + lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n, additional_objs), params ) pp.close() pp.join() @@ -123,18 +125,19 @@ def local_simulations( ExpIDs: List[int], SubsetIDs: List[SubsetID], SubsetWindows: List[SubsetWindow], - configured_n: List[N_Runs] + configured_n: List[N_Runs], + additional_objs=None ): config_amt = len(configs_structs) if config_amt == 1: # and configured_n != 1 return single_proc_exec( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, - Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n + Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs ) elif config_amt > 1: # and configured_n != 1 return parallelize_simulations( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, - Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n + Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs ) # elif config_amt > 1 and configured_n == 1: diff --git a/cadCAD/engine/simulation.py b/cadCAD/engine/simulation.py index cb492706..1e7829e5 100644 --- a/cadCAD/engine/simulation.py +++ b/cadCAD/engine/simulation.py @@ -1,10 +1,12 @@ from typing import Any, Callable, Dict, List, Tuple from copy import deepcopy +from types import MappingProxyType from functools import reduce -from funcy import curry +from funcy import curry # type: ignore from cadCAD.utils import flatten from cadCAD.engine.utils import engine_exception +from cadCAD.types import * id_exception: Callable = curry(engine_exception)(KeyError)(KeyError)(None) @@ -102,20 +104,30 @@ def env_composition(target_field, state_dict, target_value): # mech_step def partial_state_update( self, - sweep_dict: Dict[str, List[Any]], - sub_step: int, - sL, - sH, - state_funcs: List[Callable], - policy_funcs: List[Callable], - env_processes: Dict[str, Callable], + sweep_dict: Parameters, + sub_step: Substep, + sL: list[State], + sH: StateHistory, + state_funcs: List[StateUpdateFunction], + policy_funcs: List[PolicyFunction], + env_processes: EnvProcesses, time_step: int, run: int, additional_objs ) -> List[Dict[str, Any]]: - # last_in_obj: Dict[str, Any] = MappingProxyType(sL[-1]) - last_in_obj: Dict[str, Any] = deepcopy(sL[-1]) + if type(additional_objs) == dict: + if additional_objs.get('deepcopy_off', False) == True: + last_in_obj = MappingProxyType(sL[-1]) + if len(additional_objs) == 1: + additional_objs = None + # XXX: drop the additional objects if only used for deepcopy + # toggling. + else: + last_in_obj = deepcopy(sL[-1]) + else: + last_in_obj = deepcopy(sL[-1]) + _input: Dict[str, Any] = self.policy_update_exception( self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs, additional_objs) ) @@ -206,18 +218,18 @@ def run_pipeline( def simulation( self, - sweep_dict: Dict[str, List[Any]], - states_list: List[Dict[str, Any]], + sweep_dict: SweepableParameters, + states_list: StateHistory, configs, - env_processes: Dict[str, Callable], - time_seq: range, - simulation_id: int, + env_processes: EnvProcesses, + time_seq: TimeSeq, + simulation_id: SimulationID, run: int, - subset_id, - subset_window, - configured_N, + subset_id: SubsetID, + subset_window: SubsetWindow, + configured_N: int, # remote_ind - additional_objs=None + additional_objs: Union[None, Dict]=None ): run += 1 subset_window.appendleft(subset_id) diff --git a/cadCAD/types.py b/cadCAD/types.py index 5e8eb274..4f5de596 100644 --- a/cadCAD/types.py +++ b/cadCAD/types.py @@ -25,7 +25,9 @@ class ConfigurationDict(TypedDict): M: Union[Parameters, SweepableParameters] # Parameters / List of Parameter to Sweep -EnvProcesses = object +TargetValue = object +EnvProcess: Callable[[State, SweepableParameters, TargetValue], TargetValue] +EnvProcesses = dict[str, Callable] TimeSeq = Iterator SimulationID = int Run = int diff --git a/testing/test_additional_objs.py b/testing/test_additional_objs.py new file mode 100644 index 00000000..89cd2995 --- /dev/null +++ b/testing/test_additional_objs.py @@ -0,0 +1,187 @@ +from typing import Dict, List +from cadCAD.engine import Executor, ExecutionContext, ExecutionMode +from cadCAD.configuration import Experiment +from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, psub_list +from cadCAD.types import * +import pandas as pd # type: ignore +import types +import inspect +import pytest + +def describe_or_return(v: object) -> object: + """ + Thanks @LinuxIsCool! + """ + if isinstance(v, types.FunctionType): + return f'function: {v.__name__}' + elif isinstance(v, types.LambdaType) and v.__name__ == '': + return f'lambda: {inspect.signature(v)}' + else: + return v + + +def select_M_dict(M_dict: Dict[str, object], keys: set) -> Dict[str, object]: + """ + Thanks @LinuxIsCool! + """ + return {k: describe_or_return(v) for k, v in M_dict.items() if k in keys} + + +def select_config_M_dict(configs: list, i: int, keys: set) -> Dict[str, object]: + return select_M_dict(configs[i].sim_config['M'], keys) + + +def drop_substeps(_df): + first_ind = (_df.substep == 0) & (_df.timestep == 0) + last_ind = _df.substep == max(_df.substep) + inds_to_drop = first_ind | last_ind + return _df.copy().loc[inds_to_drop].drop(columns=['substep']) + + +def assign_params(_df: pd.DataFrame, configs) -> pd.DataFrame: + """ + Based on `cadCAD-tools` package codebase, by @danlessa + """ + M_dict = configs[0].sim_config['M'] + params_set = set(M_dict.keys()) + selected_params = params_set + + # Attribute parameters to each row + # 1. Assign the parameter set from the first row first, so that + # columns are created + first_param_dict = select_config_M_dict(configs, 0, selected_params) + + # 2. Attribute parameter on an (simulation, subset, run) basis + df = _df.assign(**first_param_dict).copy() + for i, (_, subset_df) in enumerate(df.groupby(['simulation', 'subset', 'run'])): + df.loc[subset_df.index] = subset_df.assign(**select_config_M_dict(configs, + i, + selected_params)) + return df + + + + +SWEEP_PARAMS: Dict[str, List] = { + 'alpha': [1], + 'beta': [lambda x: 2 * x, lambda x: x], + 'gamma': [3, 4], + 'omega': [7] + } + +SINGLE_PARAMS: Dict[str, object] = { + 'alpha': 1, + 'beta': lambda x: x, + 'gamma': 3, + 'omega': 5 + } + + +def create_experiment(N_RUNS=2, N_TIMESTEPS=3, params: dict=SWEEP_PARAMS): + psu_steps = ['m1', 'm2', 'm3'] + system_substeps = len(psu_steps) + var_timestep_trigger = var_substep_trigger([0, system_substeps]) + env_timestep_trigger = env_trigger(system_substeps) + env_process = {} + + + # ['s1', 's2', 's3', 's4'] + # Policies per Mechanism + def gamma(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwargs): + return {'gamma': params['gamma']} + + + def omega(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwarg): + return {'omega': params['omega']} + + + # Internal States per Mechanism + def alpha(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'alpha_var', params['alpha'] + + + def beta(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'beta_var', params['beta'] + + def gamma_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'gamma_var', params['gamma'] + + def omega_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'omega_var', params['omega'] + + + def policies(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'policies', _input + + + def sweeped(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'sweeped', {'beta': params['beta'], 'gamma': params['gamma']} + + psu_block: dict = {k: {"policies": {}, "states": {}} for k in psu_steps} + for m in psu_steps: + psu_block[m]['policies']['gamma'] = gamma + psu_block[m]['policies']['omega'] = omega + psu_block[m]["states"]['alpha_var'] = alpha + psu_block[m]["states"]['beta_var'] = beta + psu_block[m]["states"]['gamma_var'] = gamma_var + psu_block[m]["states"]['omega_var'] = omega_var + psu_block[m]['states']['policies'] = policies + psu_block[m]["states"]['sweeped'] = var_timestep_trigger(y='sweeped', f=sweeped) + + + # Genesis States + genesis_states = { + 'alpha_var': 0, + 'beta_var': 0, + 'gamma_var': 0, + 'omega_var': 0, + 'policies': {}, + 'sweeped': {} + } + + # Environment Process + env_process['sweeped'] = env_timestep_trigger(trigger_field='timestep', trigger_vals=[5], funct_list=[lambda _g, x: _g['beta']]) + + sim_config = config_sim( + { + "N": N_RUNS, + "T": range(N_TIMESTEPS), + "M": params, # Optional + } + ) + + # New Convention + partial_state_update_blocks = psub_list(psu_block, psu_steps) + + exp = Experiment() + exp.append_model( + sim_configs=sim_config, + initial_state=genesis_states, + env_processes=env_process, + partial_state_update_blocks=partial_state_update_blocks + ) + return exp + + +def test_deepcopy_off(): + exp = create_experiment() + mode = ExecutionMode().local_mode + exec_context = ExecutionContext(mode, additional_objs={'deepcopy_off': True}) + executor = Executor(exec_context=exec_context, configs=exp.configs) + (records, tensor_field, _) = executor.execute() + df = drop_substeps(assign_params(pd.DataFrame(records), exp.configs)) + + # XXX: parameters should always be of the same type. Else, the test will fail + first_sim_config = exp.configs[0].sim_config['M'] + + + for (i, row) in df.iterrows(): + if row.timestep > 0: + + assert row['alpha_var'] == row['alpha'] + assert type(row['alpha_var']) == type(first_sim_config['alpha']) + assert row['gamma_var'] == row['gamma'] + assert type(row['gamma_var']) == type(first_sim_config['gamma']) + assert row['omega_var'] == row['omega'] + assert type(row['omega_var']) == type(first_sim_config['omega']) + diff --git a/testing/test_arg_count.py b/testing/test_arg_count.py new file mode 100644 index 00000000..e0ac71e6 --- /dev/null +++ b/testing/test_arg_count.py @@ -0,0 +1,51 @@ +from typing import Dict, List +from cadCAD.engine import Executor, ExecutionContext, ExecutionMode +from cadCAD.configuration import Experiment +from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, psub_list +from cadCAD.types import * +import pandas as pd # type: ignore +import types +import inspect +import pytest + +def test_sufs(): + + psubs = [ + { + 'policies': { + 'p_A': lambda _1, _2, _3, _4: {} + }, + 'variables': { + 'v_A': lambda _1, _2, _3, _4, _5: ('v_a', None) + } + } + ] + + initial_state = { + 'v_A': None + } + + params = {'p_A': [1]} + + N_t = 5 + N_r = 1 + + sim_config = config_sim( + { + "N": N_r, + "T": range(N_t), + "M": params, # Optional + } + ) + + exp = Experiment() + exp.append_model( + sim_configs=sim_config, + initial_state=initial_state, + partial_state_update_blocks=psubs + ) + + mode = ExecutionMode().local_mode + exec_context = ExecutionContext(mode, additional_objs={'deepcopy_off': True}) + executor = Executor(exec_context=exec_context, configs=exp.configs) + (records, tensor_field, _) = executor.execute()