diff --git a/ldms/src/ldmsd/ldmsd.c b/ldms/src/ldmsd/ldmsd.c index b6ff419fae..b8af850822 100644 --- a/ldms/src/ldmsd/ldmsd.c +++ b/ldms/src/ldmsd/ldmsd.c @@ -1291,6 +1291,44 @@ int ldmsd_stop_sampler(char *plugin_name) return rc; } +/* a - b */ +double ts_diff_usec(struct timespec *a, struct timespec *b) +{ + double aa = a->tv_sec*1e9 + a->tv_nsec; + double bb = b->tv_sec*1e9 + b->tv_nsec; + return (aa - bb)/1e3; /* make it usec */ +} + +void ldmsd_stat_update(struct ldmsd_stat *stat, struct timespec *start, struct timespec *end) +{ + if (start->tv_sec == 0) { + /* + * The counter and the start time got reset to zero, so + * the stat cannot be calculated this time. + */ + return; + } + double dur = ts_diff_usec(end, start); + stat->count++; + if (1 == stat->count) { + stat->avg = stat->min = stat->max = dur; + stat->min_ts.tv_sec = stat->max_ts.tv_sec = end->tv_sec; + stat->min_ts.tv_nsec = stat->max_ts.tv_nsec = end->tv_nsec; + } else { + stat->avg = (stat->avg * ((stat->count - 1.0)/stat->count)) + (dur/stat->count); + if (stat->min > dur) { + stat->min = dur; + stat->min_ts.tv_sec = end->tv_sec; + stat->min_ts.tv_nsec = end->tv_nsec; + } else if (stat->max < dur) { + stat->max = dur; + stat->max_ts.tv_sec = end->tv_sec; + stat->max_ts.tv_nsec = end->tv_nsec; + } + } +} + + void *event_proc(void *v) { ovis_scheduler_t os = v; diff --git a/ldms/src/ldmsd/ldmsd.h b/ldms/src/ldmsd/ldmsd.h index b5685f387e..fc3adfdf87 100644 --- a/ldms/src/ldmsd/ldmsd.h +++ b/ldms/src/ldmsd/ldmsd.h @@ -289,7 +289,9 @@ struct ldmsd_stat { struct timespec start; struct timespec end; double min; + struct timespec min_ts; double max; + struct timespec max_ts; double avg; int count; }; @@ -325,6 +327,7 @@ typedef struct ldmsd_prdcr_set { struct ldmsd_stat store_stat; int skipped_upd_cnt; int oversampled_cnt; + uint64_t zap_thread_id; /* A thread handling the update completion event. */ int ref_count; struct timespec lookup_complete_ts; @@ -1423,4 +1426,6 @@ size_t Snprintf(char **dst, size_t *len, char *fmt, ...); __attribute__((format(printf, 2, 3))) int linebuf_printf(struct ldmsd_req_ctxt *reqc, char *fmt, ...); + +void ldmsd_stat_update(struct ldmsd_stat *stat, struct timespec *start, struct timespec *end); #endif diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c index c3718c789c..1869d15d4c 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -8563,25 +8563,152 @@ static int update_time_stats_handler(ldmsd_req_ctxt_t reqc) return rc; } +static json_entity_t __ldmsd_stat2dict(struct ldmsd_stat *stat) +{ + double start_ts = stat->start.tv_sec + stat->start.tv_nsec/1000000.0; + double end_ts = stat->end.tv_sec + stat->end.tv_nsec/1000000.0; + double min_ts = stat->min_ts.tv_sec + stat->min_ts.tv_nsec/1000000.0; + double max_ts = stat->max_ts.tv_sec + stat->max_ts.tv_nsec/1000000.0; + json_entity_t d = json_dict_build(NULL, + JSON_FLOAT_VALUE, "min", stat->min, + JSON_FLOAT_VALUE, "min_ts", min_ts, + JSON_FLOAT_VALUE, "max", stat->max, + JSON_FLOAT_VALUE, "max_ts", max_ts, + JSON_FLOAT_VALUE, "avg", stat->avg, + JSON_INT_VALUE, "count", stat->count, + JSON_FLOAT_VALUE, "start_ts", start_ts, + JSON_FLOAT_VALUE, "end_ts", end_ts, + -1); + return d; +} + static int -__store_time_stats_json_obj(ldmsd_req_ctxt_t reqc, ldmsd_strgp_t strgp, int reset) +__store_time_stats_strgp(json_entity_t strgp_dict, ldmsd_strgp_t strgp, int reset) { - int rc; + int rc = 0; + ldmsd_prdcr_t prdcr; + ldmsd_prdcr_set_t prdset; + ldmsd_name_match_t match; + struct rbn *rbn; + pid_t tid; + char tid_s[128]; + json_entity_t strgp_stats, set_stats ; + json_entity_t producers, threads, schemas, sets; + json_entity_t prdcr_json, thr_json, sch_json, set_json; + + strgp_stats = json_dict_build(NULL, + JSON_DICT_VALUE, "producers", -2, + JSON_DICT_VALUE, "threads", -2, + JSON_DICT_VALUE, "schemas", -2, + JSON_DICT_VALUE, "sets", -2, + -1); + if (!strgp_stats) { + ovis_log(config_log, OVIS_LCRIT, "Out of memory.\n"); + rc = ENOMEM; + goto out; + } + producers = json_attr_value(json_attr_find(strgp_stats, "producers")); + threads = json_attr_value(json_attr_find(strgp_stats, "threads")); + schemas = json_attr_value(json_attr_find(strgp_stats, "schemas")); + sets = json_attr_value(json_attr_find(strgp_stats, "sets")); - rc = linebuf_printf(reqc, "\"%s\":{\"min\":%lf," - "\"max\":%lf," - "\"avg\":%lf," - "\"cnt\":%d," - "\"num_sets\":%d}", - strgp->obj.name, - strgp->stat.min, - strgp->stat.max, - strgp->stat.avg, - strgp->stat.count, - strgp->prdset_cnt); - if (reset) - memset(&strgp->stat, 0, sizeof(strgp->stat)); + for (prdcr = ldmsd_prdcr_first(); prdcr; prdcr = ldmsd_prdcr_next(prdcr)) { + match = ldmsd_strgp_prdcr_first(strgp); + for (rc = 0; match; match = ldmsd_strgp_prdcr_next(match)) { + rc = regexec(&match->regex, prdcr->obj.name, 0, NULL, 0); + if (!rc) + break; + } + for (rbn = rbt_min(&prdcr->set_tree); rbn; rbn = rbn_succ(rbn)) { + + prdset = container_of(rbn, struct ldmsd_prdcr_set, rbn); + if (strgp->schema) { + if (0 != strcmp(strgp->schema, prdset->schema_name)) + continue; + } else { + rc = regexec(&strgp->schema_regex, prdset->schema_name, 0, NULL, 0); + if (rc) + continue; + } + + prdcr_json = json_attr_find(producers, prdcr->obj.name); + if (!prdcr_json) { + /* + * The dictionary may be extended to contain + * producer's statistics in the future. + */ + prdcr_json = json_entity_new(JSON_DICT_VALUE); + if (!prdcr_json) + goto oom; + rc = json_attr_add(producers, prdcr->obj.name, prdcr_json); + if (rc) + goto json_error; + } + + tid = ldms_set_thread_id_get(prdset->set); + snprintf(tid_s, 127, "%d", tid); + thr_json = json_attr_find(threads, tid_s); + if (!thr_json) { + /* + * The dictionary may be extended to contain + * thread's statistics in the future. + */ + thr_json = json_entity_new(JSON_DICT_VALUE); + if (!thr_json) + goto oom; + rc = json_attr_add(threads, tid_s, thr_json); + if (rc) + goto json_error; + } + + sch_json = json_attr_find(schemas, prdset->schema_name); + if (!sch_json) { + /* + * The dictionary may be extended to contain + * schema's statistics in the future. + */ + sch_json = json_entity_new(JSON_DICT_VALUE); + if (!sch_json) + goto oom; + rc = json_attr_add(schemas, prdset->schema_name, sch_json); + if (rc) + goto json_error; + } + + set_json = json_dict_build(NULL, + JSON_STRING_VALUE, "producer", prdcr->obj.name, + JSON_STRING_VALUE, "schema", prdset->schema_name, + JSON_STRING_VALUE, "thread_id", tid_s, + -1); + set_stats = __ldmsd_stat2dict(&prdset->store_stat); + if (!set_json || !set_stats) + goto oom; + rc = json_attr_add(set_json, "stats", set_stats); + if (rc) + goto json_error; + rc = json_attr_add(sets, prdset->inst_name, set_json); + if (rc) + goto json_error; + if (reset) + memset(&prdset->store_stat, 0, sizeof(prdset->store_stat)); + } + } + rc = json_attr_add(strgp_dict, strgp->obj.name, strgp_stats); + if (rc) + goto json_error; + return 0; +free_stats: + json_entity_free(strgp_stats); +out: return rc; +oom: + ovis_log(config_log, OVIS_LCRIT, "Out of memory.\n"); + rc = ENOMEM; + goto free_stats; +json_error: + ovis_log(config_log, OVIS_LERROR, "Error creating the response " + "of a store_time request. Error %d\n", rc); + goto free_stats; } static int store_time_stats_handler(ldmsd_req_ctxt_t reqc) @@ -8590,8 +8717,8 @@ static int store_time_stats_handler(ldmsd_req_ctxt_t reqc) char *name, *reset_s; name = reset_s = NULL; ldmsd_strgp_t strgp; - int cnt = 0; int reset = 0; + json_entity_t strgp_dict; reset_s = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RESET); if (reset_s) { @@ -8600,9 +8727,12 @@ static int store_time_stats_handler(ldmsd_req_ctxt_t reqc) free(reset_s); } - rc = linebuf_printf(reqc, "{"); - if (rc) - goto err; + strgp_dict = json_entity_new(JSON_DICT_VALUE); + if (!strgp_dict) { + ovis_log(config_log, OVIS_LCRIT, "Out of memory.\n"); + rc = ENOMEM; + goto out; + } name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME); if (name) { @@ -8615,35 +8745,27 @@ static int store_time_stats_handler(ldmsd_req_ctxt_t reqc) ldmsd_send_req_response(reqc, reqc->line_buf); return 0; } - rc = __store_time_stats_json_obj(reqc, strgp, reset); + rc = __store_time_stats_strgp(strgp_dict, strgp, reset); + if (rc) + goto err; } else { ldmsd_cfg_lock(LDMSD_CFGOBJ_STRGP); for (strgp = ldmsd_strgp_first(); strgp; strgp = ldmsd_strgp_next(strgp)) { - if (cnt) { - rc = linebuf_printf(reqc, ","); - if (rc) { - ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP); - goto err; - } - } ldmsd_strgp_lock(strgp); - rc = __store_time_stats_json_obj(reqc, strgp, reset); + rc = __store_time_stats_strgp(strgp_dict, strgp, reset); if (rc) { ldmsd_strgp_unlock(strgp); ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP); goto err; } ldmsd_strgp_unlock(strgp); - cnt++; } ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP); } - rc = linebuf_printf(reqc, "}"); - if (rc) - goto err; - ldmsd_send_req_response(reqc, reqc->line_buf); + jbuf_t jbuf = json_entity_dump(NULL, strgp_dict); + ldmsd_send_req_response(reqc, jbuf->buf); goto out; err: snprintf(reqc->line_buf, reqc->line_len, "Failed to query the store " @@ -8652,5 +8774,6 @@ static int store_time_stats_handler(ldmsd_req_ctxt_t reqc) ldmsd_send_req_response(reqc, reqc->line_buf); out: free(name); + json_entity_free(strgp_dict); return rc; } diff --git a/ldms/src/ldmsd/ldmsd_updtr.c b/ldms/src/ldmsd/ldmsd_updtr.c index 23d692b80d..1da7f08cc0 100644 --- a/ldms/src/ldmsd/ldmsd_updtr.c +++ b/ldms/src/ldmsd/ldmsd_updtr.c @@ -60,14 +60,6 @@ #include "ldms_xprt.h" #include "config.h" -/* a - b */ -static inline double ts_diff_usec(struct timespec *a, struct timespec *b) -{ - double aa = a->tv_sec*1e9 + a->tv_nsec; - double bb = b->tv_sec*1e9 + b->tv_nsec; - return (aa - bb)/1e3; /* make it usec */ -} - /* Defined in ldmsd.c */ extern ovis_log_t updtr_log; @@ -219,29 +211,6 @@ static void updtr_task_set_reset(ldmsd_updtr_task_t task) task->set_count = 0; } -static inline void -__stats(struct ldmsd_stat *stat, struct timespec *start, struct timespec *end) -{ - if (start->tv_sec == 0) { - /* - * The counter and the start time got reset to zero, so - * the stat cannot be calculated this time. - */ - return; - } - double dur = ts_diff_usec(end, start); - stat->count++; - if (1 == stat->count) { - stat->avg = stat->min = stat->max = dur; - } else { - stat->avg = (stat->avg * ((stat->count - 1.0)/stat->count)) + (dur/stat->count); - if (stat->min > dur) - stat->min = dur; - else if (stat->max < dur) - stat->max = dur; - } -} - static void updtr_update_cb(ldms_t t, ldms_set_t set, int status, void *arg) { uint64_t gn, push_it = 0; @@ -252,7 +221,7 @@ static void updtr_update_cb(ldms_t t, ldms_set_t set, int status, void *arg) pthread_mutex_lock(&prd_set->lock); clock_gettime(CLOCK_REALTIME, &prd_set->updt_stat.end); - __stats(&prd_set->updt_stat, &prd_set->updt_stat.start, &prd_set->updt_stat.end); + ldmsd_stat_update(&prd_set->updt_stat, &prd_set->updt_stat.start, &prd_set->updt_stat.end); errcode = LDMS_UPD_ERROR(status); ovis_log(updtr_log, OVIS_LDEBUG, "Update complete for Set %s with status %#x\n", @@ -292,8 +261,8 @@ static void updtr_update_cb(ldms_t t, ldms_set_t set, int status, void *arg) clock_gettime(CLOCK_REALTIME, &start); strgp->update_fn(strgp, prd_set); clock_gettime(CLOCK_REALTIME, &end); - __stats(&strgp->stat, &start, &end); - __stats(&prd_set->store_stat, &start, &end); + ldmsd_stat_update(&strgp->stat, &start, &end); + ldmsd_stat_update(&prd_set->store_stat, &start, &end); ldmsd_strgp_unlock(strgp); } set_ready: @@ -583,6 +552,7 @@ void __ldmsd_prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status, if (__setgrp_members_lookup(prd_set)) goto out; } + prd_set->zap_thread_id = ldms_set_thread_id_get(set); prd_set->state = LDMSD_PRDCR_SET_STATE_READY; ovis_log(updtr_log, OVIS_LINFO, "Set %s is ready\n", prd_set->inst_name); ldmsd_strgp_update(prd_set); @@ -595,6 +565,8 @@ void __ldmsd_prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status, return; } +/* Implemented in ldmsd.c */ +extern double ts_diff_usec(struct timespec *a, struct timespec *b); static void schedule_prdcr_updates(ldmsd_updtr_task_t task, ldmsd_prdcr_t prdcr, ldmsd_name_match_t match) {