Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the result of store_time_stats #1302

Merged
merged 5 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 262 additions & 4 deletions ldms/python/ldmsd/ldmsd_controller
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ from ldmsd import ldmsd_util
from ldmsd.ldmsd_communicator import LDMSD_Request, LDMSD_Req_Attr
from ldmsd.ldmsd_communicator import Communicator, fmt_status
import errno
import math

LDMSD_REQ_SOM_F=1
LDMSD_REQ_EOM_F=2
Expand Down Expand Up @@ -1250,6 +1251,204 @@ class LdmsdCmdParser(cmd.Cmd):
def complete_strgp_status(self, text, line, begidx, endidx):
return self.__complete_attr_list('strgp_status', text)

def __datatbl_new(self):
return {'set_name' : [],
'min' : math.inf,
'max' : 0,
'avg' : 0,
'count' : 0,
'start_ts' : None,
'end_ts' : None,
'min_member' : None,
'max_member' : None,
'min_avg_member' : None,
'max_avg_member' : None}

def __bounds(self, tbl):
min_v = math.inf
max_v = 0
min_avg = math.inf
max_avg = 0

for k in tbl.keys():
if k == 'stats':
continue
if min_v > tbl[k]['stats']['min']:
min_v = tbl[k]['stats']['min']
tbl['stats']['min_member'] = k
if max_v < tbl[k]['stats']['max']:
max_v = tbl[k]['stats']['max']
tbl['stats']['max_member'] = k
if min_avg > tbl[k]['stats']['avg']:
min_avg = tbl[k]['stats']['avg']
tbl['stats']['min_avg_member'] = k
if max_avg < tbl[k]['stats']['avg']:
max_avg = tbl[k]['stats']['avg']
tbl['stats']['max_avg_member'] = k

def __avg_update(self, cur_avg, cur_cnt, v, cnt):
new_cnt = cur_cnt + cnt
return (cur_avg * (cur_cnt / new_cnt) + v * (cnt / new_cnt), new_cnt)

def __min_max(self, tbl, data):
is_min = False
is_max = False
if tbl['min'] > data['min']:
tbl['min'] = data['min']
tbl['min_ts'] = data['min_ts']
is_min = True
if tbl['max'] < data['max']:
tbl['max'] = data['max']
tbl['max_ts'] = data['max_ts']
is_max = True
return (is_min, is_max)

def __datatbl_update(self, pset, strgp_tbl, schema_tbl, thread_tbl):
stgtbl = strgp_tbl['stats']
sstgtbl = strgp_tbl[pset['schema']]['stats']
tsstgtbl = strgp_tbl[pset['schema']][pset['thread_id']]['stats']

stbl = schema_tbl[pset['schema']]['stats']
ttbl = thread_tbl[pset['thread_id']]['stats']
tstbl = thread_tbl[pset['thread_id']][pset['schema']]['stats']

pset_avg = pset['stats']['avg']
pset_count = pset['stats']['count']

stgtbl['set_name'].append(pset['name'])
sstgtbl['set_name'].append(pset['name'])
tsstgtbl['set_name'].append(pset['name'])
stbl['set_name'].append(pset['name'])
ttbl['set_name'].append(pset['name'])
tstbl['set_name'].append(pset['name'])

stgtbl['avg'], stgtbl['count'] = self.__avg_update(stgtbl['avg'], stgtbl['count'],
pset_avg, pset_count)
sstgtbl['avg'], sstgtbl['count'] = self.__avg_update(sstgtbl['avg'], sstgtbl['count'],
pset_avg, pset_count)
tsstgtbl['avg'], tsstgtbl['count'] = self.__avg_update(tsstgtbl['avg'], tsstgtbl['count'],
pset_avg, pset_count)

stbl['avg'], stbl['count'] = self.__avg_update(stbl['avg'], stbl['count'],
pset_avg, pset_count)

ttbl['avg'], ttbl['count'] = self.__avg_update(ttbl['avg'], ttbl['count'],
pset_avg, pset_count)
tstbl['avg'], tstbl['count'] = self.__avg_update(tstbl['avg'], tstbl['count'],
pset_avg, pset_count)

self.__min_max(stgtbl, pset['stats'])
self.__min_max(sstgtbl, pset['stats'])
self.__min_max(tsstgtbl, pset['stats'])
self.__min_max(stbl, pset['stats'])
self.__min_max(ttbl, pset['stats'])
self.__min_max(tstbl, pset['stats'])

def __store_time_process(self, d):
# -----------------------------------------------
# SOURCE DICT
# -----------------------------------------------
# Assume that d = { <strgp_name> :{ 'sets': { <set_name> : { <data> }
# }
# }
# },
# where, <data> = { 'min' : <float>,
# 'max' : <float>,
# 'avg' : <float>,
# 'cnt' : <int>,
# 'start_ts' : <float>, # in seconds
# 'end_ts' : <float> # in seconds
# }
#
# -----------------------------------------------
# RESULT TABLES
# -----------------------------------------------
# stats = { 'min' : <float>,
# 'max' : <float>,
# 'avg' : <float>,
# 'cnt' : <int>,
# 'start' : <float> in seconds,
# 'end' : <float> in seconds,
# 'set_name' : [<set name, the order is the name as the lists above>]
# }
#
# schema_tbl = { 'stats': <stats>,
# <schema_name> : {'stats' : <stats>},
# ...
# }
schema_tbl = {'stats' : self.__datatbl_new()}
# thread_tbl = { 'stats' : <stats>,
# <thread_id> : { 'stats' : <stats>,
# <schema_name> : {'stats' : <stats> },
# ...
# }
# }
thread_tbl = {'stats' : self.__datatbl_new()}
#strgp_tbl = { 'stats' : <stats>,
# <strgp name> : { 'stats' : <stats>,
# <schema_name> : { 'stats' : <stats>,
# <thread_id> : { 'stats' : <stats>},
# ...
# },
# ...
# }
strgp_tbl = {'stats' : self.__datatbl_new()}

for strgp_name, strgp in d.items():
strgp_tbl[strgp_name] = {'stats' : self.__datatbl_new()}

for set_name, prdset in strgp['sets'].items():
prdset['name'] = set_name
schema_name = prdset['schema']
thread_id = prdset['thread_id']

if schema_name not in strgp_tbl[strgp_name]:
strgp_tbl[strgp_name][schema_name] = {'stats' : self.__datatbl_new()}
if thread_id not in strgp_tbl[strgp_name][schema_name].keys():
strgp_tbl[strgp_name][schema_name][thread_id] = {'stats' : self.__datatbl_new()}

if schema_name not in schema_tbl.keys():
schema_tbl[schema_name] = {'stats': self.__datatbl_new()}

if thread_id not in thread_tbl.keys():
thread_tbl[thread_id] = {'stats' : self.__datatbl_new()}
if schema_name not in thread_tbl[thread_id].keys():
thread_tbl[thread_id][schema_name] = {'stats': self.__datatbl_new()}

self.__datatbl_update(prdset, strgp_tbl[strgp_name], schema_tbl, thread_tbl)

self.__bounds(strgp_tbl)
for k in strgp_tbl.keys():
if k == 'stats':
continue
self.__bounds(strgp_tbl[k])
for tid in strgp_tbl[k].keys():
if tid == 'stats':
continue
self.__bounds(strgp_tbl[k][tid])

self.__bounds(schema_tbl)

self.__bounds(thread_tbl)
for k in thread_tbl.keys():
if k == 'stats':
continue
self.__bounds(thread_tbl[k])

return (strgp_tbl, schema_tbl, thread_tbl)

def __symbols(self, key, stats):
symbol = list()
if key == stats['min_member']:
symbol.append("<")
if key == stats['max_member']:
symbol.append(">")
if key == stats['min_avg_member']:
symbol.append("-")
if key == stats['max_avg_member']:
symbol.append("+")
return "".join(symbol)

def do_store_time_stats(self, arg):
"""
Get the store time statistics of a storage policy
Expand All @@ -1271,10 +1470,69 @@ class LdmsdCmdParser(cmd.Cmd):
# self.handle() already reported the error.
return
j = fmt_status(msg)
print(f"{'Storage Policy':25} {'Min(usec)':15} {'Max(usec)':15} {'Avg(usec)':15} {'Count':10} Number of Sets")
print(f"{'-'*25} {'-'*15} {'-'*15} {'-'*15} {'-'*10} {'-'*15}")
for n, strgp in j.items():
print(f"{n:25} {strgp['min']:15.4f} {strgp['max']:15.4f} {strgp['avg']:15.4f} {strgp['cnt']:10} {strgp['num_sets']:15}")

try:
strgp_tbl, schema_tbl, thread_tbl = self.__store_time_process(j)
except:
raise

# Schema Table
print(f"{'='*(4+21+16+16+16+21+21+11+11)}")
print(f" < Minimum Value - Minimum Average value")
print(f" > Maximum Value + Maximum Average value")
print(f"{'='*(4+21+16+16+16+21+21+11+11)}")
print(f"{' '*4} {'Schema':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}")
print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
schema_names = sorted(list(schema_tbl.keys()))
for schema in schema_names:
if schema == 'stats':
continue
stats = schema_tbl[schema]['stats']
print(f"{self.__symbols(schema, schema_tbl['stats']):>4} {schema:>18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}")

# Thread Table
print(f"{'='*(4+21+16+16+16+21+21+11+11)}")
print(f"{' '*4}-{'Thread':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}")
# print(f"{'-'*4} {'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
threads = sorted(list(thread_tbl.keys()))
for tid in threads:
if tid == 'stats':
continue
symbol_tid = list()
stats = thread_tbl[tid]['stats']
print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
print(f"{self.__symbols(tid, thread_tbl['stats']):>4} {tid:18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}")
schema_names = sorted([s for s in list(thread_tbl[tid].keys()) if s != 'stats'])
print(f"{' '*4} {'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
for schema in schema_names:
st = thread_tbl[tid][schema]['stats']
print(f" {self.__symbols(schema, thread_tbl[tid]['stats']):>4} {schema:>18} {st['min']:15.4f} {st['avg']:15.4f} {st['max']:15.4f} {st['min_ts']:20.4f} {st['max_ts']:20.4f} {len(set(st['set_name'])):10} {stats['count']:10}")
# print(f"{'-'*(4+21+16+16+16+21+21+11+11)}")

# Storage policy Table
print(f"{'='*(4+21+16+16+16+21+21+11+11)}")
print(f"{' '*4} {'Storage Policy':^20} {'Min (sec)':^15} {'Avg (sec)':^15} {'Max (sec)':^15} {'Min Timestamp':^20} {'Max Timestamp':^20} {'# of Sets':^10} {'Count':^10}")
print(f"{'-'*4}-{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
strgps = sorted(list(strgp_tbl.keys()))
for strgp in strgps:
if strgp == 'stats':
continue
stats = strgp_tbl[strgp]['stats']
print(f"{self.__symbols(strgp, strgp_tbl['stats']):>5} {strgp:18} {stats['min']:15.4f} {stats['avg']:15.4f} {stats['max']:15.4f} {stats['min_ts']:20.4f} {stats['max_ts']:20.4f} {len(set(stats['set_name'])):10} {stats['count']:10}")
schemas = sorted([s for s in list(strgp_tbl[strgp].keys()) if s != 'stats'])
for schema in schemas:
print(f" {' '*2}{'-'*20} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")
sch_stats = strgp_tbl[strgp][schema]['stats']
print(f" {self.__symbols(schema, strgp_tbl[strgp]['stats']):>5} {schema:16} {sch_stats['min']:15.4f} {sch_stats['avg']:15.4f} " \
f"{sch_stats['max']:15.4f} {sch_stats['min_ts']:20.4f} {sch_stats['max_ts']:20.4f} " \
f"{len(set(sch_stats['set_name'])):10} {stats['count']:10}")
threads = sorted([s for s in list(strgp_tbl[strgp][schema].keys()) if s != 'stats'])
for tid in threads:
st = strgp_tbl[strgp][schema][tid]['stats']
print(f" {self.__symbols(tid, strgp_tbl[strgp][schema]['stats']):>5}{tid:>13} {st['min']:15.4f} {st['avg']:15.4f} " \
f"{st['max']:15.4f} {st['min_ts']:20.4f} {st['max_ts']:20.4f} " \
f"{len(set(st['set_name'])):10} {stats['count']:10}")
print(f"{' '*5} {'-'*18} {'-'*15} {'-'*15} {'-'*15} {'-'*20} {'-'*20} {'-'*10} {'-'*10}")

def complete_store_time_stats(self, text, line, begidx, endidx):
return self.__complete_attr_list('store_time_stats', text)
Expand Down
10 changes: 10 additions & 0 deletions ldms/src/core/ldms.c
Original file line number Diff line number Diff line change
Expand Up @@ -3694,6 +3694,16 @@ int ldms_set_info_traverse(ldms_set_t s, ldms_set_info_traverse_cb_fn cb,
return rc;
}

pid_t ldms_set_thread_id_get(ldms_set_t s)
{
struct ldms_xprt *x;

if (!s->xprt)
return -1;
x = (struct ldms_xprt *)s->xprt;
return zap_ep_thread_id(x->zap_ep);
}

void ldms_version_get(struct ldms_version *v)
{
LDMS_VERSION_SET(*v);
Expand Down
9 changes: 9 additions & 0 deletions ldms/src/core/ldms.h
Original file line number Diff line number Diff line change
Expand Up @@ -2633,6 +2633,15 @@ typedef int (*ldms_set_info_traverse_cb_fn)(const char *key, const char *value,
extern int ldms_set_info_traverse(ldms_set_t s, ldms_set_info_traverse_cb_fn cb,
int flag, void *cb_arg);

/**
* \brief Return the thread ID handles the lookup and update complete events of the set
*
* \param s The set handle
*
* \return The thread ID. -1 is returned if the set is not created by lookup.
*/
extern pid_t ldms_set_thread_id_get(ldms_set_t s);

/**
* \brief Add a metric to schema
*
Expand Down
38 changes: 38 additions & 0 deletions ldms/src/ldmsd/ldmsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,44 @@ int ldmsd_stop_sampler(char *plugin_name)
return rc;
}

/* a - b */
double ts_diff_usec(struct timespec *a, struct timespec *b)
{
double aa = a->tv_sec*1e9 + a->tv_nsec;
double bb = b->tv_sec*1e9 + b->tv_nsec;
return (aa - bb)/1e3; /* make it usec */
}

void ldmsd_stat_update(struct ldmsd_stat *stat, struct timespec *start, struct timespec *end)
{
if (start->tv_sec == 0) {
/*
* The counter and the start time got reset to zero, so
* the stat cannot be calculated this time.
*/
return;
}
double dur = ts_diff_usec(end, start);
stat->count++;
if (1 == stat->count) {
stat->avg = stat->min = stat->max = dur;
stat->min_ts.tv_sec = stat->max_ts.tv_sec = end->tv_sec;
stat->min_ts.tv_nsec = stat->max_ts.tv_nsec = end->tv_nsec;
} else {
stat->avg = (stat->avg * ((stat->count - 1.0)/stat->count)) + (dur/stat->count);
if (stat->min > dur) {
stat->min = dur;
stat->min_ts.tv_sec = end->tv_sec;
stat->min_ts.tv_nsec = end->tv_nsec;
} else if (stat->max < dur) {
stat->max = dur;
stat->max_ts.tv_sec = end->tv_sec;
stat->max_ts.tv_nsec = end->tv_nsec;
}
}
}


void *event_proc(void *v)
{
ovis_scheduler_t os = v;
Expand Down
6 changes: 6 additions & 0 deletions ldms/src/ldmsd/ldmsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ struct ldmsd_stat {
struct timespec start;
struct timespec end;
double min;
struct timespec min_ts;
double max;
struct timespec max_ts;
double avg;
int count;
};
Expand Down Expand Up @@ -322,8 +324,10 @@ typedef struct ldmsd_prdcr_set {
uint8_t updt_sync;

struct ldmsd_stat updt_stat;
struct ldmsd_stat store_stat;
int skipped_upd_cnt;
int oversampled_cnt;
uint64_t zap_thread_id; /* A thread handling the update completion event. */

int ref_count;
struct timespec lookup_complete_ts;
Expand Down Expand Up @@ -1422,4 +1426,6 @@ size_t Snprintf(char **dst, size_t *len, char *fmt, ...);

__attribute__((format(printf, 2, 3)))
int linebuf_printf(struct ldmsd_req_ctxt *reqc, char *fmt, ...);

void ldmsd_stat_update(struct ldmsd_stat *stat, struct timespec *start, struct timespec *end);
#endif
Loading