Skip to content

Commit

Permalink
Refactor store_time_stats handler in ldmsd_controller
Browse files Browse the repository at this point in the history
  • Loading branch information
nichamon authored and tom95858 committed Dec 1, 2023
1 parent a7e49ca commit 6950cd3
Showing 1 changed file with 262 additions and 4 deletions.
266 changes: 262 additions & 4 deletions ldms/python/ldmsd/ldmsd_controller
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ from ldmsd import ldmsd_util
from ldmsd.ldmsd_communicator import LDMSD_Request, LDMSD_Req_Attr
from ldmsd.ldmsd_communicator import Communicator, fmt_status
import errno
import math

LDMSD_REQ_SOM_F=1
LDMSD_REQ_EOM_F=2
Expand Down Expand Up @@ -1250,6 +1251,204 @@ class LdmsdCmdParser(cmd.Cmd):
def complete_strgp_status(self, text, line, begidx, endidx):
return self.__complete_attr_list('strgp_status', text)

def __datatbl_new(self):
return {'set_name' : [],
'min' : math.inf,
'max' : 0,
'avg' : 0,
'count' : 0,
'start_ts' : None,
'end_ts' : None,
'min_member' : None,
'max_member' : None,
'min_avg_member' : None,
'max_avg_member' : None}

def __bounds(self, tbl):
min_v = math.inf
max_v = 0
min_avg = math.inf
max_avg = 0

for k in tbl.keys():
if k == 'stats':
continue
if min_v > tbl[k]['stats']['min']:
min_v = tbl[k]['stats']['min']
tbl['stats']['min_member'] = k
if max_v < tbl[k]['stats']['max']:
max_v = tbl[k]['stats']['max']
tbl['stats']['max_member'] = k
if min_avg > tbl[k]['stats']['avg']:
min_avg = tbl[k]['stats']['avg']
tbl['stats']['min_avg_member'] = k
if max_avg < tbl[k]['stats']['avg']:
max_avg = tbl[k]['stats']['avg']
tbl['stats']['max_avg_member'] = k

def __avg_update(self, cur_avg, cur_cnt, v, cnt):
new_cnt = cur_cnt + cnt
return (cur_avg * (cur_cnt / new_cnt) + v * (cnt / new_cnt), new_cnt)

def __min_max(self, tbl, data):
is_min = False
is_max = False
if tbl['min'] > data['min']:
tbl['min'] = data['min']
tbl['min_ts'] = data['min_ts']
is_min = True
if tbl['max'] < data['max']:
tbl['max'] = data['max']
tbl['max_ts'] = data['max_ts']
is_max = True
return (is_min, is_max)

def __datatbl_update(self, pset, strgp_tbl, schema_tbl, thread_tbl):
stgtbl = strgp_tbl['stats']
sstgtbl = strgp_tbl[pset['schema']]['stats']
tsstgtbl = strgp_tbl[pset['schema']][pset['thread_id']]['stats']

stbl = schema_tbl[pset['schema']]['stats']
ttbl = thread_tbl[pset['thread_id']]['stats']
tstbl = thread_tbl[pset['thread_id']][pset['schema']]['stats']

pset_avg = pset['stats']['avg']
pset_count = pset['stats']['count']

stgtbl['set_name'].append(pset['name'])
sstgtbl['set_name'].append(pset['name'])
tsstgtbl['set_name'].append(pset['name'])
stbl['set_name'].append(pset['name'])
ttbl['set_name'].append(pset['name'])
tstbl['set_name'].append(pset['name'])

stgtbl['avg'], stgtbl['count'] = self.__avg_update(stgtbl['avg'], stgtbl['count'],
pset_avg, pset_count)
sstgtbl['avg'], sstgtbl['count'] = self.__avg_update(sstgtbl['avg'], sstgtbl['count'],
pset_avg, pset_count)
tsstgtbl['avg'], tsstgtbl['count'] = self.__avg_update(tsstgtbl['avg'], tsstgtbl['count'],
pset_avg, pset_count)

stbl['avg'], stbl['count'] = self.__avg_update(stbl['avg'], stbl['count'],
pset_avg, pset_count)

ttbl['avg'], ttbl['count'] = self.__avg_update(ttbl['avg'], ttbl['count'],
pset_avg, pset_count)
tstbl['avg'], tstbl['count'] = self.__avg_update(tstbl['avg'], tstbl['count'],
pset_avg, pset_count)

self.__min_max(stgtbl, pset['stats'])
self.__min_max(sstgtbl, pset['stats'])
self.__min_max(tsstgtbl, pset['stats'])
self.__min_max(stbl, pset['stats'])
self.__min_max(ttbl, pset['stats'])
self.__min_max(tstbl, pset['stats'])

def __store_time_process(self, d):
# -----------------------------------------------
# SOURCE DICT
# -----------------------------------------------
# Assume that d = { <strgp_name> :{ 'sets': { <set_name> : { <data> }
# }
# }
# },
# where, <data> = { 'min' : <float>,
# 'max' : <float>,
# 'avg' : <float>,
# 'cnt' : <int>,
# 'start_ts' : <float>, # in seconds
# 'end_ts' : <float> # in seconds
# }
#
# -----------------------------------------------
# RESULT TABLES
# -----------------------------------------------
# stats = { 'min' : <float>,
# 'max' : <float>,
# 'avg' : <float>,
# 'cnt' : <int>,
# 'start' : <float> in seconds,
# 'end' : <float> in seconds,
# 'set_name' : [<set name, the order is the name as the lists above>]
# }
#
# schema_tbl = { 'stats': <stats>,
# <schema_name> : {'stats' : <stats>},
# ...
# }
schema_tbl = {'stats' : self.__datatbl_new()}
# thread_tbl = { 'stats' : <stats>,
# <thread_id> : { 'stats' : <stats>,
# <schema_name> : {'stats' : <stats> },
# ...
# }
# }
thread_tbl = {'stats' : self.__datatbl_new()}
#strgp_tbl = { 'stats' : <stats>,
# <strgp name> : { 'stats' : <stats>,
# <schema_name> : { 'stats' : <stats>,
# <thread_id> : { 'stats' : <stats>},
# ...
# },
# ...
# }
strgp_tbl = {'stats' : self.__datatbl_new()}

for strgp_name, strgp in d.items():
strgp_tbl[strgp_name] = {'stats' : self.__datatbl_new()}

for set_name, prdset in strgp['sets'].items():
prdset['name'] = set_name
schema_name = prdset['schema']
thread_id = prdset['thread_id']

if schema_name not in strgp_tbl[strgp_name]:
strgp_tbl[strgp_name][schema_name] = {'stats' : self.__datatbl_new()}
if thread_id not in strgp_tbl[strgp_name][schema_name].keys():
strgp_tbl[strgp_name][schema_name][thread_id] = {'stats' : self.__datatbl_new()}

if schema_name not in schema_tbl.keys():
schema_tbl[schema_name] = {'stats': self.__datatbl_new()}

if thread_id not in thread_tbl.keys():
thread_tbl[thread_id] = {'stats' : self.__datatbl_new()}
if schema_name not in thread_tbl[thread_id].keys():
thread_tbl[thread_id][schema_name] = {'stats': self.__datatbl_new()}

self.__datatbl_update(prdset, strgp_tbl[strgp_name], schema_tbl, thread_tbl)

self.__bounds(strgp_tbl)
for k in strgp_tbl.keys():
if k == 'stats':
continue
self.__bounds(strgp_tbl[k])
for tid in strgp_tbl[k].keys():
if tid == 'stats':
continue
self.__bounds(strgp_tbl[k][tid])

self.__bounds(schema_tbl)

self.__bounds(thread_tbl)
for k in thread_tbl.keys():
if k == 'stats':
continue
self.__bounds(thread_tbl[k])

return (strgp_tbl, schema_tbl, thread_tbl)

def __symbols(self, key, stats):
symbol = list()
if key == stats['min_member']:
symbol.append("<")
if key == stats['max_member']:
symbol.append(">")
if key == stats['min_avg_member']:
symbol.append("-")
if key == stats['max_avg_member']:
symbol.append("+")
return "".join(symbol)

def do_store_time_stats(self, arg):
"""
Get the store time statistics of a storage policy
Expand All @@ -1271,10 +1470,69 @@ class LdmsdCmdParser(cmd.Cmd):
# self.handle() already reported the error.
return
j = fmt_status(msg)
print(f"{'Storage Policy':25} {'Min(usec)':15} {'Max(usec)':15} {'Avg(usec)':15} {'Count':10} Number of Sets")
print(f"{'-'*25} {'-'*15} {'-'*15} {'-'*15} {'-'*10} {'-'*15}")
for n, strgp in j.items():
print(f"{n:25} {strgp['min']:15.4f} {strgp['max']:15.4f} {strgp['avg']:15.4f} {strgp['cnt']:10} {strgp['num_sets']:15}")

try:
strgp_tbl, schema_tbl, thread_tbl = self.__store_time_process(j)
except:
raise

# Schema Table
print(f"{'='*(4+21+16+16+16+21+21+11+11)}")
print(f" < Minimum Value - Minimum Average value")
print(f" > Maximum Value + Maximum Average value")
print(f"{'='*(4+21+16+16+16+21+21+11+11)}")
print(f"{' '*4} {'Schema':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}")
print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
schema_names = sorted(list(schema_tbl.keys()))
for schema in schema_names:
if schema == 'stats':
continue
stats = schema_tbl[schema]['stats']
print(f"{self.__symbols(schema, schema_tbl['stats']):>4} {schema:>18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}")

# Thread Table
print(f"{'='*(4+21+16+16+16+21+21+11+11)}")
print(f"{' '*4}-{'Thread':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}")
# print(f"{'-'*4} {'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
threads = sorted(list(thread_tbl.keys()))
for tid in threads:
if tid == 'stats':
continue
symbol_tid = list()
stats = thread_tbl[tid]['stats']
print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
print(f"{self.__symbols(tid, thread_tbl['stats']):>4} {tid:18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}")
schema_names = sorted([s for s in list(thread_tbl[tid].keys()) if s != 'stats'])
print(f"{' '*4} {'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
for schema in schema_names:
st = thread_tbl[tid][schema]['stats']
print(f" {self.__symbols(schema, thread_tbl[tid]['stats']):>4} {schema:>18} {st['min']:15.4f} {st['avg']:15.4f} {st['max']:15.4f} {st['min_ts']:20.4f} {st['max_ts']:20.4f} {len(set(st['set_name'])):10} {stats['count']:10}")
# print(f"{'-'*(4+21+16+16+16+21+21+11+11)}")

# Storage policy Table
print(f"{'='*(4+21+16+16+16+21+21+11+11)}")
print(f"{' '*4} {'Storage Policy':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}")
print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
strgps = sorted(list(strgp_tbl.keys()))
for strgp in strgps:
if strgp == 'stats':
continue
stats = strgp_tbl[strgp]['stats']
print(f"{self.__symbols(strgp, strgp_tbl['stats']):>5} {strgp:18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}")
schemas = sorted([s for s in list(strgp_tbl[strgp].keys()) if s != 'stats'])
for schema in schemas:
print(f" {' '*2}{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
sch_stats = strgp_tbl[strgp][schema]['stats']
print(f" {self.__symbols(schema, strgp_tbl[strgp]['stats']):>5} {schema:16} {sch_stats['min']:15.4f} {sch_stats['avg']:15.4f} " \
f"{sch_stats['max']:15.4f} {sch_stats['min_ts']:20.4f} {sch_stats['max_ts']:20.4f} " \
f"{len(set(sch_stats['set_name'])):10} {stats['count']:10}")
threads = sorted([s for s in list(strgp_tbl[strgp][schema].keys()) if s != 'stats'])
for tid in threads:
st = strgp_tbl[strgp][schema][tid]['stats']
print(f" {self.__symbols(tid, strgp_tbl[strgp][schema]['stats']):>5}{tid:>13} {st['min']:15.4f} {st['avg']:15.4f} " \
f"{st['max']:15.4f} {st['min_ts']:20.4f} {st['max_ts']:20.4f} " \
f"{len(set(st['set_name'])):10} {stats['count']:10}")
print(f"{' '*5} {'-'*18} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")

def complete_store_time_stats(self, text, line, begidx, endidx):
return self.__complete_attr_list('store_time_stats', text)
Expand Down

0 comments on commit 6950cd3

Please sign in to comment.