Skip to content

Commit

Permalink
Refactor the store_time_stats request handler
Browse files Browse the repository at this point in the history
  • Loading branch information
nichamon committed Nov 30, 2023
1 parent b20c3df commit 42f02d8
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 67 deletions.
38 changes: 38 additions & 0 deletions ldms/src/ldmsd/ldmsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,44 @@ int ldmsd_stop_sampler(char *plugin_name)
return rc;
}

/* a - b */
double ts_diff_usec(struct timespec *a, struct timespec *b)
{
double aa = a->tv_sec*1e9 + a->tv_nsec;
double bb = b->tv_sec*1e9 + b->tv_nsec;
return (aa - bb)/1e3; /* make it usec */
}

void ldmsd_stat_update(struct ldmsd_stat *stat, struct timespec *start, struct timespec *end)
{
if (start->tv_sec == 0) {
/*
* The counter and the start time got reset to zero, so
* the stat cannot be calculated this time.
*/
return;
}
double dur = ts_diff_usec(end, start);
stat->count++;
if (1 == stat->count) {
stat->avg = stat->min = stat->max = dur;
stat->min_ts.tv_sec = stat->max_ts.tv_sec = end->tv_sec;
stat->min_ts.tv_nsec = stat->max_ts.tv_nsec = end->tv_nsec;
} else {
stat->avg = (stat->avg * ((stat->count - 1.0)/stat->count)) + (dur/stat->count);
if (stat->min > dur) {
stat->min = dur;
stat->min_ts.tv_sec = end->tv_sec;
stat->min_ts.tv_nsec = end->tv_nsec;
} else if (stat->max < dur) {
stat->max = dur;
stat->max_ts.tv_sec = end->tv_sec;
stat->max_ts.tv_nsec = end->tv_nsec;
}
}
}


void *event_proc(void *v)
{
ovis_scheduler_t os = v;
Expand Down
5 changes: 5 additions & 0 deletions ldms/src/ldmsd/ldmsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ struct ldmsd_stat {
struct timespec start;
struct timespec end;
double min;
struct timespec min_ts;
double max;
struct timespec max_ts;
double avg;
int count;
};
Expand Down Expand Up @@ -325,6 +327,7 @@ typedef struct ldmsd_prdcr_set {
struct ldmsd_stat store_stat;
int skipped_upd_cnt;
int oversampled_cnt;
uint64_t zap_thread_id; /* A thread handling the update completion event. */

int ref_count;
struct timespec lookup_complete_ts;
Expand Down Expand Up @@ -1423,4 +1426,6 @@ size_t Snprintf(char **dst, size_t *len, char *fmt, ...);

__attribute__((format(printf, 2, 3)))
int linebuf_printf(struct ldmsd_req_ctxt *reqc, char *fmt, ...);

void ldmsd_stat_update(struct ldmsd_stat *stat, struct timespec *start, struct timespec *end);
#endif
189 changes: 156 additions & 33 deletions ldms/src/ldmsd/ldmsd_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -8563,25 +8563,152 @@ static int update_time_stats_handler(ldmsd_req_ctxt_t reqc)
return rc;
}

static json_entity_t __ldmsd_stat2dict(struct ldmsd_stat *stat)
{
double start_ts = stat->start.tv_sec + stat->start.tv_nsec/1000000.0;
double end_ts = stat->end.tv_sec + stat->end.tv_nsec/1000000.0;
double min_ts = stat->min_ts.tv_sec + stat->min_ts.tv_nsec/1000000.0;
double max_ts = stat->max_ts.tv_sec + stat->max_ts.tv_nsec/1000000.0;
json_entity_t d = json_dict_build(NULL,
JSON_FLOAT_VALUE, "min", stat->min,
JSON_FLOAT_VALUE, "min_ts", min_ts,
JSON_FLOAT_VALUE, "max", stat->max,
JSON_FLOAT_VALUE, "max_ts", max_ts,
JSON_FLOAT_VALUE, "avg", stat->avg,
JSON_INT_VALUE, "count", stat->count,
JSON_FLOAT_VALUE, "start_ts", start_ts,
JSON_FLOAT_VALUE, "end_ts", end_ts,
-1);
return d;
}

static int
__store_time_stats_json_obj(ldmsd_req_ctxt_t reqc, ldmsd_strgp_t strgp, int reset)
__store_time_stats_strgp(json_entity_t strgp_dict, ldmsd_strgp_t strgp, int reset)
{
int rc;
int rc = 0;
ldmsd_prdcr_t prdcr;
ldmsd_prdcr_set_t prdset;
ldmsd_name_match_t match;
struct rbn *rbn;
pid_t tid;
char tid_s[128];
json_entity_t strgp_stats, set_stats ;
json_entity_t producers, threads, schemas, sets;
json_entity_t prdcr_json, thr_json, sch_json, set_json;

strgp_stats = json_dict_build(NULL,
JSON_DICT_VALUE, "producers", -2,
JSON_DICT_VALUE, "threads", -2,
JSON_DICT_VALUE, "schemas", -2,
JSON_DICT_VALUE, "sets", -2,
-1);
if (!strgp_stats) {
ovis_log(config_log, OVIS_LCRIT, "Out of memory.\n");
rc = ENOMEM;
goto out;
}
producers = json_attr_value(json_attr_find(strgp_stats, "producers"));
threads = json_attr_value(json_attr_find(strgp_stats, "threads"));
schemas = json_attr_value(json_attr_find(strgp_stats, "schemas"));
sets = json_attr_value(json_attr_find(strgp_stats, "sets"));

rc = linebuf_printf(reqc, "\"%s\":{\"min\":%lf,"
"\"max\":%lf,"
"\"avg\":%lf,"
"\"cnt\":%d,"
"\"num_sets\":%d}",
strgp->obj.name,
strgp->stat.min,
strgp->stat.max,
strgp->stat.avg,
strgp->stat.count,
strgp->prdset_cnt);
if (reset)
memset(&strgp->stat, 0, sizeof(strgp->stat));
for (prdcr = ldmsd_prdcr_first(); prdcr; prdcr = ldmsd_prdcr_next(prdcr)) {
match = ldmsd_strgp_prdcr_first(strgp);
for (rc = 0; match; match = ldmsd_strgp_prdcr_next(match)) {
rc = regexec(&match->regex, prdcr->obj.name, 0, NULL, 0);
if (!rc)
break;
}
for (rbn = rbt_min(&prdcr->set_tree); rbn; rbn = rbn_succ(rbn)) {

prdset = container_of(rbn, struct ldmsd_prdcr_set, rbn);
if (strgp->schema) {
if (0 != strcmp(strgp->schema, prdset->schema_name))
continue;
} else {
rc = regexec(&strgp->schema_regex, prdset->schema_name, 0, NULL, 0);
if (rc)
continue;
}

prdcr_json = json_attr_find(producers, prdcr->obj.name);
if (!prdcr_json) {
/*
* The dictionary may be extended to contain
* producer's statistics in the future.
*/
prdcr_json = json_entity_new(JSON_DICT_VALUE);
if (!prdcr_json)
goto oom;
rc = json_attr_add(producers, prdcr->obj.name, prdcr_json);
if (rc)
goto json_error;
}

tid = ldms_set_thread_id_get(prdset->set);
snprintf(tid_s, 127, "%d", tid);
thr_json = json_attr_find(threads, tid_s);
if (!thr_json) {
/*
* The dictionary may be extended to contain
* thread's statistics in the future.
*/
thr_json = json_entity_new(JSON_DICT_VALUE);
if (!thr_json)
goto oom;
rc = json_attr_add(threads, tid_s, thr_json);
if (rc)
goto json_error;
}

sch_json = json_attr_find(schemas, prdset->schema_name);
if (!sch_json) {
/*
* The dictionary may be extended to contain
* schema's statistics in the future.
*/
sch_json = json_entity_new(JSON_DICT_VALUE);
if (!sch_json)
goto oom;
rc = json_attr_add(schemas, prdset->schema_name, sch_json);
if (rc)
goto json_error;
}

set_json = json_dict_build(NULL,
JSON_STRING_VALUE, "producer", prdcr->obj.name,
JSON_STRING_VALUE, "schema", prdset->schema_name,
JSON_STRING_VALUE, "thread_id", tid_s,
-1);
set_stats = __ldmsd_stat2dict(&prdset->store_stat);
if (!set_json || !set_stats)
goto oom;
rc = json_attr_add(set_json, "stats", set_stats);
if (rc)
goto json_error;
rc = json_attr_add(sets, prdset->inst_name, set_json);
if (rc)
goto json_error;
if (reset)
memset(&prdset->store_stat, 0, sizeof(prdset->store_stat));
}
}
rc = json_attr_add(strgp_dict, strgp->obj.name, strgp_stats);
if (rc)
goto json_error;
return 0;
free_stats:
json_entity_free(strgp_stats);
out:
return rc;
oom:
ovis_log(config_log, OVIS_LCRIT, "Out of memory.\n");
rc = ENOMEM;
goto free_stats;
json_error:
ovis_log(config_log, OVIS_LERROR, "Error creating the response "
"of a store_time request. Error %d\n", rc);
goto free_stats;
}

static int store_time_stats_handler(ldmsd_req_ctxt_t reqc)
Expand All @@ -8590,8 +8717,8 @@ static int store_time_stats_handler(ldmsd_req_ctxt_t reqc)
char *name, *reset_s;
name = reset_s = NULL;
ldmsd_strgp_t strgp;
int cnt = 0;
int reset = 0;
json_entity_t strgp_dict;

reset_s = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_RESET);
if (reset_s) {
Expand All @@ -8600,9 +8727,12 @@ static int store_time_stats_handler(ldmsd_req_ctxt_t reqc)
free(reset_s);
}

rc = linebuf_printf(reqc, "{");
if (rc)
goto err;
strgp_dict = json_entity_new(JSON_DICT_VALUE);
if (!strgp_dict) {
ovis_log(config_log, OVIS_LCRIT, "Out of memory.\n");
rc = ENOMEM;
goto out;
}

name = ldmsd_req_attr_str_value_get_by_id(reqc, LDMSD_ATTR_NAME);
if (name) {
Expand All @@ -8615,35 +8745,27 @@ static int store_time_stats_handler(ldmsd_req_ctxt_t reqc)
ldmsd_send_req_response(reqc, reqc->line_buf);
return 0;
}
rc = __store_time_stats_json_obj(reqc, strgp, reset);
rc = __store_time_stats_strgp(strgp_dict, strgp, reset);
if (rc)
goto err;
} else {
ldmsd_cfg_lock(LDMSD_CFGOBJ_STRGP);
for (strgp = ldmsd_strgp_first(); strgp;
strgp = ldmsd_strgp_next(strgp)) {
if (cnt) {
rc = linebuf_printf(reqc, ",");
if (rc) {
ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP);
goto err;
}
}
ldmsd_strgp_lock(strgp);
rc = __store_time_stats_json_obj(reqc, strgp, reset);
rc = __store_time_stats_strgp(strgp_dict, strgp, reset);
if (rc) {
ldmsd_strgp_unlock(strgp);
ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP);
goto err;
}
ldmsd_strgp_unlock(strgp);
cnt++;
}
ldmsd_cfg_unlock(LDMSD_CFGOBJ_STRGP);
}

rc = linebuf_printf(reqc, "}");
if (rc)
goto err;
ldmsd_send_req_response(reqc, reqc->line_buf);
jbuf_t jbuf = json_entity_dump(NULL, strgp_dict);
ldmsd_send_req_response(reqc, jbuf->buf);
goto out;
err:
snprintf(reqc->line_buf, reqc->line_len, "Failed to query the store "
Expand All @@ -8652,5 +8774,6 @@ static int store_time_stats_handler(ldmsd_req_ctxt_t reqc)
ldmsd_send_req_response(reqc, reqc->line_buf);
out:
free(name);
json_entity_free(strgp_dict);
return rc;
}
40 changes: 6 additions & 34 deletions ldms/src/ldmsd/ldmsd_updtr.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,6 @@
#include "ldms_xprt.h"
#include "config.h"

/* a - b */
static inline double ts_diff_usec(struct timespec *a, struct timespec *b)
{
double aa = a->tv_sec*1e9 + a->tv_nsec;
double bb = b->tv_sec*1e9 + b->tv_nsec;
return (aa - bb)/1e3; /* make it usec */
}

/* Defined in ldmsd.c */
extern ovis_log_t updtr_log;

Expand Down Expand Up @@ -219,29 +211,6 @@ static void updtr_task_set_reset(ldmsd_updtr_task_t task)
task->set_count = 0;
}

static inline void
__stats(struct ldmsd_stat *stat, struct timespec *start, struct timespec *end)
{
if (start->tv_sec == 0) {
/*
* The counter and the start time got reset to zero, so
* the stat cannot be calculated this time.
*/
return;
}
double dur = ts_diff_usec(end, start);
stat->count++;
if (1 == stat->count) {
stat->avg = stat->min = stat->max = dur;
} else {
stat->avg = (stat->avg * ((stat->count - 1.0)/stat->count)) + (dur/stat->count);
if (stat->min > dur)
stat->min = dur;
else if (stat->max < dur)
stat->max = dur;
}
}

static void updtr_update_cb(ldms_t t, ldms_set_t set, int status, void *arg)
{
uint64_t gn, push_it = 0;
Expand All @@ -252,7 +221,7 @@ static void updtr_update_cb(ldms_t t, ldms_set_t set, int status, void *arg)

pthread_mutex_lock(&prd_set->lock);
clock_gettime(CLOCK_REALTIME, &prd_set->updt_stat.end);
__stats(&prd_set->updt_stat, &prd_set->updt_stat.start, &prd_set->updt_stat.end);
ldmsd_stat_update(&prd_set->updt_stat, &prd_set->updt_stat.start, &prd_set->updt_stat.end);

errcode = LDMS_UPD_ERROR(status);
ovis_log(updtr_log, OVIS_LDEBUG, "Update complete for Set %s with status %#x\n",
Expand Down Expand Up @@ -292,8 +261,8 @@ static void updtr_update_cb(ldms_t t, ldms_set_t set, int status, void *arg)
clock_gettime(CLOCK_REALTIME, &start);
strgp->update_fn(strgp, prd_set);
clock_gettime(CLOCK_REALTIME, &end);
__stats(&strgp->stat, &start, &end);
__stats(&prd_set->store_stat, &start, &end);
ldmsd_stat_update(&strgp->stat, &start, &end);
ldmsd_stat_update(&prd_set->store_stat, &start, &end);
ldmsd_strgp_unlock(strgp);
}
set_ready:
Expand Down Expand Up @@ -583,6 +552,7 @@ void __ldmsd_prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status,
if (__setgrp_members_lookup(prd_set))
goto out;
}
prd_set->zap_thread_id = ldms_set_thread_id_get(set);
prd_set->state = LDMSD_PRDCR_SET_STATE_READY;
ovis_log(updtr_log, OVIS_LINFO, "Set %s is ready\n", prd_set->inst_name);
ldmsd_strgp_update(prd_set);
Expand All @@ -595,6 +565,8 @@ void __ldmsd_prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status,
return;
}

/* Implemented in ldmsd.c */
extern double ts_diff_usec(struct timespec *a, struct timespec *b);
static void schedule_prdcr_updates(ldmsd_updtr_task_t task,
ldmsd_prdcr_t prdcr, ldmsd_name_match_t match)
{
Expand Down

0 comments on commit 42f02d8

Please sign in to comment.