Skip to content

Commit

Permalink
fix leaks and races in jobid parsing.
Browse files Browse the repository at this point in the history
  • Loading branch information
baallan authored and tom95858 committed Apr 18, 2024
1 parent 324e643 commit 2d13b45
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 6 deletions.
133 changes: 133 additions & 0 deletions ldms/scripts/examples/linux_proc_sampler.job
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
export plugname=linux_proc_sampler
export dsname=$(ldms_dstat_schema_name mmalloc=1 io=1 fd=1 stat=1 auto-schema=1)
export dstat_schema=$dsname
export LDMSD_LOG_LEVEL=ERROR
export LDMSD_LOG_TIME_SEC=1
export LDMSD_EXTRA="-m 128m"

function SUSLEEP () {
if test "$bypass" = "1"; then
echo skipping sleep
return 0
fi
echo -n sleep $1 ...
runuser -u $USER sleep $1
echo done
}

portbase=61060
cat << EOF > $LDMSD_RUN/exclude_env
^COLORTERM
^DBU.*
^DESKTOP_SESSION
^DISPLAY
^GDM.*
^GNO.*
^GUESTFISH.*
^XDG.*
^LS_COLORS
^SESSION_MANAGER
^SSH.*
^XAU.*
^BASH_FUNC_m
"
EOF
ldms-gen-syscall-map > $LDMSD_RUN/syscalls.map
cat << EOF > $LDMSD_RUN/metrics.input
{ "stream" : "slurm",
"argv_sep":"\t",
"syscalls": "${LDMSD_RUN}/syscalls.map",
"argv_msg": 1,
"log_send": 1,
"env_msg": 1,
"env_exclude": "${LDMSD_RUN}/exclude_env",
"fd_msg": 1,
"fd_exclude": [
"^/dev/",
"^/run/",
"^/var/",
"^/etc/",
"^/sys/",
"^/tmp/",
"^/proc/",
"^/proc$",
"^/ram/tmp/",
"^/usr/lib",
"^/usr/share/",
"^/opt/ness",
"^/ram/opt/ness",
"^/ram/var/",
"/.nfs0"
],
"published_pid_dir" : "${LDMSD_RUN}/ldms-netlink-tracked",
"metrics" : [
"status_real_user",
"status_eff_user",
"status_real_group",
"status_eff_group",
"stat_pid" ,
"stat_state",
"stat_rss",
"stat_utime",
"stat_stime",
"io_read_b",
"io_write_b",
"syscall_name"
]
}
EOF
rm -f $LOGDIR/json*.log
for pi in $(seq 80 100); do
touch ${LDMSD_RUN}/ldms-netlink-tracked/$pi
done
/bin/rm $LOGDIR/nl.log

JOBDATA $TESTDIR/job.data 1

drd="valgrind -v --tool=drd --log-file=$LOGDIR/vg.netlink.drd.txt --trace-cond=yes --trace-fork-join=yes"
memcheck="valgrind -v --leak-check=full --track-origins=yes --trace-children=yes --log-file=$LOGDIR/vg.netlink.memcheck.txt --keep-debuginfo=yes --malloc-fill=3b"
#${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked --purge-track-dir &

${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json.log --exclude-dir-path= --exclude-short-path= --exclude-programs --track-dir=${LDMSD_RUN}/ldms-netlink-tracked -x -e exec,clone,exit -L $LOGDIR/nl.log --heartbeat 1 -v 3 --ProducerName=$(hostname) --purge-track-dir --format 2 --jobid-file=$TESTDIR/job.data.1 &

# uncomment next one to test duplicate handling
#${BUILDDIR}/sbin/ldms-netlink-notifier --port=61061 --auth=none --reconnect=1 -D 30 -r -j $LOGDIR/json2.log --exclude-dir-path= --exclude-short-path= --exclude-programs &
VGARGS="--tool=drd --trace-cond=yes --trace-fork-join=yes"
VGARGS="--leak-check=full --track-origins=yes --trace-children=yes --show-leak-kinds=definite --time-stamp=yes --keep-debuginfo=yes --malloc-fill=3b"
#vgon
LDMSD -p prolog.jobid 1
vgoff
LDMSD 2
LDMSD 3
vgoff
JOBDATA $TESTDIR/job.data 1
SUSLEEP 2
MESSAGE ldms_ls on host 1:
#LDMS_LS 1 -v
MESSAGE ldms_ls on host 2:
JOBDATA $TESTDIR/job.data 1
SUSLEEP 1
LDMS_LS 2 -v
JOBDATA $TESTDIR/job.data 1
SUSLEEP 5
#MESSAGE stream_client_dump on sampler daemon 1
#for lc in $(seq 1); do
#ldmsd_controller --auth none --port 61061 --cmd stream_client_dump
# SUSLEEP 1
#done
JOBDATA $TESTDIR/job.data 1
SUSLEEP 5
for lc in $(seq 1); do
#LDMS_LS 1 -v
JOBDATA $TESTDIR/job.data 1
SUSLEEP 2
done
JOBDATA $TESTDIR/job.data 1
SUSLEEP 20
KILL_LDMSD 3 2 1
file_created $STOREDIR/node/$plugname
file_created $STOREDIR/node/$dsname
rollover_created $STOREDIR/blobs/linux_proc_sampler_argv.DAT
rollover_created $STOREDIR/blobs/linux_proc_sampler_files.DAT
rollover_created $STOREDIR/blobs/linux_proc_sampler_env.DAT
rollover_created $STOREDIR/blobs/slurm.DAT
7 changes: 7 additions & 0 deletions ldms/scripts/examples/linux_proc_sampler.job.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
load name=${plugname}
config name=${plugname} producer=localhost${i} schema=${plugname} instance=localhost${i}/${plugname} component_id=${i} perm=0644 cfg_file=${LDMSD_RUN}/metrics.input
start name=${plugname} interval=1000000 offset=0

# load name=dstat
# config name=dstat producer=localhost${i} instance=localhost${i}/${dstat_schema} component_id=${i} mmalloc=1 io=1 fd=1 auto-schema=1 stat=1) perm=777
# start name=dstat interval=1000000 offset=0
25 changes: 25 additions & 0 deletions ldms/scripts/examples/linux_proc_sampler.job.2
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# blobs must be allowed by writer plugin and prdcr_subscribe by daemon
load name=blob_stream_writer plugin=blob_stream_writer
config name=blob_stream_writer path=${STOREDIR} container=blobs stream=slurm stream=linux_proc_sampler_env stream=linux_proc_sampler_argv types=1 stream=linux_proc_sampler_files

load name=dstat
config name=dstat producer=localhost${i} instance=localhost${i}/${dstat_schema} component_id=${i} mmalloc=1 io=1 fd=1 auto-schema=1 stat=1) perm=777
start name=dstat interval=1000000 offset=0

prdcr_add name=localhost1 host=${HOST} type=active xprt=${XPRT} port=${port1} interval=2000000
prdcr_subscribe regex=.* stream=slurm
prdcr_subscribe regex=.* stream=linux_proc_sampler_argv
prdcr_subscribe regex=.* stream=linux_proc_sampler_env
prdcr_subscribe regex=.* stream=linux_proc_sampler_files
prdcr_start name=localhost1

updtr_add name=allhosts interval=1000000 offset=100000
updtr_prdcr_add name=allhosts regex=.*
updtr_start name=allhosts

load name=store_csv
config name=store_csv path=${STOREDIR} altheader=0

strgp_add name=store_${testname} plugin=store_csv schema=linux_proc_sampler container=node
strgp_prdcr_add name=store_${testname} regex=.*
strgp_start name=store_${testname}
13 changes: 13 additions & 0 deletions ldms/scripts/examples/linux_proc_sampler.job.3
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
prdcr_add name=localhost2 host=${HOST} type=active xprt=${XPRT} port=${port2} interval=2000000
prdcr_start name=localhost2

updtr_add name=allhosts interval=1000000 offset=200000
updtr_prdcr_add name=allhosts regex=.*
updtr_start name=allhosts

load name=store_csv
config name=store_csv path=${STOREDIR} altheader=0

strgp_add name=store_dstat plugin=store_csv schema=${dstat_schema} container=node
strgp_prdcr_add name=store_dstat regex=.*
strgp_start name=store_dstat
35 changes: 30 additions & 5 deletions ldms/src/sampler/netlink/netlink-notifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ static pthread_mutex_t err_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t stop_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t serial_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t host_jobid_lock = PTHREAD_MUTEX_INITIALIZER;

#define THREADED 1
#define UNTHREADED 1
Expand Down Expand Up @@ -2371,7 +2372,10 @@ static void forkstat_destroy(forkstat_t *ft)
ft->json_log = NULL;
}
free(ft->prod_field);
free(ft->host_jobid);
free(ft->compid_field);
free(ft->jobid_variable_name);
free(ft->jobid_file_name);
free(ft);
}

Expand Down Expand Up @@ -2642,8 +2646,10 @@ static int update_host_jobid(forkstat_t *ft)
return 0;
FILE *f = fopen(ft->jobid_file_name, "r");
if (!f) {
pthread_mutex_lock(&host_jobid_lock);
free(ft->host_jobid);
ft->host_jobid = NULL;
pthread_mutex_unlock(&host_jobid_lock);
return 0;
}
char *s = NULL;
Expand All @@ -2656,14 +2662,18 @@ static int update_host_jobid(forkstat_t *ft)
name = strtok_r(s, "=", &p);
if (!name || strcmp(name, ft->jobid_variable_name))
continue;
value = strtok_r(NULL, "=", &p);
value = strtok_r(NULL, " \f\v\r\t\n", &p);
if (!value) {
pthread_mutex_lock(&host_jobid_lock);
free(ft->host_jobid);
ft->host_jobid = NULL;
pthread_mutex_unlock(&host_jobid_lock);
break;
}
pthread_mutex_lock(&host_jobid_lock);
free(ft->host_jobid);
ft->host_jobid = strdup(value);
pthread_mutex_unlock(&host_jobid_lock);
if (!ft->host_jobid) {
rc = ENOMEM;
}
Expand Down Expand Up @@ -2725,6 +2735,8 @@ static int set_jobid_variable(forkstat_t *ft)
*/
static int set_jobid_file(forkstat_t *ft)
{
if (ft->jobid_file_name)
return 0;
struct exclude_arg *jfa = jobid_file_arg;
if (!jfa->parsed && !getenv(jfa->env))
return 0;
Expand Down Expand Up @@ -3305,6 +3317,7 @@ static char **load_pid_env(pid_t pid, size_t *out_len)
* get host_jobid */
#define info_jobid_str(info, ft) info->jobid ? info->jobid : ( (info->uid >= UID_MIN && ft->host_jobid) ? ft->host_jobid : "0")


static int add_env_attr(struct env_attr *a, jbuf_t *jb, const struct proc_info *info, forkstat_t *ft)
{
const char *s = info_get_var(a->env, info, ft);
Expand All @@ -3331,6 +3344,7 @@ static jbuf_t make_process_start_data_linux(forkstat_t *ft, const struct proc_in
jbuf_t jb = jbd;
if (!jb)
return NULL;
pthread_mutex_lock(&host_jobid_lock);
jb = jbuf_append_str(jb,
"{"
"\"msgno\":%" PRIu64 ","
Expand Down Expand Up @@ -3375,6 +3389,7 @@ static jbuf_t make_process_start_data_linux(forkstat_t *ft, const struct proc_in
(int)info->pid,
(int)info->is_thread,
info->exe);
pthread_mutex_unlock(&host_jobid_lock);
return jb;

}
Expand All @@ -3387,6 +3402,7 @@ static jbuf_t make_process_end_data_linux(forkstat_t *ft, const struct proc_info
if (!jb)
return NULL;

pthread_mutex_lock(&host_jobid_lock);
if (ft->format == 0)
jb = jbuf_append_str(jb,
"{"
Expand Down Expand Up @@ -3487,6 +3503,7 @@ static jbuf_t make_process_end_data_linux(forkstat_t *ft, const struct proc_info
else
jb = NULL;

pthread_mutex_unlock(&host_jobid_lock);
return jb;
}

Expand All @@ -3496,6 +3513,7 @@ static jbuf_t make_process_start_data_lsf(forkstat_t *ft, const struct proc_info
jbuf_t jb = jbd;
if (!jb)
return NULL;
pthread_mutex_lock(&host_jobid_lock);
jb = jbuf_append_str(jb,
"{"
"\"msgno\":%" PRIu64 ","
Expand Down Expand Up @@ -3537,6 +3555,7 @@ static jbuf_t make_process_start_data_lsf(forkstat_t *ft, const struct proc_info
info->exe);

out_1:
pthread_mutex_unlock(&host_jobid_lock);
return jb;
}

Expand All @@ -3546,6 +3565,7 @@ static jbuf_t make_process_end_data_lsf(forkstat_t *ft, const struct proc_info *
jbuf_t jb = jbd;
if (!jb)
return NULL;
pthread_mutex_lock(&host_jobid_lock);
if (ft->format == 0)
jb = jbuf_append_str(jb,
"{"
Expand Down Expand Up @@ -3636,6 +3656,7 @@ static jbuf_t make_process_end_data_lsf(forkstat_t *ft, const struct proc_info *
(int64_t)info->uid);

out_1:
pthread_mutex_unlock(&host_jobid_lock);
return jb;
}

Expand All @@ -3649,6 +3670,7 @@ static jbuf_t make_process_start_data_slurm(forkstat_t *ft, const struct proc_in
jbuf_t jb = jbd;
if (!jb)
return NULL;
pthread_mutex_lock(&host_jobid_lock);
jb = jbuf_append_str(jb,
"{"
"\"msgno\":%" PRIu64 ","
Expand Down Expand Up @@ -3692,6 +3714,7 @@ static jbuf_t make_process_start_data_slurm(forkstat_t *ft, const struct proc_in
(int)info->is_thread,
info->exe);
out_1:
pthread_mutex_unlock(&host_jobid_lock);
return jb;

}
Expand All @@ -3703,6 +3726,7 @@ static jbuf_t make_process_end_data_slurm(forkstat_t *ft, const struct proc_info

if (!jb)
return NULL;
pthread_mutex_lock(&host_jobid_lock);
if (ft->format == 0)
jb = jbuf_append_str(jb,
"{"
Expand Down Expand Up @@ -3791,6 +3815,7 @@ static jbuf_t make_process_end_data_slurm(forkstat_t *ft, const struct proc_info
"}");

out_1:
pthread_mutex_unlock(&host_jobid_lock);
return jb;
}

Expand Down Expand Up @@ -4415,11 +4440,12 @@ int main(int argc, char * argv[])
forkstat_option_dump(ft, excludes);

/* netlink/ldms threaded region */
pthread_t jtid = pthread_create(&jobid_thread, NULL,
jobid_thread_check, ft); /* thread for jobid file */
/* thread for jobid file */
pthread_create(&jobid_thread, NULL, jobid_thread_check, ft);

forkstat_init_ldms_stream(ft);
int start_err = forkstat_monitor(ft, &ma); // thread to follow kernel netlink sock
// thread to follow kernel netlink sock
int start_err = forkstat_monitor(ft, &ma);
if (start_err)
goto close_abort;
if (ft->opt_trace)
Expand All @@ -4439,7 +4465,6 @@ int main(int argc, char * argv[])
pthread_join(ma.tid, &res);
if (ft->opt_trace)
PRINTF("JOIN done w/%p\n", res);
pthread_cancel(jtid); /* this will 'fail' if thread already stopped */
forkstat_finalize_ldms_stream(ft);
/* resume unthreaded region */
if (ft->opt_trace)
Expand Down
2 changes: 1 addition & 1 deletion ldms/src/sampler/netlink/netlink-notifier.man
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ The 'short' options do not override the exclude entirely options.
If not set, the ProducerName field is not included in the stream formats produced.
--format=N change the format of messages to version N.
If not set, the highest available format is used. See MESSAGE FORMATS.
--jobid_file=FILE look for job_id numbers in FILE. The default is not to look
--jobid-file=FILE look for job_id numbers in FILE. The default is not to look
for a job id file if this option is not given nor NOTIFIER_JOBID_FILE is defined.
See JOB ID FILES for details.
.fi
Expand Down

0 comments on commit 2d13b45

Please sign in to comment.