From 6950cd3e235597444d7b94b24af31e8a6ac30a54 Mon Sep 17 00:00:00 2001 From: Nichamon Naksinehaboon Date: Mon, 6 Nov 2023 22:50:56 -0600 Subject: [PATCH] Refactor store_time_stats handler in ldmsd_controller --- ldms/python/ldmsd/ldmsd_controller | 266 ++++++++++++++++++++++++++++- 1 file changed, 262 insertions(+), 4 deletions(-) diff --git a/ldms/python/ldmsd/ldmsd_controller b/ldms/python/ldmsd/ldmsd_controller index a32989316..19c086e5d 100755 --- a/ldms/python/ldmsd/ldmsd_controller +++ b/ldms/python/ldmsd/ldmsd_controller @@ -66,6 +66,7 @@ from ldmsd import ldmsd_util from ldmsd.ldmsd_communicator import LDMSD_Request, LDMSD_Req_Attr from ldmsd.ldmsd_communicator import Communicator, fmt_status import errno +import math LDMSD_REQ_SOM_F=1 LDMSD_REQ_EOM_F=2 @@ -1250,6 +1251,204 @@ class LdmsdCmdParser(cmd.Cmd): def complete_strgp_status(self, text, line, begidx, endidx): return self.__complete_attr_list('strgp_status', text) + def __datatbl_new(self): + return {'set_name' : [], + 'min' : math.inf, + 'max' : 0, + 'avg' : 0, + 'count' : 0, + 'start_ts' : None, + 'end_ts' : None, + 'min_member' : None, + 'max_member' : None, + 'min_avg_member' : None, + 'max_avg_member' : None} + + def __bounds(self, tbl): + min_v = math.inf + max_v = 0 + min_avg = math.inf + max_avg = 0 + + for k in tbl.keys(): + if k == 'stats': + continue + if min_v > tbl[k]['stats']['min']: + min_v = tbl[k]['stats']['min'] + tbl['stats']['min_member'] = k + if max_v < tbl[k]['stats']['max']: + max_v = tbl[k]['stats']['max'] + tbl['stats']['max_member'] = k + if min_avg > tbl[k]['stats']['avg']: + min_avg = tbl[k]['stats']['avg'] + tbl['stats']['min_avg_member'] = k + if max_avg < tbl[k]['stats']['avg']: + max_avg = tbl[k]['stats']['avg'] + tbl['stats']['max_avg_member'] = k + + def __avg_update(self, cur_avg, cur_cnt, v, cnt): + new_cnt = cur_cnt + cnt + return (cur_avg * (cur_cnt / new_cnt) + v * (cnt / new_cnt), new_cnt) + + def __min_max(self, tbl, data): + is_min = False + is_max = False + if tbl['min'] > data['min']: + tbl['min'] = data['min'] + tbl['min_ts'] = data['min_ts'] + is_min = True + if tbl['max'] < data['max']: + tbl['max'] = data['max'] + tbl['max_ts'] = data['max_ts'] + is_max = True + return (is_min, is_max) + + def __datatbl_update(self, pset, strgp_tbl, schema_tbl, thread_tbl): + stgtbl = strgp_tbl['stats'] + sstgtbl = strgp_tbl[pset['schema']]['stats'] + tsstgtbl = strgp_tbl[pset['schema']][pset['thread_id']]['stats'] + + stbl = schema_tbl[pset['schema']]['stats'] + ttbl = thread_tbl[pset['thread_id']]['stats'] + tstbl = thread_tbl[pset['thread_id']][pset['schema']]['stats'] + + pset_avg = pset['stats']['avg'] + pset_count = pset['stats']['count'] + + stgtbl['set_name'].append(pset['name']) + sstgtbl['set_name'].append(pset['name']) + tsstgtbl['set_name'].append(pset['name']) + stbl['set_name'].append(pset['name']) + ttbl['set_name'].append(pset['name']) + tstbl['set_name'].append(pset['name']) + + stgtbl['avg'], stgtbl['count'] = self.__avg_update(stgtbl['avg'], stgtbl['count'], + pset_avg, pset_count) + sstgtbl['avg'], sstgtbl['count'] = self.__avg_update(sstgtbl['avg'], sstgtbl['count'], + pset_avg, pset_count) + tsstgtbl['avg'], tsstgtbl['count'] = self.__avg_update(tsstgtbl['avg'], tsstgtbl['count'], + pset_avg, pset_count) + + stbl['avg'], stbl['count'] = self.__avg_update(stbl['avg'], stbl['count'], + pset_avg, pset_count) + + ttbl['avg'], ttbl['count'] = self.__avg_update(ttbl['avg'], ttbl['count'], + pset_avg, pset_count) + tstbl['avg'], tstbl['count'] = self.__avg_update(tstbl['avg'], tstbl['count'], + pset_avg, pset_count) + + self.__min_max(stgtbl, pset['stats']) + self.__min_max(sstgtbl, pset['stats']) + self.__min_max(tsstgtbl, pset['stats']) + self.__min_max(stbl, pset['stats']) + self.__min_max(ttbl, pset['stats']) + self.__min_max(tstbl, pset['stats']) + + def __store_time_process(self, d): + # ----------------------------------------------- + # SOURCE DICT + # ----------------------------------------------- + # Assume that d = { :{ 'sets': { : { } + # } + # } + # }, + # where, = { 'min' : , + # 'max' : , + # 'avg' : , + # 'cnt' : , + # 'start_ts' : , # in seconds + # 'end_ts' : # in seconds + # } + # + # ----------------------------------------------- + # RESULT TABLES + # ----------------------------------------------- + # stats = { 'min' : , + # 'max' : , + # 'avg' : , + # 'cnt' : , + # 'start' : in seconds, + # 'end' : in seconds, + # 'set_name' : [] + # } + # + # schema_tbl = { 'stats': , + # : {'stats' : }, + # ... + # } + schema_tbl = {'stats' : self.__datatbl_new()} + # thread_tbl = { 'stats' : , + # : { 'stats' : , + # : {'stats' : }, + # ... + # } + # } + thread_tbl = {'stats' : self.__datatbl_new()} + #strgp_tbl = { 'stats' : , + # : { 'stats' : , + # : { 'stats' : , + # : { 'stats' : }, + # ... + # }, + # ... + # } + strgp_tbl = {'stats' : self.__datatbl_new()} + + for strgp_name, strgp in d.items(): + strgp_tbl[strgp_name] = {'stats' : self.__datatbl_new()} + + for set_name, prdset in strgp['sets'].items(): + prdset['name'] = set_name + schema_name = prdset['schema'] + thread_id = prdset['thread_id'] + + if schema_name not in strgp_tbl[strgp_name]: + strgp_tbl[strgp_name][schema_name] = {'stats' : self.__datatbl_new()} + if thread_id not in strgp_tbl[strgp_name][schema_name].keys(): + strgp_tbl[strgp_name][schema_name][thread_id] = {'stats' : self.__datatbl_new()} + + if schema_name not in schema_tbl.keys(): + schema_tbl[schema_name] = {'stats': self.__datatbl_new()} + + if thread_id not in thread_tbl.keys(): + thread_tbl[thread_id] = {'stats' : self.__datatbl_new()} + if schema_name not in thread_tbl[thread_id].keys(): + thread_tbl[thread_id][schema_name] = {'stats': self.__datatbl_new()} + + self.__datatbl_update(prdset, strgp_tbl[strgp_name], schema_tbl, thread_tbl) + + self.__bounds(strgp_tbl) + for k in strgp_tbl.keys(): + if k == 'stats': + continue + self.__bounds(strgp_tbl[k]) + for tid in strgp_tbl[k].keys(): + if tid == 'stats': + continue + self.__bounds(strgp_tbl[k][tid]) + + self.__bounds(schema_tbl) + + self.__bounds(thread_tbl) + for k in thread_tbl.keys(): + if k == 'stats': + continue + self.__bounds(thread_tbl[k]) + + return (strgp_tbl, schema_tbl, thread_tbl) + + def __symbols(self, key, stats): + symbol = list() + if key == stats['min_member']: + symbol.append("<") + if key == stats['max_member']: + symbol.append(">") + if key == stats['min_avg_member']: + symbol.append("-") + if key == stats['max_avg_member']: + symbol.append("+") + return "".join(symbol) + def do_store_time_stats(self, arg): """ Get the store time statistics of a storage policy @@ -1271,10 +1470,69 @@ class LdmsdCmdParser(cmd.Cmd): # self.handle() already reported the error. return j = fmt_status(msg) - print(f"{'Storage Policy':25} {'Min(usec)':15} {'Max(usec)':15} {'Avg(usec)':15} {'Count':10} Number of Sets") - print(f"{'-'*25} {'-'*15} {'-'*15} {'-'*15} {'-'*10} {'-'*15}") - for n, strgp in j.items(): - print(f"{n:25} {strgp['min']:15.4f} {strgp['max']:15.4f} {strgp['avg']:15.4f} {strgp['cnt']:10} {strgp['num_sets']:15}") + + try: + strgp_tbl, schema_tbl, thread_tbl = self.__store_time_process(j) + except: + raise + + # Schema Table + print(f"{'='*(4+21+16+16+16+21+21+11+11)}") + print(f" < Minimum Value - Minimum Average value") + print(f" > Maximum Value + Maximum Average value") + print(f"{'='*(4+21+16+16+16+21+21+11+11)}") + print(f"{' '*4} {'Schema':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}") + print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}") + schema_names = sorted(list(schema_tbl.keys())) + for schema in schema_names: + if schema == 'stats': + continue + stats = schema_tbl[schema]['stats'] + print(f"{self.__symbols(schema, schema_tbl['stats']):>4} {schema:>18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}") + + # Thread Table + print(f"{'='*(4+21+16+16+16+21+21+11+11)}") + print(f"{' '*4}-{'Thread':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}") + # print(f"{'-'*4} {'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}") + threads = sorted(list(thread_tbl.keys())) + for tid in threads: + if tid == 'stats': + continue + symbol_tid = list() + stats = thread_tbl[tid]['stats'] + print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}") + print(f"{self.__symbols(tid, thread_tbl['stats']):>4} {tid:18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}") + schema_names = sorted([s for s in list(thread_tbl[tid].keys()) if s != 'stats']) + print(f"{' '*4} {'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}") + for schema in schema_names: + st = thread_tbl[tid][schema]['stats'] + print(f" {self.__symbols(schema, thread_tbl[tid]['stats']):>4} {schema:>18} {st['min']:15.4f} {st['avg']:15.4f} {st['max']:15.4f} {st['min_ts']:20.4f} {st['max_ts']:20.4f} {len(set(st['set_name'])):10} {stats['count']:10}") + # print(f"{'-'*(4+21+16+16+16+21+21+11+11)}") + + # Storage policy Table + print(f"{'='*(4+21+16+16+16+21+21+11+11)}") + print(f"{' '*4} {'Storage Policy':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}") + print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}") + strgps = sorted(list(strgp_tbl.keys())) + for strgp in strgps: + if strgp == 'stats': + continue + stats = strgp_tbl[strgp]['stats'] + print(f"{self.__symbols(strgp, strgp_tbl['stats']):>5} {strgp:18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}") + schemas = sorted([s for s in list(strgp_tbl[strgp].keys()) if s != 'stats']) + for schema in schemas: + print(f" {' '*2}{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}") + sch_stats = strgp_tbl[strgp][schema]['stats'] + print(f" {self.__symbols(schema, strgp_tbl[strgp]['stats']):>5} {schema:16} {sch_stats['min']:15.4f} {sch_stats['avg']:15.4f} " \ + f"{sch_stats['max']:15.4f} {sch_stats['min_ts']:20.4f} {sch_stats['max_ts']:20.4f} " \ + f"{len(set(sch_stats['set_name'])):10} {stats['count']:10}") + threads = sorted([s for s in list(strgp_tbl[strgp][schema].keys()) if s != 'stats']) + for tid in threads: + st = strgp_tbl[strgp][schema][tid]['stats'] + print(f" {self.__symbols(tid, strgp_tbl[strgp][schema]['stats']):>5}{tid:>13} {st['min']:15.4f} {st['avg']:15.4f} " \ + f"{st['max']:15.4f} {st['min_ts']:20.4f} {st['max_ts']:20.4f} " \ + f"{len(set(st['set_name'])):10} {stats['count']:10}") + print(f"{' '*5} {'-'*18} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}") def complete_store_time_stats(self, text, line, begidx, endidx): return self.__complete_attr_list('store_time_stats', text)