diff --git a/ldms/scripts/examples/linux_proc_sampler b/ldms/scripts/examples/linux_proc_sampler index 2a13424bd..b5e1208c4 100644 --- a/ldms/scripts/examples/linux_proc_sampler +++ b/ldms/scripts/examples/linux_proc_sampler @@ -67,12 +67,15 @@ cat << EOF > $LDMSD_RUN/metrics.input } EOF rm -f $LOGDIR/json*.log -#valgrind -v --tool=drd --log-file=$LOGDIR/vg.netlink.txt ${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked & -#valgrind -v --leak-check=full --track-origins=yes --trace-children=yes --log-file=$LOGDIR/vg.netlink.txt ${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked & -${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked -x -e exec,clone,exit & +drd="valgrind -v --tool=drd --log-file=$LOGDIR/vg.netlink.drd.txt --trace-cond=yes --trace-fork-join=yes" +memcheck="valgrind -v --leak-check=full --track-origins=yes --trace-children=yes --log-file=$LOGDIR/vg.netlink.memcheck.txt --keep-debuginfo=yes --malloc-fill=3b" +#${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked & + +${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked -x -e exec,clone,exit -L $LOGDIR/nl.log --heartbeat 1 -v 0 & + # uncomment next one to test duplicate handling 
#${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json2.log --exclude-dir-path= --exclude-short-path= --exclude-programs & -VGARGS="--tool=drd --suppressions=/scratch1/baallan/ovis/ldms/scripts/examples/linux_proc_sampler.drd.supp" +VGARGS="--tool=drd --trace-cond=yes --trace-fork-join=yes" VGARGS="--leak-check=full --track-origins=yes --trace-children=yes --show-leak-kinds=definite --time-stamp=yes --keep-debuginfo=yes --malloc-fill=3b" #vgon LDMSD 1 diff --git a/ldms/src/sampler/netlink/netlink-notifier.c b/ldms/src/sampler/netlink/netlink-notifier.c index 6a661b214..036e0e380 100644 --- a/ldms/src/sampler/netlink/netlink-notifier.c +++ b/ldms/src/sampler/netlink/netlink-notifier.c @@ -230,6 +230,7 @@ typedef struct proc_info { bool excluded; /* true if matches excluded path lists or uid bound */ bool excluded_short; /* true if matches excluded_short path list */ int emitted; /* values from EMIT_* below */ + double duration; /* -1 if unknown or seconds pid endured (only valid at end) */ } proc_info_t; #define EMIT_NONE 0x0 @@ -320,6 +321,9 @@ typedef struct forkstat { double opt_wake_interval; char *compid_field; /* compid field formatted, if requested */ char *prod_field; /* ProducerName field formatted, if requested */ + unsigned long format; /* stream formatting version to use */ + int heartbeat; /* approximate seconds between heartbeats */ + time_t lastbeat; /* time of last beat */ } forkstat_t; @@ -405,6 +409,8 @@ static const int signals[] = { }; /* add several structures related to filtering. 
*/ +#define default_format "1" /* this is also the max format known */ +#define default_heartbeat NULL /* none by default*/ #define default_component_id NULL #define default_ProducerName NULL #define default_track_dir "/var/run/ldms-netlink-tracked" @@ -2105,8 +2111,10 @@ static void *monitor(void *vp) d1 = timeval_to_double(&info1->start); d2 = timeval_to_double(&tv); (void)snprintf(duration, sizeof(duration), "%8s", secs_to_str(d2 - d1)); + info1->duration = d2 - d1; } else { (void)snprintf(duration, sizeof(duration), "unknown"); + info1->duration = -1.0; } if (ft->opt_flags & (OPT_UMIN | OPT_EXTRA)) { if (info1->uid < ft->opt_uidmin) { @@ -2350,6 +2358,8 @@ static struct exclude_arg excludes[] = { {default_track_dir, VT_DIR, 0, "NOTIFIER_TRACK_DIR", NULL, 0, PLINIT}, {default_ProducerName, VT_SCALAR, 0, "NOTIFIER_PRODUCERNAME", NULL, 0, PLINIT}, {default_component_id, VT_SCALAR, 0, "NOTIFIER_COMPONENT_ID", NULL, 0, PLINIT}, + {default_format, VT_SCALAR, 0, "NOTIFIER_FORMAT", NULL, 0, PLINIT}, + {default_heartbeat, VT_SCALAR, 0, "NOTIFIER_HEARTBEAT", NULL, 0, PLINIT}, }; static struct exclude_arg *bin_exclude = &excludes[0]; static struct exclude_arg *dir_exclude = &excludes[1]; @@ -2366,6 +2376,8 @@ static struct exclude_arg *send_log_arg = &excludes[11]; static struct exclude_arg *track_dir_arg = &excludes[12]; static struct exclude_arg *prod_arg = &excludes[13]; static struct exclude_arg *compid_arg = &excludes[14]; +static struct exclude_arg *format_arg = &excludes[15]; +static struct exclude_arg *heartbeat_arg = &excludes[16]; static struct option long_options[] = { {"exclude-programs", optional_argument, 0, 0}, @@ -2383,6 +2395,8 @@ static struct option long_options[] = { {"track-dir", required_argument, 0, 0}, {"ProducerName", required_argument, 0, 0}, {"component_id", required_argument, 0, 0}, + {"format", required_argument, 0, 0}, + {"heartbeat", required_argument, 0, 0}, {0, 0, 0, 0} }; @@ -2539,6 +2553,41 @@ static int set_interval(char *optarg, 
forkstat_t *ft) return 0; } +static int set_heartbeat(char *optarg, forkstat_t *ft) +{ + if (!optarg || !ft) + return EINVAL; + char *end = NULL; + ft->heartbeat = strtol(optarg, &end, 10); + if (ft->heartbeat < 0) { + PRINTF("Illegal --heartbeat: %s, must be positive\n", optarg); + return EINVAL; + } + if (*end != '\0') { + PRINTF("--heartbeat needed, not %s\n", optarg); + return EINVAL; + } + return 0; +} + +static int set_format(char *optarg, forkstat_t *ft) +{ + if (!optarg || !ft) + return EINVAL; + char *end = NULL; + unsigned long max_format = strtoul(default_format, &end, 10); + ft->format = strtoul(optarg, &end, 10); + if (ft->format > max_format) { + PRINTF("Illegal --format: %s, must be <= %s\n", optarg, default_format); + return EINVAL; + } + if (*end != '\0') { + PRINTF("--format needed, not %s\n", optarg); + return EINVAL; + } + return 0; +} + static int set_duration_min(char *optarg, forkstat_t *ft) { if (!optarg || !ft) @@ -3029,6 +3078,7 @@ static int add_env_attr(struct env_attr *a, jbuf_t *jb, const struct proc_info * return 0; } + static jbuf_t make_process_start_data_linux(forkstat_t *ft, const struct proc_info *info, #if DEBUG_EMITTER const char *type, @@ -3087,42 +3137,78 @@ static jbuf_t make_process_start_data_linux(forkstat_t *ft, const struct proc_in } + static jbuf_t make_process_end_data_linux(forkstat_t *ft, const struct proc_info *info, jbuf_t jbd) { jbuf_t jb = jbd; - (void)ft; if (!jb) return NULL; - jb = jbuf_append_str(jb, - "{" - "\"msgno\":%" PRIu64 "," - "\"schema\":\"linux_task_data\"," - "\"event\":\"task_exit\"," - "\"timestamp\":%d," - "\"context\":\"*\"," - "\"data\":" + + if (ft->format == 0) + jb = jbuf_append_str(jb, "{" - "%s%s" - "\"start\":\"%lu.%06lu\"," - /* format start_tick as string because u64 - * is out of ovis_json signed int range */ - "\"start_tick\":\"%" PRIu64 "\"," - "\"job_id\":\"%s\"," - "\"serial\":%" PRId64 "," - "\"os_pid\":%" PRId64 "," - "\"task_pid\":%d" - "}" - "}", - forkstat_get_serial(ft), - 
time(NULL), - ft->prod_field, ft->compid_field, - info->start.tv_sec, info->start.tv_usec, - info->start_tick, - info_jobid_str(info), - info->serno, - (int64_t)info->pid, - (int)info->pid); + "\"msgno\":%" PRIu64 "," + "\"schema\":\"linux_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"start\":\"%lu.%06lu\"," + /* format start_tick as string because u64 + * is out of ovis_json signed int range */ + "\"start_tick\":\"%" PRIu64 "\"," + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 "," + "\"task_pid\":%d" + "}" + "}", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info->start.tv_sec, info->start.tv_usec, + info->start_tick, + info_jobid_str(info), + info->serno, + (int64_t)info->pid, + (int)info->pid); + else if (ft->format == 1) + jb = jbuf_append_str(jb, + "{" + "\"msgno\":%" PRIu64 "," + "\"schema\":\"linux_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"start\":\"%lu.%06lu\"," + /* format start_tick as string because u64 + * is out of ovis_json signed int range */ + "\"start_tick\":\"%" PRIu64 "\"," + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 "," + "\"task_pid\":%d," + "\"duration\":%.17g" + "}" + "}", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info->start.tv_sec, info->start.tv_usec, + info->start_tick, + info_jobid_str(info), + info->serno, + (int64_t)info->pid, + (int)info->pid, + info->duration); + else + jb = NULL; return jb; } @@ -3183,27 +3269,54 @@ static jbuf_t make_process_end_data_lsf(forkstat_t *ft, const struct proc_info * jbuf_t jb = jbd; if (!jb) return NULL; - jb = jbuf_append_str(jb, - "{" - "\"msgno\":%" PRIu64 "," - "\"schema\":\"lsf_task_data\"," - "\"event\":\"task_exit\"," - "\"timestamp\":%d," - "\"context\":\"*\"," - "\"data\":" + if (ft->format == 0) + jb = 
jbuf_append_str(jb, "{" - "%s%s" - "\"start\":\"%lu.%06lu\"," - "\"job_id\":\"%s\"," - "\"serial\":%" PRId64 "," - "\"os_pid\":%" PRId64 ",", - forkstat_get_serial(ft), - time(NULL), - ft->prod_field, ft->compid_field, - info->start.tv_sec, info->start.tv_usec, - info_jobid_str(info), - info->serno, - (int64_t)info->pid); + "\"msgno\":%" PRIu64 "," + "\"schema\":\"lsf_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"start\":\"%lu.%06lu\"," + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 ",", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info->start.tv_sec, info->start.tv_usec, + info_jobid_str(info), + info->serno, + (int64_t)info->pid); + else if (ft->format == 1) + jb = jbuf_append_str(jb, + "{" + "\"msgno\":%" PRIu64 "," + "\"schema\":\"lsf_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"start\":\"%lu.%06lu\"," + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 "," + "\"duration\":%.17g,", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info->start.tv_sec, info->start.tv_usec, + info_jobid_str(info), + info->serno, + (int64_t)info->pid, + info->duration); + else + jb = NULL; if (!jb) goto out_1; size_t i, iend; @@ -3285,25 +3398,52 @@ static jbuf_t make_process_end_data_slurm(forkstat_t *ft, const struct proc_info if (!jb) return NULL; - jb = jbuf_append_str(jb, - "{" - "\"msgno\":%" PRIu64 "," - "\"schema\":\"slurm_task_data\"," - "\"event\":\"task_exit\"," - "\"timestamp\":%d," - "\"context\":\"*\"," - "\"data\":" + if (ft->format == 0) + jb = jbuf_append_str(jb, "{" - "%s%s" - "\"job_id\":\"%s\"," - "\"serial\":%" PRId64 "," - "\"os_pid\":%" PRId64 ",", - forkstat_get_serial(ft), - time(NULL), - ft->prod_field, ft->compid_field, - info_jobid_str(info), - info->serno, - (int64_t)info->pid); + 
"\"msgno\":%" PRIu64 "," + "\"schema\":\"slurm_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"job_id\":\"%s\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 ",", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info_jobid_str(info), + info->serno, + (int64_t)info->pid); + else if (ft->format == 1) + jb = jbuf_append_str(jb, + "{" + "\"msgno\":%" PRIu64 "," + "\"schema\":\"slurm_task_data\"," + "\"event\":\"task_exit\"," + "\"timestamp\":%d," + "\"context\":\"*\"," + "\"data\":" + "{" + "%s%s" + "\"job_id\":\"%s\"," + "\"start\":\"%lu.%06lu\"," + "\"serial\":%" PRId64 "," + "\"os_pid\":%" PRId64 "," + "\"duration\":%.17g,", + forkstat_get_serial(ft), + time(NULL), + ft->prod_field, ft->compid_field, + info_jobid_str(info), + info->start.tv_sec, info->start.tv_usec, + info->serno, + (int64_t)info->pid, + info->duration); + else + jb = NULL; if (!jb) goto out_1; int i, iend; @@ -3546,6 +3686,33 @@ static int forkstat_set_json_log(forkstat_t *ft, const char *fname) static forkstat_t *shft; +void heartbeat(forkstat_t *ft, time_t now, jbuf_t *jbd) +{ + if (!ft || !ft->heartbeat || !jbd || !*jbd) + return; + if (now - ft->lastbeat >= ft->heartbeat) { + ft->lastbeat = now; + jbuf_t jb = *jbd; + /* format similar to others without data object */ + jbuf_reset(jb); + jb = jbuf_append_str(jb, + "{" + "\"msgno\":%" PRIu64 "," + "\"schema\":\"ldms-notify-status\"," + "%s%s" + "\"timestamp\":%" PRId64 "," + "\"event\":\"heartbeat\",\"data\":{}}", + forkstat_get_serial(ft), + ft->prod_field, ft->compid_field, + (int64_t)ft->lastbeat + ); + if (jb) { + send_ldms_message(ft, jb); + } + *jbd = jb; + } +} + #define DFLT_SORT_SZ 128 static void *dump_pids(void *vp) { @@ -3608,6 +3775,7 @@ static void *dump_pids(void *vp) if (ft->opt_trace) PRINTF("end UPDATE\n"); + heartbeat(ft, tv.tv_sec, &jb); nanosleep(&s, NULL); } jbuf_free(jb); @@ -3788,6 +3956,12 @@ int main(int argc, 
char * argv[]) } normalize_exclude(&excludes[c]); } + if (set_format(format_arg->paths[0].n, ft)) { + fprintf(stderr, "Bad value %s for %s (>=0 & <= %s).\n", + format_arg->paths[0].n, "format", format_arg[0].defval); + ret = EXIT_FAILURE; + goto abort_sock; + } if (compid_arg[0].parsed && compid_arg->paths[0].n) { size_t csz = strlen(compid_arg->paths[0].n) + 20; char ctmp[csz]; @@ -3796,6 +3970,13 @@ int main(int argc, char * argv[]) } else { ft->compid_field = strdup(""); } + if (heartbeat_arg[0].parsed && heartbeat_arg->paths[0].n && + set_heartbeat(heartbeat_arg->paths[0].n, ft)) { // NPR fixme + fprintf(stderr, "Bad value %s for %s.\n", + heartbeat_arg->paths[0].n, "heartbeat. need seconds."); + ret = EXIT_FAILURE; + goto abort_sock; + } if (prod_arg[0].parsed && prod_arg->paths[0].n) { size_t csz = strlen(prod_arg->paths[0].n) + 20; char ctmp[csz]; @@ -3813,6 +3994,7 @@ int main(int argc, char * argv[]) fprintf(stderr, "Bad value %s for %s or %s.\n", duration_exclude->paths[0].n, long_options[3].name, duration_exclude->env); ret = EXIT_FAILURE; + goto abort_sock; } if (ft->opt_trace) diff --git a/ldms/src/sampler/netlink/netlink-notifier.man b/ldms/src/sampler/netlink/netlink-notifier.man index 4a54b784e..088f269b4 100644 --- a/ldms/src/sampler/netlink/netlink-notifier.man +++ b/ldms/src/sampler/netlink/netlink-notifier.man @@ -90,6 +90,8 @@ The 'short' options do not override the exclude entirely options. If not set, the component_id field is not included in the stream formats produced. --ProducerName= set the value of ProducerName If not set, the ProducerName field is not included in the stream formats produced. +--format=N change the format of messages to version N. + If not set, the highest available format is used. See MESSAGE FORMATS. 
.fi .SH ENVIRONMENT @@ -107,6 +109,8 @@ NOTIFIER_LDMS_XPRT=sock NOTIFIER_LDMS_HOST=localhost NOTIFIER_LDMS_PORT=411 NOTIFIER_LDMS_AUTH=munge +NOTIFIER_FORMAT=1 +NOTIFIER_HEARTBEAT=(none) .fi Omitting (nullexe): from NOTIFIER_EXCLUDE_PROGRAMS may cause incomplete output related to processes no longer present. In exotic circumstances, this may be desirable anyway. @@ -127,6 +131,15 @@ Client applications may validate a file by checking the contents against the /proc/$pid/stat content, if it exists. Invalid files should be removed by clients or system scripts. +.SH MESSAGE FORMATS +Message formats tuned to SLURM, LSF, and Linux without a batch scheduler +are published, based on what the notifier detects and the user's choice of +ProducerName and component_id. The version of the tuned formats is specified by number. +.PP +Format 0 omits the start time from slurm process end messages (since it is only sometimes known) and omits process duration, which depends on the start time. +.PP +Format 1 includes the start time for slurm process end messages (or the dummy value 0 when unknown) and includes process duration for all end messages. When the start time is unavailable, a duration of -1.0 is published. Merging data from other sources may allow durations flagged as -1 to be computed in some later data cleanup step. + .SH NOTES .PP @@ -138,7 +151,13 @@ If not used with a sampler, the --component_id or --ProducerName options are nee to add a node identifier to the messages. Normally a process-following sampler that creates sets will add the node identifier automatically. .PP +When the daemon is started after a process is started, the process start time and therefore process +duration may not be available. In message formats which report start time, 0 indicates +data was unavailable. For processes without completely known time bounds, the duration is reported +as -1.0. +.PP Options are still in development. Several options affect only the trace output. 
+ .SH EXAMPLES .PP Run for 30 seconds with screen and json.log test output connecting to the ldmsd from 'ldms-static-test.sh blobwriter' test: