diff --git a/ldms/src/core/Makefile.am b/ldms/src/core/Makefile.am index 597634bdd..e9146136a 100644 --- a/ldms/src/core/Makefile.am +++ b/ldms/src/core/Makefile.am @@ -15,7 +15,7 @@ ldmscoreinclude_HEADERS = ldms.h ldms_core.h ldms_xprt.h ldms_auth.h kldms_req.h libldms_la_SOURCES = ldms.c ldms_xprt.c ldms_private.h ref.h \ ldms_auth.c ldms_xprt_auth.c libldms_la_LIBADD = -ldl -lpthread $(top_builddir)/lib/src/coll/libcoll.la \ - $(top_builddir)/lib/src/json/libjson_util.la \ + $(top_builddir)/lib/src/ovis_json/libovis_json.la \ $(top_builddir)/lib/src/mmalloc/libmmalloc.la \ $(top_builddir)/lib/src/zap/libzap.la diff --git a/ldms/src/core/ldms.c b/ldms/src/core/ldms.c index a1622fb22..b80307942 100644 --- a/ldms/src/core/ldms.c +++ b/ldms/src/core/ldms.c @@ -45,6 +45,7 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#define _GNU_SOURCE #include #include #include @@ -75,6 +76,16 @@ static char *__set_dir = SET_DIR_PATH; #define SET_DIR_LEN sizeof(SET_DIR_PATH) static char __set_path[PATH_MAX]; +static void __destroy_set(void *v); + +const char *ldms_xprt_op_names[] = { + "LOOKUP", + "UPDATE", + "PUBLISH", + "SET_DELETE", + "DIR", + "SEND" +}; /* This function is useful for displaying data structures stored in * mmap'd memory that on some platforms is not accessible to the @@ -99,7 +110,7 @@ static int set_comparator(void *a, const void *b) return strcmp(x, y); } -static struct rbt set_tree = { +static struct rbt __set_tree = { .root = NULL, .comparator = set_comparator }; @@ -116,13 +127,20 @@ static int id_comparator(void *a, const void *b) return 0; } -static struct rbt id_tree = { +static struct rbt __id_tree = { .root = NULL, .comparator = id_comparator }; static pthread_mutex_t __set_tree_lock = PTHREAD_MUTEX_INITIALIZER; +static struct rbt del_tree = { + .root = NULL, + .comparator = id_comparator +}; + +static pthread_mutex_t __del_tree_lock = PTHREAD_MUTEX_INITIALIZER; + void __ldms_gn_inc(struct ldms_set *set, ldms_mdesc_t desc) { if (desc->vd_flags & LDMS_MDESC_F_DATA) { @@ -139,9 +157,11 @@ struct ldms_set *__ldms_find_local_set(const char *set_name) struct rbn *z; struct ldms_set *s = NULL; - z = rbt_find(&set_tree, (void *)set_name); - if (z) + z = rbt_find(&__set_tree, (void *)set_name); + if (z) { s = container_of(z, struct ldms_set, rb_node); + ref_get(&s->ref, __func__); + } return s; } @@ -151,7 +171,7 @@ struct ldms_set *__ldms_local_set_first() struct rbn *z; struct ldms_set *s = NULL; - z = rbt_min(&set_tree); + z = rbt_min(&__set_tree); if (z) s = container_of(z, struct ldms_set, rb_node); return s; @@ -176,26 +196,21 @@ void __ldms_set_tree_unlock() pthread_mutex_unlock(&__set_tree_lock); } -static ldms_set_t __set_by_name(const char *set_name) -{ - struct ldms_set *set = __ldms_find_local_set(set_name); - struct ldms_rbuf_desc *rbd = NULL; - if (!set) - goto out; - - rbd = __ldms_alloc_rbd(NULL, set, LDMS_RBD_LOCAL); - - out: - return rbd; -} - extern ldms_set_t ldms_set_by_name(const char *set_name) { - ldms_set_t s; + struct ldms_set *set; + struct ldms_rbuf_desc *rbd; + __ldms_set_tree_lock(); - s = __set_by_name(set_name); + set = __ldms_find_local_set(set_name); __ldms_set_tree_unlock(); - return s; + if (!set) + return NULL; + rbd = __ldms_alloc_rbd(NULL, set, LDMS_RBD_LOCAL, __func__); + if (rbd) + ref_get(&set->ref, __func__); + ref_put(&set->ref, "__ldms_find_local_set"); + return rbd; } uint64_t ldms_set_meta_gn_get(ldms_set_t s) @@ -227,7 +242,7 @@ int __ldms_for_all_sets(int (*cb)(struct ldms_set *, void *), void *arg) { struct cb_arg user_arg = { arg, cb }; int rc; - rc = rbt_traverse(&set_tree, rbn_cb, &user_arg); + rc = rbt_traverse(&__set_tree, rbn_cb, &user_arg); return rc; } @@ -437,16 +452,10 @@ __record_set(const char *instance_name, { struct ldms_set *set; - set = __ldms_find_local_set(instance_name); - if (set) { - errno = EEXIST; - return NULL; - } - set = calloc(1, sizeof *set); if (!set) { errno = ENOMEM; - goto out; + goto out_0; } LIST_INIT(&set->local_info); @@ -461,11 +470,25 @@ __record_set(const char *instance_name, set->data = __set_array_get(set, set->curr_idx); set->flags = flags; + ref_init(&set->ref, __func__, __destroy_set, set); rbn_init(&set->rb_node, get_instance_name(set->meta)->name); - rbt_ins(&set_tree, &set->rb_node); rbn_init(&set->id_node, (void *)set->set_id); - rbt_ins(&id_tree, &set->id_node); - out: + + __ldms_set_tree_lock(); + /* Check if we lost a race creating this same set name */ + struct ldms_set *nset = __ldms_find_local_set(instance_name); + if (nset) { + ref_put(&nset->ref, "__ldms_find_local_set"); + errno = EEXIST; + free(set); + set = NULL; + goto out_1; + } + rbt_ins(&__set_tree, &set->rb_node); + rbt_ins(&__id_tree, &set->id_node); + out_1: + __ldms_set_tree_unlock(); + out_0: return set; } @@ -476,13 +499,12 @@ extern struct ldms_set *__ldms_set_by_id(uint64_t id) { struct ldms_set *set = NULL; struct rbn *rbn; - rbn = rbt_find(&id_tree, (void *)id); + rbn = rbt_find(&__id_tree, (void *)id); if (rbn) set = container_of(rbn, struct ldms_set, id_node); return set; } -/* Caller must hold the set tree lock. */ static int __ldms_set_publish(struct ldms_set *set) { @@ -504,9 +526,9 @@ uint64_t ldms_set_id(ldms_set_t set) int ldms_set_publish(ldms_set_t sd) { int rc; - __ldms_set_tree_lock(); + ref_get(&sd->set->ref, "publish"); rc = __ldms_set_publish(sd->set); - __ldms_set_tree_unlock(); + ref_put(&sd->set->ref, "publish"); return rc; } @@ -525,9 +547,9 @@ int __ldms_set_unpublish(struct ldms_set *set) int ldms_set_unpublish(ldms_set_t sd) { int rc; - __ldms_set_tree_lock(); + ref_get(&sd->set->ref, "unpublish"); rc = __ldms_set_unpublish(sd->set); - __ldms_set_tree_unlock(); + ref_put(&sd->set->ref, "unpublish"); return rc; } @@ -589,6 +611,11 @@ void *_open_and_map_file(const char *path, int type, int create, size_t *size) return p; } +void ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats) +{ + *stats = _x->stats; +} + static void sync_update_cb(ldms_t x, ldms_set_t s, int status, void *arg) { ldms_set_t *ps = arg; @@ -600,20 +627,37 @@ static void sync_update_cb(ldms_t x, ldms_set_t s, int status, void *arg) int ldms_xprt_update(ldms_set_t s, ldms_update_cb_t cb, void *arg) { - if (s->set->flags & LDMS_SET_F_REMOTE) { - ldms_t x = s->xprt; - if (!cb) { - int rc = __ldms_remote_update(x, s, sync_update_cb, arg); - if (rc) - return rc; - sem_wait(&x->sem); - return x->sem_rc; + struct ldms_set *set = s->set; + ldms_t xprt = ldms_xprt_get(s->xprt); + int rc; + + assert(set); + + if (0 == (set->flags & LDMS_SET_F_REMOTE)) { + if (cb) + cb(s->xprt, s, 0, arg); + return 0; + } + + if (!xprt) + return EINVAL; + + pthread_mutex_lock(&xprt->lock); + if (!cb) { + int rc = __ldms_remote_update(xprt, s, sync_update_cb, arg); + pthread_mutex_unlock(&xprt->lock); + if (rc) { + ldms_xprt_put(xprt); + return rc; } - return __ldms_remote_update(x, s, cb, arg); + sem_wait(&xprt->sem); + rc = xprt->sem_rc; + ldms_xprt_put(xprt); } - if (cb) - cb(s->xprt, s, 0, arg); - return 0; + rc = __ldms_remote_update(xprt, s, cb, arg); + pthread_mutex_unlock(&xprt->lock); + ldms_xprt_put(xprt); + return rc; } void __ldms_set_info_delete(struct ldms_set_info_list *info) @@ -628,64 +672,96 @@ void __ldms_set_info_delete(struct ldms_set_info_list *info) } } +static void __destroy_set_no_lock(void *v) +{ + struct ldms_set *set = v; + rbt_del(&del_tree, &set->del_node); + rbt_verify(&del_tree); + mm_free(set->meta); + __ldms_set_info_delete(&set->local_info); + __ldms_set_info_delete(&set->remote_info); + free(set); +} + +static void __destroy_set(void *v) +{ + pthread_mutex_lock(&__del_tree_lock); + __destroy_set_no_lock(v); + pthread_mutex_unlock(&__del_tree_lock); +} + +static void __set_delete_cb(ldms_t xprt, int status, ldms_set_t rbd, void *cb_arg) +{ + struct ldms_set *set = cb_arg; + ref_put(&set->ref, "share_lookup"); + ref_put(&rbd->ref, "share_lookup"); +} + void ldms_set_delete(ldms_set_t s) { + extern void __ldms_rbd_xprt_release(struct ldms_rbuf_desc *rbd); struct ldms_rbuf_desc *rbd; - struct ldms_set *set; - struct ldms_xprt *xprt; - - if (!s) - assert(NULL == "The metric set passed in is NULL"); + struct ldms_set *set = s->set; + ldms_t xprt; __ldms_set_tree_lock(); - set = s->set; - rbt_del(&set_tree, &set->rb_node); - rbt_del(&id_tree, &set->id_node); + rbt_del(&__set_tree, &set->rb_node); + rbt_del(&__id_tree, &set->id_node); __ldms_set_tree_unlock(); + + ldms_xprt_set_delete(s, __set_delete_cb, set); + + pthread_mutex_lock(&set->lock); while (!LIST_EMPTY(&set->remote_rbd_list)) { rbd = LIST_FIRST(&set->remote_rbd_list); - xprt = rbd->xprt; - if (xprt) { - pthread_mutex_lock(&xprt->lock); - __ldms_rbd_xprt_release(rbd); - pthread_mutex_unlock(&xprt->lock); + LIST_REMOVE(rbd, set_link); + ref_put(&rbd->ref, "set_rbd_list"); + switch (rbd->type) { + case LDMS_RBD_INITIATOR: + ref_put(&rbd->ref, "rendezvous_lookup"); + ref_put(&set->ref, "rendezvous_lookup"); + break; + case LDMS_RBD_TARGET: + ref_put(&rbd->ref, "rendezvous_push"); + ref_put(&set->ref, "rendezvous_push"); + case LDMS_RBD_LOCAL: + /* cleaned up in __set_delete_cb() */ + break; } - __ldms_free_rbd(rbd); - } - while (!LIST_EMPTY(&set->local_rbd_list)) { - rbd = LIST_FIRST(&set->local_rbd_list); xprt = rbd->xprt; if (xprt) { pthread_mutex_lock(&xprt->lock); - __ldms_rbd_xprt_release(rbd); + if (rbd->xprt) + /* Make certain we didn't lose a disconnect race */ + __ldms_rbd_xprt_release(rbd); pthread_mutex_unlock(&xprt->lock); } - __ldms_free_rbd(rbd); } + pthread_mutex_unlock(&set->lock); - mm_free(set->meta); - __ldms_set_info_delete(&set->local_info); - __ldms_set_info_delete(&set->remote_info); - free(set); + /* Add the set to the delete tree with the current timestamp */ + set->del_time = time(NULL); + rbn_init(&set->del_node, &set->del_time); + pthread_mutex_lock(&__del_tree_lock); + rbt_ins(&del_tree, &set->del_node); + pthread_mutex_unlock(&__del_tree_lock); + + /* Drop the create references on the RBD and the set */ + ref_put(&s->ref, "set_new"); + ref_put(&set->ref, "set_new"); + ref_put(&set->ref, "__record_set"); } void ldms_set_put(ldms_set_t s) { struct ldms_set *set; - struct ldms_xprt *xprt; if (!s) return; - __ldms_set_tree_lock(); set = s->set; pthread_mutex_lock(&set->lock); - xprt = s->xprt; - if (xprt) - pthread_mutex_lock(&xprt->lock); - __ldms_free_rbd(s); /* removes the RBD from the local/remote rbd list */ - if (xprt) - pthread_mutex_unlock(&xprt->lock); + __ldms_free_rbd(s, "ldms_set_by_name"); /* removes the RBD from the local/remote rbd list */ pthread_mutex_unlock(&set->lock); - __ldms_set_tree_unlock(); + ref_put(&set->ref, "ldms_set_by_name"); } static void sync_lookup_cb(ldms_t x, enum ldms_lookup_status status, int more, @@ -808,7 +884,6 @@ struct ldms_set *__ldms_create_set(const char *instance_name, return NULL; } - memset(meta, 0, meta_len + array_card * data_len); LDMS_VERSION_SET(meta->version); meta->meta_sz = __cpu_to_le32(meta_len); @@ -1072,21 +1147,19 @@ ldms_set_t ldms_set_new_with_auth(const char *instance_name, value_off += __ldms_value_size_get(vd->vd_type, __le32_to_cpu(vd->vd_array_count)); } - __ldms_set_tree_lock(); struct ldms_set *set = __record_set(instance_name, meta, data_base, LDMS_SET_F_LOCAL); if (!set) goto err_1; - ldms_set_t rbd = __ldms_alloc_rbd(NULL, set, LDMS_RBD_LOCAL); + ldms_set_t rbd = __ldms_alloc_rbd(NULL, set, LDMS_RBD_LOCAL, "set_new"); if (!rbd) goto err_2; - __ldms_set_tree_unlock(); + return rbd; err_2: - rbt_del(&set_tree, &set->rb_node); - rbt_del(&id_tree, &set->id_node); + rbt_del(&__set_tree, &set->rb_node); + rbt_del(&__id_tree, &set->id_node); free(set); err_1: - __ldms_set_tree_unlock(); mm_free(meta); return NULL; } @@ -1153,14 +1226,13 @@ int ldms_mmap_set(void *meta_addr, void *data_addr, ldms_set_t *ps) int flags; flags = LDMS_SET_F_MEMMAP | LDMS_SET_F_LOCAL; - __ldms_set_tree_lock(); struct ldms_set *set = __record_set(get_instance_name(sh)->name, sh, dh, flags); if (!set) goto err; int rc = __ldms_set_publish(set); if (!rc) { struct ldms_rbuf_desc *rbd; - rbd = __ldms_alloc_rbd(NULL, set, LDMS_RBD_LOCAL); + rbd = __ldms_alloc_rbd(NULL, set, LDMS_RBD_LOCAL, "set_new"); if (!rbd) goto err; *ps = rbd; @@ -1168,10 +1240,8 @@ int ldms_mmap_set(void *meta_addr, void *data_addr, ldms_set_t *ps) errno = rc; goto err; } - __ldms_set_tree_unlock(); return 0; err: - __ldms_set_tree_unlock(); return errno; } @@ -2543,3 +2613,64 @@ int ldms_mval_parse_scalar(ldms_mval_t v, enum ldms_value_type vt, const char *s return EINVAL; return 0; } + +#define DELETE_TIMEOUT (600) /* 10 minutes */ +#define DELETE_CHECK (60) +#define REPORT_MIN (10) + +static void *delete_proc(void *arg) +{ + struct rbn *rbn; + struct ldms_set *set; + ldms_name_t name; + time_t dur; + char *to = getenv("LDMS_DELETE_TIMEOUT"); + int timeout = (to ? atoi(to) : DELETE_TIMEOUT); + if (timeout <= DELETE_CHECK) + timeout = DELETE_CHECK; + do { + /* + * Iterate through the tree from oldest to + * newest. Delete any set older than the threshold + */ + pthread_mutex_lock(&__del_tree_lock); + rbn = rbt_max(&del_tree); + while (rbn) { + set = container_of(rbn, struct ldms_set, del_node); + name = get_instance_name(set->meta); + dur = time(NULL) - set->del_time; + fprintf(stderr, + "Dangling set %s with reference count %d, " + "waiting %jd seconds\n", + name->name, set->ref.ref_count, dur); + fflush(stderr); + if (dur < timeout) + break; + fprintf(stderr, + "Deleting dangling set %s with reference " + "count %d, waited %jd seconds\n", + name->name, set->ref.ref_count, dur); + ref_dump(&set->ref, __func__, stderr); + __destroy_set_no_lock(set); + rbn = rbt_max(&del_tree); + fflush(stderr); + } + pthread_mutex_unlock(&__del_tree_lock); + sleep(DELETE_CHECK); + } while (1); + return NULL; +} + +static pthread_t delete_thread; +static void __attribute__ ((constructor)) cs_init(void) +{ + int rc = pthread_create(&delete_thread, NULL, delete_proc, NULL); + if (!rc) { + pthread_setname_np(delete_thread, "delete_thread"); + } +} + +static void __attribute__ ((destructor)) cs_term(void) +{ +} + diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index 60b06ec82..ea8399374 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -250,7 +250,7 @@ ldms_t ldms_xprt_by_remote_sin(struct sockaddr_in *sin); * The ldms_xprt_first() and ldms_xprt_next() functions are used to iterate * among all transport endpoints in the system. The ldms_xprt_first() function * returns the first endpoint and takes a reference on the handle. This reference - * should be released by calling ldms_release_xprt() when the caller has finished + * should be released by calling ldms_xprt_put() when the caller has finished * with the endpoint. * * \returns The first transport endpoint or NULL if there are no open transports. @@ -263,7 +263,7 @@ ldms_t ldms_xprt_first(); * The ldms_xprt_first() and ldms_xprt_next() functions are used to iterate * among all transport endpoints in the system. The ldms_xprt_first() function * returns the first endpoint and takes a reference on the handle. This reference - * should be released by calling ldms_release_xprt() when the caller has finished + * should be released by calling ldms_xprt_put() when the caller has finished * with the endpoint. * * \returns The first transport endpoint or NULL if there are no open transports. @@ -341,7 +341,7 @@ extern ldms_t ldms_xprt_new(const char *name, ldms_log_fn_t log_fn); * This is like ::ldms_xprt_new(), but with authentication plugin attached to * the transport. * - * \param xprt_name The name of the transport type. + * \param xprt_name The transport type name string. * \param log_fn An optional function to call when logging transport messages. * \param auth_name The name of the authentication plugin. * \param auth_av_list The attribute-value list containing options for the @@ -352,6 +352,13 @@ ldms_t ldms_xprt_new_with_auth(const char *xprt_name, ldms_log_fn_t log_fn, const char *auth_name, struct attr_value_list *auth_av_list); +/** + * \brief Return the transport type name string + * \param x The transport handle + * \returns The transport type name string + */ +const char *ldms_xprt_type_name(ldms_t x); + /** * \brief Set the ldms transport priority * @@ -378,9 +385,15 @@ enum ldms_xprt_event_type { LDMS_XPRT_EVENT_DISCONNECTED, /*! Receive data from a remote host */ LDMS_XPRT_EVENT_RECV, + /*! Lookup set has been deleted at peer */ + LDMS_XPRT_EVENT_SET_DELETE, LDMS_XPRT_EVENT_LAST }; +struct ldms_xprt_set_delete_data { + ldms_set_t set; /*! The local set looked up at peer */ +}; + typedef struct ldms_xprt_event { /*! ldms event type */ enum ldms_xprt_event_type type; @@ -388,10 +401,13 @@ typedef struct ldms_xprt_event { * may be freed when the callback returns. * \c data is NULL if the type is not LDMS_CONN_EVENT_RECV. */ - char *data; - /*! The length of \c data in bytes. - * \c data_len is 0 if \c type is not LDMS_CONN_EVENT_RECV. - */ + union { + /*! The length of \c data in bytes. + * \c data_len is 0 if \c type is not LDMS_CONN_EVENT_RECV. + */ + char *data; + struct ldms_xprt_set_delete_data set_delete; + }; size_t data_len; } *ldms_xprt_event_t; @@ -865,6 +881,51 @@ extern int ldms_xprt_cancel_push(ldms_set_t s); */ extern int ldms_xprt_push(ldms_set_t s); +typedef struct ldms_stats_entry { + uint64_t count; + uint64_t total_us; + uint64_t min_us; + uint64_t max_us; + uint64_t mean_us; +} *ldms_stats_entry_t; + +typedef enum ldms_xprt_ops_e { + LDMS_XPRT_OP_LOOKUP, + LDMS_XPRT_OP_UPDATE, + LDMS_XPRT_OP_PUBLISH, + LDMS_XPRT_OP_SET_DELETE, + LDMS_XPRT_OP_DIR, + LDMS_XPRT_OP_SEND, + LDMS_XPRT_OP_COUNT +} ldms_xprt_ops_t; + +extern const char *ldms_xprt_op_names[]; + +struct ldms_xprt_rate_data { + double connect_rate_s; + double connect_request_rate_s; + double disconnect_rate_s; + double reject_rate_s; + double auth_fail_rate_s; +}; + +/** + * Retrieve transport rate data + */ +void ldms_xprt_rate_data(struct ldms_xprt_rate_data *data); + +typedef struct ldms_xprt_stats { + struct ldms_stats_entry ops[LDMS_XPRT_OP_COUNT]; +} *ldms_xprt_stats_t; + +/** + * \brief Retrieve transport request statistics + * + * \param x The transport handle + * \param s Pointer to an ldms_xprt_stats structure + */ +extern void ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats); + /** * \brief Create a metric set schema * @@ -1116,6 +1177,15 @@ extern uint32_t ldms_set_data_sz_get(ldms_set_t s); */ extern ldms_set_t ldms_set_by_name(const char *set_name); +/** + * \brief Get a set by name on a particular transport + * + * \param s The transport set handle + * \param set_name The set name + * \returns The ldms_set_t handle or 0 if not found + */ +extern ldms_set_t ldms_xprt_set_by_name(ldms_t x, const char *set_name); + /** * \brief Get the metric schema generation number. * @@ -1807,6 +1877,30 @@ int ldms_access_check(ldms_t x, uint32_t acc, uid_t obj_uid, gid_t obj_gid, * \} */ +/** + * \brief Return the time difference in microseconds + * + * Computes the number of microseconds in the interval end - start. + * Note that the result may be negative. + * + * \param start Pointer to struct timespec + * \param end Pointer to struct timespec + * \returns The number of microseconds in the interval end - start + */ +static inline int64_t ldms_timespec_diff_us(struct timespec *start, struct timespec *end) +{ + int64_t secs_ns; + int64_t nsecs; + secs_ns = (end->tv_sec - start->tv_sec) * 1000000000; + nsecs = end->tv_nsec - start->tv_nsec; + return (secs_ns + nsecs) / 1000; +} + +static inline double ldms_timespec_diff_s(struct timespec *start, struct timespec *end) +{ + return (double)ldms_timespec_diff_us(start, end) / (double)1000000.0; +} + #ifdef __cplusplus } #endif diff --git a/ldms/src/core/ldms_private.h b/ldms/src/core/ldms_private.h index 7cdb7b009..802f36640 100644 --- a/ldms/src/core/ldms_private.h +++ b/ldms/src/core/ldms_private.h @@ -50,6 +50,7 @@ #define _LDMS_PRIVATE_H #include #include +#include #include #include "ovis_util/os_util.h" @@ -85,14 +86,17 @@ struct ldms_set_info_pair { LIST_HEAD(ldms_set_info_list, ldms_set_info_pair); LIST_HEAD(rbd_list, ldms_rbuf_desc); struct ldms_set { + struct ref_s ref; unsigned long flags; uint64_t set_id; /* unique identifier for a set in this daemon */ + uint64_t del_time; /* Unix timestamp when set was deleted */ struct ldms_set_hdr *meta; struct ldms_data_hdr *data; /* points to current entry of data array */ struct ldms_set_info_list local_info; struct ldms_set_info_list remote_info; /*set info from the lookup operation */ - struct rbn rb_node; - struct rbn id_node; + struct rbn rb_node; /* Indexed by instance name */ + struct rbn id_node; /* Indexed by set_id */ + struct rbn del_node; /* Indexed by timestamp */ struct rbd_list local_rbd_list; struct rbd_list remote_rbd_list; pthread_mutex_t lock; @@ -104,9 +108,13 @@ struct ldms_set { #define roundup(_v,_s) ((_v + (_s - 1)) & ~(_s - 1)) extern int __ldms_xprt_push(ldms_set_t s, int push_flags); -extern struct ldms_rbuf_desc *__ldms_alloc_rbd(struct ldms_xprt *, - struct ldms_set *s, enum ldms_rbd_type type); -extern void __ldms_free_rbd(struct ldms_rbuf_desc *rbd); +extern struct ldms_rbuf_desc *___ldms_alloc_rbd(struct ldms_xprt *, + struct ldms_set *s, enum ldms_rbd_type type, + const char *name, const char *func, int line); +#define __ldms_alloc_rbd(_x_, _s_, _t_, _n_) ___ldms_alloc_rbd(_x_, _s_, _t_, _n_, __func__, __LINE__) +extern void ___ldms_free_rbd(struct ldms_rbuf_desc *rbd, const char *name, const char *func, int line); +#define __ldms_free_rbd(_r_, _n_) ___ldms_free_rbd(_r_, _n_, __func__, __LINE__) +extern void __ldms_free_rbd_no_lock(struct ldms_rbuf_desc *rbd, const char *name, const char *func, int line); extern void __ldms_rbd_xprt_release(struct ldms_rbuf_desc *rbd); extern int __ldms_remote_lookup(ldms_t _x, const char *path, enum ldms_lookup_flags flags, @@ -121,6 +129,7 @@ __ldms_create_set(const char *instance_name, const char *schema_name, extern void __ldms_dir_add_set(struct ldms_set *set); extern void __ldms_dir_del_set(struct ldms_set *set); extern void __ldms_dir_upd_set(struct ldms_set *set); +extern int __ldms_delete_remote_set(ldms_t _x, ldms_set_t s); struct ldms_name_entry { LIST_ENTRY(ldms_name_entry) entry; diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index f770c19e6..0bad817ec 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -50,6 +50,7 @@ #include #include #include + #include #include #include @@ -64,6 +65,7 @@ #include #include #include +#include #include #include #include "ovis_util/os_util.h" @@ -71,10 +73,32 @@ #include "ldms_xprt.h" #include "ldms_private.h" +/* Global Transport Statistics */ +static uint64_t xprt_connect_count; +static uint64_t xprt_connect_request_count; +static uint64_t xprt_disconnect_count; +static uint64_t xprt_reject_count; +static uint64_t xprt_auth_fail_count; +static struct timespec xprt_start; + +void ldms_xprt_rate_data(struct ldms_xprt_rate_data *data) +{ + struct timespec now; + double dur_s; + (void)clock_gettime(CLOCK_REALTIME, &now); + dur_s = ldms_timespec_diff_s(&xprt_start, &now); + data->connect_rate_s = (double)xprt_connect_count / dur_s; + data->connect_request_rate_s = (double)xprt_connect_request_count / dur_s; + data->disconnect_rate_s = (double)xprt_disconnect_count / dur_s; + data->reject_rate_s = (double)xprt_reject_count / dur_s; + data->auth_fail_rate_s = (double)xprt_auth_fail_count / dur_s; +} + #define LDMS_XPRT_AUTH_GUARD(x) (((x)->auth_flag != LDMS_XPRT_AUTH_DISABLE) && \ ((x)->auth_flag != LDMS_XPRT_AUTH_APPROVED)) static struct ldms_rbuf_desc *ldms_lookup_rbd(struct ldms_xprt *, struct ldms_set *); +static struct ldms_rbuf_desc *__rbd_by_set_id(struct ldms_xprt *x, uint64_t id); /** * zap callback function. @@ -112,8 +136,10 @@ static zap_t ldms_zap_list[3] = {0}; ldms_t ldms_xprt_get(ldms_t x) { - assert(x->ref_count > 0); - __sync_add_and_fetch(&x->ref_count, 1); + if (x) { + assert(x->ref_count > 0); + __sync_add_and_fetch(&x->ref_count, 1); + } return x; } @@ -125,7 +151,8 @@ ldms_t ldms_xprt_first() x = LIST_FIRST(&xprt_list); if (!x) goto out; - x = ldms_xprt_get(x); + ldms_xprt_get(x); /* next reference */ + x = ldms_xprt_get(x); /* caller reference */ out: pthread_mutex_unlock(&xprt_list_lock); return x; @@ -133,13 +160,16 @@ ldms_t ldms_xprt_first() ldms_t ldms_xprt_next(ldms_t x) { + ldms_t prev_x = x; pthread_mutex_lock(&xprt_list_lock); x = LIST_NEXT(x, xprt_link); if (!x) goto out; - x = ldms_xprt_get(x); + ldms_xprt_get(x); /* next reference */ + x = ldms_xprt_get(x); /* caller reference */ out: pthread_mutex_unlock(&xprt_list_lock); + ldms_xprt_put(prev_x); /* next reference */ return x; } @@ -151,11 +181,11 @@ ldms_t ldms_xprt_by_remote_sin(struct sockaddr_in *sin) ldms_t l, next_l; l = ldms_xprt_first(); while (l) { - int rc = zap_get_name(l->zap_ep, - (struct sockaddr *)&ss_local, - (struct sockaddr *)&ss_remote, - &socklen); - if (rc) + zap_err_t zerr = zap_get_name(l->zap_ep, + (struct sockaddr *)&ss_local, + (struct sockaddr *)&ss_remote, + &socklen); + if (zerr) goto next; struct sockaddr_in *s = (struct sockaddr_in *)&ss_remote; if (s->sin_addr.s_addr == sin->sin_addr.s_addr @@ -172,38 +202,132 @@ ldms_t ldms_xprt_by_remote_sin(struct sockaddr_in *sin) /* Caller must call with the ldms xprt lock held */ struct ldms_context *__ldms_alloc_ctxt(struct ldms_xprt *x, size_t sz, - ldms_context_type_t type) + ldms_context_type_t type, ...) { + va_list ap; struct ldms_context *ctxt; + va_start(ap, type); ctxt = calloc(1, sz); if (!ctxt) { x->log("%s(): Out of memory\n", __func__); return ctxt; } + ctxt->x = ldms_xprt_get(x); + (void)clock_gettime(CLOCK_REALTIME, &ctxt->start); #ifdef CTXT_DEBUG x->log("%s(): x %p: alloc ctxt %p: type %d\n", __func__, x, ctxt, type); #endif /* CTXT_DEBUG */ ctxt->type = type; TAILQ_INSERT_TAIL(&x->ctxt_list, ctxt, link); + switch (type) { + case LDMS_CONTEXT_LOOKUP_REQ: + ctxt->lu_req.cb = va_arg(ap, ldms_lookup_cb_t); + ctxt->lu_req.cb_arg = va_arg(ap, void *); + ctxt->lu_req.path = va_arg(ap, char *); + ctxt->lu_req.flags = va_arg(ap, enum ldms_lookup_flags); + break; + case LDMS_CONTEXT_LOOKUP_READ: + ctxt->lu_read.s = va_arg(ap, ldms_set_t); + ref_get(&ctxt->lu_read.s->ref, __func__); + ctxt->lu_read.cb = va_arg(ap, ldms_lookup_cb_t); + ctxt->lu_read.cb_arg = va_arg(ap, void *); + ctxt->lu_read.more = va_arg(ap, int); + ctxt->lu_read.flags = va_arg(ap, enum ldms_lookup_flags); + break; + case LDMS_CONTEXT_UPDATE: + case LDMS_CONTEXT_UPDATE_META: + ctxt->update.s = va_arg(ap, ldms_set_t); + ref_get(&ctxt->update.s->ref, __func__); + ctxt->update.cb = va_arg(ap, ldms_update_cb_t); + ctxt->update.cb_arg = va_arg(ap, void *); + ctxt->update.idx_from = va_arg(ap, int); + ctxt->update.idx_to = va_arg(ap, int); + break; + case LDMS_CONTEXT_REQ_NOTIFY: + ctxt->req_notify.s = va_arg(ap, ldms_set_t); + ref_get(&ctxt->req_notify.s->ref, __func__); + ctxt->req_notify.cb = va_arg(ap, ldms_notify_cb_t); + ctxt->req_notify.cb_arg = va_arg(ap, void *); + break; + case LDMS_CONTEXT_DIR: + ctxt->dir.cb = va_arg(ap, ldms_dir_cb_t); + ctxt->dir.cb_arg = va_arg(ap, void *); + break; + case LDMS_CONTEXT_SET_DELETE: + ctxt->set_delete.s = va_arg(ap, ldms_set_t); + ref_get(&ctxt->set_delete.s->ref, __func__); + ctxt->set_delete.cb = va_arg(ap, ldms_set_delete_cb_t); + ctxt->set_delete.cb_arg = va_arg(ap, void *); + break; + case LDMS_CONTEXT_PUSH: + case LDMS_CONTEXT_DIR_CANCEL: + case LDMS_CONTEXT_SEND: + break; + } + va_end(ap); return ctxt; } /* Caller must call with the ldms xprt lock held */ void __ldms_free_ctxt(struct ldms_xprt *x, struct ldms_context *ctxt) { + int64_t dur_us; + struct timespec end; + ldms_stats_entry_t e = NULL; + + (void)clock_gettime(CLOCK_REALTIME, &end); + dur_us = ldms_timespec_diff_us(&ctxt->start, &end); + TAILQ_REMOVE(&x->ctxt_list, ctxt, link); - if (ctxt->type == LDMS_CONTEXT_LOOKUP) { - if (ctxt->lookup.path) { - free(ctxt->lookup.path); - ctxt->lookup.path = NULL; - } + switch (ctxt->type) { + case LDMS_CONTEXT_LOOKUP_REQ: + free(ctxt->lu_req.path); + break; + case LDMS_CONTEXT_LOOKUP_READ: + e = &x->stats.ops[LDMS_XPRT_OP_LOOKUP]; + ref_put(&ctxt->lu_read.s->ref, "__ldms_alloc_ctxt"); + break; + case LDMS_CONTEXT_UPDATE: + case LDMS_CONTEXT_UPDATE_META: + e = &x->stats.ops[LDMS_XPRT_OP_UPDATE]; + ref_put(&ctxt->update.s->ref, "__ldms_alloc_ctxt"); + break; + case LDMS_CONTEXT_REQ_NOTIFY: + ref_put(&ctxt->req_notify.s->ref, "__ldms_alloc_ctxt"); + break; + case LDMS_CONTEXT_SET_DELETE: + e = &x->stats.ops[LDMS_XPRT_OP_SET_DELETE]; + ref_put(&ctxt->set_delete.s->ref, "__ldms_alloc_ctxt"); + break; + case LDMS_CONTEXT_DIR: + e = &x->stats.ops[LDMS_XPRT_OP_DIR]; + break; + case LDMS_CONTEXT_SEND: + e = &x->stats.ops[LDMS_XPRT_OP_SEND]; + break; + case LDMS_CONTEXT_PUSH: + case LDMS_CONTEXT_DIR_CANCEL: + break; + } + + if (e) { + if (e->min_us > dur_us) + e->min_us = dur_us; + if (e->max_us < dur_us) + e->max_us = dur_us; + e->total_us += dur_us; + e->mean_us = (e->count * e->mean_us) + dur_us; + e->count += 1; + e->mean_us /= e->count; } + ldms_xprt_put(ctxt->x); free(ctxt); + } static void send_dir_update(struct ldms_xprt *x, enum ldms_dir_type t, - struct ldms_set *set) + char *json, size_t json_sz) { size_t hdr_len; size_t buf_len; @@ -218,23 +342,14 @@ static void send_dir_update(struct ldms_xprt *x, reply = malloc(buf_len); cnt = snprintf(reply->dir.json_data, buf_len - hdr_len, - "{ \"directory\" : ["); - cnt += __ldms_format_set_meta_as_json(set, 0, - &reply->dir.json_data[cnt], - buf_len - hdr_len - cnt); - if (cnt >= (buf_len - hdr_len)) - goto out; - cnt += snprintf(&reply->dir.json_data[cnt], - buf_len - hdr_len - cnt, - "]}"); - if (cnt >= (buf_len - hdr_len)) + "{ \"directory\" : [ %s ]}", json); + if (cnt >= buf_len) goto out; - reply->hdr.xid = x->remote_dir_xid; reply->hdr.cmd = htonl(LDMS_CMD_DIR_UPDATE_REPLY); reply->hdr.rc = 0; reply->dir.type = htonl(t); - reply->dir.json_data_len = cnt; + reply->dir.json_data_len = htonl(cnt); reply->hdr.len = htonl(hdr_len + cnt); #ifdef DEBUG @@ -245,6 +360,7 @@ static void send_dir_update(struct ldms_xprt *x, zap_err_t zerr; zerr = zap_send(x->zap_ep, reply, hdr_len + cnt); if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; if (x->log) x->log("%s: x %p: zap_send synchronously error. '%s'\n", __FUNCTION__, x, zap_err_str(zerr)); @@ -286,6 +402,7 @@ static void send_req_notify_reply(struct ldms_xprt *x, zap_err_t zerr = zap_send(x->zap_ep, reply, len); if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; x->log("%s: zap_send synchronously error. '%s'\n", __FUNCTION__, zap_err_str(zerr)); ldms_xprt_close(x); @@ -294,17 +411,41 @@ static void send_req_notify_reply(struct ldms_xprt *x, return; } +char *__ldms_format_set_for_dir(struct ldms_set *set, size_t *buf_sz) +{ + size_t json_buf_sz = 4096; + char *json_buf; + size_t cnt; + + json_buf = malloc(json_buf_sz); + cnt = __ldms_format_set_meta_as_json(set, 0, json_buf, json_buf_sz); + while (cnt >= json_buf_sz) { + free(json_buf); + json_buf_sz += 4096; + json_buf = malloc(json_buf_sz); + if (!json_buf) + return NULL; + cnt = __ldms_format_set_meta_as_json(set, 0, json_buf, json_buf_sz); + } + *buf_sz = cnt; + return json_buf; +} + static void dir_update(struct ldms_set *set, enum ldms_dir_type t) { - struct ldms_xprt *x, *next_x; - x = (struct ldms_xprt *)ldms_xprt_first(); - while (x) { + char *json_buf; + size_t json_cnt; + struct ldms_xprt *x; + json_buf = __ldms_format_set_for_dir(set, &json_cnt); + if (!json_buf) + return; + pthread_mutex_lock(&xprt_list_lock); + LIST_FOREACH(x, &xprt_list, xprt_link) { if (x->remote_dir_xid) - send_dir_update(x, t, set); - next_x = (struct ldms_xprt *)ldms_xprt_next(x); - ldms_xprt_put(x); - x = next_x; + send_dir_update(x, t, json_buf, json_cnt); } + pthread_mutex_unlock(&xprt_list_lock); + free(json_buf); } void __ldms_dir_add_set(struct ldms_set *set) @@ -342,15 +483,30 @@ void ldms_xprt_close(ldms_t x) __ldms_xprt_term(x); } +/* + * Must be called with the set->lock held + */ +static void __ldms_drop_rbd_set_refs(struct ldms_rbuf_desc *rbd) +{ + struct ldms_set *set = rbd->set; + struct ldms_rbuf_desc *tgt; + LIST_FOREACH(tgt, &set->local_rbd_list, set_link) { + if (tgt == rbd) + goto drop_refs; + } + LIST_FOREACH(tgt, &set->remote_rbd_list, set_link) { + if (tgt == rbd) + goto drop_refs; + } + return; + drop_refs: + LIST_REMOVE(rbd, set_link); + ref_put(&rbd->ref, "set_rbd_list"); +} + void __ldms_xprt_resource_free(struct ldms_xprt *x) { pthread_mutex_lock(&x->lock); - - struct ldms_context *dir_ctxt; - if (x->local_dir_xid) { - dir_ctxt = (struct ldms_context *)(unsigned long)x->local_dir_xid; - __ldms_free_ctxt(x, dir_ctxt); - } x->remote_dir_xid = x->local_dir_xid = 0; #ifdef DEBUG @@ -388,20 +544,29 @@ void __ldms_xprt_resource_free(struct ldms_xprt *x) struct rbn *rbn; struct ldms_rbuf_desc *rbd; + struct ldms_set *set; while ((rbn = rbt_min(&x->rbd_rbt))) { rbd = RBN_RBD(rbn); - if (rbd->type == LDMS_RBD_LOCAL || rbd->type == LDMS_RBD_TARGET) - __ldms_free_rbd(rbd); - else - __ldms_rbd_xprt_release(rbd); + set = rbd->set; + if (set) { + pthread_mutex_unlock(&x->lock); + pthread_mutex_lock(&set->lock); + __ldms_drop_rbd_set_refs(rbd); + pthread_mutex_unlock(&set->lock); + pthread_mutex_lock(&x->lock); + } + __ldms_rbd_xprt_release(rbd); } - if (x->auth) + if (x->auth) { ldms_auth_free(x->auth); + x->auth = NULL; + } pthread_mutex_unlock(&x->lock); } void ldms_xprt_put(ldms_t x) { + int remove = 0; assert(x->ref_count); /* * The xprt could be destroyed any time ldms_xprt_put is called. @@ -411,18 +576,68 @@ void ldms_xprt_put(ldms_t x) */ pthread_mutex_lock(&xprt_list_lock); if (0 == __sync_sub_and_fetch(&x->ref_count, 1)) { + remove = 1; LIST_REMOVE(x, xprt_link); #ifdef DEBUG x->xprt_link.le_next = 0; x->xprt_link.le_prev = 0; #endif /* DEBUG */ - __ldms_xprt_resource_free(x); - if (x->zap_ep) - zap_free(x->zap_ep); - sem_destroy(&x->sem); - free(x); } pthread_mutex_unlock(&xprt_list_lock); + + if (!remove) + return; + + __ldms_xprt_resource_free(x); + if (x->zap_ep) + zap_free(x->zap_ep); + sem_destroy(&x->sem); + free(x); +} + +static void process_set_delete_request(struct ldms_xprt *x, struct ldms_request *req) +{ + struct ldms_reply reply; + struct ldms_set *set; + struct ldms_rbuf_desc *r; + + __ldms_set_tree_lock(); + set = __ldms_find_local_set(req->set_delete.inst_name); + __ldms_set_tree_unlock(); + if (!set) + goto reply; + r = ldms_lookup_rbd(x, set); + if (!r) + goto reply_1; + if (x->event_cb) { + struct ldms_xprt_event event; + event.type = LDMS_XPRT_EVENT_SET_DELETE; + event.set_delete.set = r; + event.data_len = sizeof(ldms_set_t); + x->event_cb(x, &event, x->event_cb_arg); + } + reply_1: + ref_put(&set->ref, "__ldms_find_local_set"); + reply: + /* Initialize the reply header */ + reply.hdr.xid = req->hdr.xid; + reply.hdr.cmd = htonl(LDMS_CMD_SET_DELETE_REPLY); + reply.hdr.rc = 0; + reply.hdr.len = htonl(sizeof(reply.hdr)); + zap_err_t zerr = zap_send(x->zap_ep, &reply, sizeof(reply.hdr)); + if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; + x->log("%s: zap_send synchronously error. '%s'\n", + __FUNCTION__, zap_err_str(zerr)); + } +} + +static +void process_set_delete_reply(struct ldms_xprt *x, struct ldms_reply *reply, + struct ldms_context *ctxt) +{ + ctxt->set_delete.cb(x, reply->hdr.rc, ctxt->set_delete.s, ctxt->set_delete.cb_arg); + __ldms_free_ctxt(x, ctxt); } struct make_dir_arg { @@ -493,13 +708,19 @@ static void process_dir_request(struct ldms_xprt *x, struct ldms_request *req) reply->dir.json_data_len = htonl(cnt); reply->dir.more = 0; zerr = zap_send(x->zap_ep, reply, cnt + hdrlen); - if (zerr != ZAP_ERR_OK) + if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; x->log("%s: x %p: zap_send synchronous error. '%s'\n", __FUNCTION__, x, zap_err_str(zerr)); + } + free(reply); return; } + struct ldms_set *set; LIST_FOREACH(name, &name_list, entry) { - struct ldms_set *set = __ldms_find_local_set(name->name); + __ldms_set_tree_lock(); + set = __ldms_find_local_set(name->name); + __ldms_set_tree_unlock(); if (!set) continue; uid = __le32_to_cpu(set->meta->uid); @@ -512,8 +733,11 @@ static void process_dir_request(struct ldms_xprt *x, struct ldms_request *req) /* Start new dir message */ cnt = snprintf(reply->dir.json_data, len - hdrlen, "{ \"directory\" : ["); - if (cnt >= len - hdrlen) + if (cnt >= len - hdrlen) { + rc = ENOMEM; + ref_put(&set->ref, "__ldms_find_local_set"); goto out; + } } if (0 == ldms_access_check(x, LDMS_ACCESS_READ, uid, gid, perm)) { @@ -528,15 +752,19 @@ static void process_dir_request(struct ldms_xprt *x, struct ldms_request *req) last_cnt += snprintf(&reply->dir.json_data[last_cnt], len - hdrlen - last_cnt, "]}"); - if (last_cnt >= len - hdrlen) + if (last_cnt >= len - hdrlen) { + ref_put(&set->ref, "__ldms_find_local_set"); goto out; + } reply->hdr.len = htonl(last_cnt + hdrlen); reply->dir.json_data_len = htonl(last_cnt); reply->dir.more = htonl(1); zerr = zap_send(x->zap_ep, reply, last_cnt + hdrlen); - if (zerr != ZAP_ERR_OK) + if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; x->log("%s: x %p: zap_send synchronous error. '%s'\n", __FUNCTION__, x, zap_err_str(zerr)); + } cnt = 0; goto restart; } @@ -548,16 +776,20 @@ static void process_dir_request(struct ldms_xprt *x, struct ldms_request *req) "]}"); if (cnt >= len - hdrlen - 3) { rc = ENOMEM; + ref_put(&set->ref, "__ldms_find_local_set"); goto out; } reply->hdr.len = htonl(cnt + hdrlen); reply->dir.json_data_len = htonl(cnt); reply->dir.more = 0; zerr = zap_send(x->zap_ep, reply, cnt + hdrlen); - if (zerr != ZAP_ERR_OK) + if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; x->log("%s: x %p: zap_send synchronous error. '%s'\n", __FUNCTION__, x, zap_err_str(zerr)); + } } + ref_put(&set->ref, "__ldms_find_local_set"); } free(reply); __ldms_empty_name_list(&name_list); @@ -577,6 +809,7 @@ static void process_dir_request(struct ldms_xprt *x, struct ldms_request *req) zerr = zap_send(x->zap_ep, reply, len); if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; x->log("%s: zap_send synchronously error. '%s'\n", __FUNCTION__, zap_err_str(zerr)); ldms_xprt_close(x); @@ -632,11 +865,11 @@ __rbd_by_set_id(struct ldms_xprt *x, uint64_t id) /* 1st check if the set still exits */ __ldms_set_tree_lock(); set = __ldms_set_by_id(id); + __ldms_set_tree_unlock(); if (!set) goto out; r = ldms_lookup_rbd(x, set); out: - __ldms_set_tree_unlock(); return r; } @@ -666,7 +899,6 @@ process_cancel_push_request(struct ldms_xprt *x, struct ldms_request *req) struct ldms_rbuf_desc *push_rbd; struct ldms_set *set; uint64_t remote_set_id; - int rc; push_rbd = __rbd_by_set_id(x, req->cancel_push.set_id); if (!push_rbd) { @@ -691,10 +923,7 @@ process_cancel_push_request(struct ldms_xprt *x, struct ldms_request *req) */ push_rbd->remote_set_id = 0; - struct ldms_xprt *xprt = push_rbd->xprt; - pthread_mutex_lock(&xprt->lock); - __ldms_free_rbd(push_rbd); - pthread_mutex_unlock(&xprt->lock); + __ldms_free_rbd(push_rbd, "rendezvous_push"); LIST_FOREACH(r, &set->remote_rbd_list, set_link) { if (r->push_flags & LDMS_RBD_F_PUSH_CHANGE) @@ -718,10 +947,11 @@ process_cancel_push_request(struct ldms_xprt *x, struct ldms_request *req) reply.push.data_len = 0; reply.push.data_off = 0; reply.push.flags = htonl(LDMS_UPD_F_PUSH | LDMS_UPD_F_PUSH_LAST); - rc = zap_send(x->zap_ep, &reply, len); - if (rc) { - x->log("%s(): x %p: error %d sending PUSH_REPLY\n", - __func__, x, rc); + zap_err_t zerr = zap_send(x->zap_ep, &reply, len); + if (zerr) { + x->zerrno = zerr; + x->log("%s(): x %p: error %s sending PUSH_REPLY\n", + __func__, x, zap_err_str(zerr)); } ldms_xprt_put(x); return; @@ -820,8 +1050,8 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, rbd = ldms_lookup_rbd(x, set); if (!rbd) { rc = ENOMEM; - /* Allocate a new RBD for this set */ - rbd = __ldms_alloc_rbd(x, set, LDMS_RBD_LOCAL); + /* Allocate a new RBD for this set, ref put in __set_delete_cb() */ + rbd = __ldms_alloc_rbd(x, set, LDMS_RBD_LOCAL, "share_lookup"); if (!rbd) goto err_0; } @@ -880,9 +1110,11 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, */ + sizeof(struct ldms_name) * (2 + (set_info_cnt) * 2 + 1) + name->len + schema->len + set_info_len; - msg = malloc(msg_len); - if (!msg) + msg = calloc(1, msg_len); + if (!msg) { + rc = ENOMEM; goto err_0; + } __copy_set_info_to_lookup_msg(msg->lookup.set_info, schema, name, set); pthread_mutex_unlock(&set->lock); @@ -902,10 +1134,11 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, #endif /* DEBUG */ zap_err_t zerr = zap_share(x->zap_ep, rbd->lmap, (const char *)msg, msg_len); if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; + rc = zap_zerr2errno(zerr); x->log("%s: x %p: zap_share synchronously error. '%s'\n", __FUNCTION__, x, zap_err_str(zerr)); free(msg); - rc = zerr; goto err_0; } pthread_mutex_unlock(&x->lock); @@ -977,10 +1210,11 @@ static void process_lookup_request_re(struct ldms_xprt *x, struct ldms_request * rc = ENOENT; goto err_1; } + __ldms_set_tree_unlock(); rc = __send_lookup_reply(x, set, req->hdr.xid, 0); + ref_put(&set->ref, "__ldms_find_local_set"); if (rc) goto err_1; - __ldms_set_tree_unlock(); return; } @@ -1027,8 +1261,9 @@ static void process_lookup_request_re(struct ldms_xprt *x, struct ldms_request * hdr.xid = req->hdr.xid; hdr.cmd = htonl(LDMS_CMD_LOOKUP_REPLY); hdr.len = htonl(sizeof(struct ldms_reply_hdr)); - rc = zap_send(x->zap_ep, &hdr, sizeof(hdr)); - if (rc != ZAP_ERR_OK) { + zap_err_t zerr = zap_send(x->zap_ep, &hdr, sizeof(hdr)); + if (zerr != ZAP_ERR_OK) { + x->zerrno = zerr; x->log("%s: x %p: zap_send synchronously errors '%s'\n", __func__, x, zap_err_str(rc)); ldms_xprt_close(x); @@ -1056,28 +1291,20 @@ static int do_read_all(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) uint32_t len = __le32_to_cpu(s->set->meta->meta_sz) + __le32_to_cpu(s->set->meta->data_sz); - /* Prevent x being destroyed if DISCONNECTED is delivered in another thread */ - ldms_xprt_get(x); - pthread_mutex_lock(&x->lock); - ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE); + ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE, + s, cb, arg, 0, 0); if (!ctxt) { rc = ENOMEM; goto out; } - ctxt->update.s = s; - ctxt->update.cb = cb; - ctxt->update.arg = arg; - ctxt->update.idx_from = 0; - ctxt->update.idx_to = 0; - rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap), - s->lmap, zap_map_addr(s->lmap), len, ctxt); - if (rc) + s->lmap, zap_map_addr(s->lmap), len, ctxt); + if (rc) { + x->zerrno = rc; __ldms_free_ctxt(x, ctxt); + } out: - pthread_mutex_unlock(&x->lock); - ldms_xprt_put(x); - return rc; + return zap_zerr2errno(rc); } static int do_read_meta(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) @@ -1089,25 +1316,20 @@ static int do_read_meta(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) int rc; uint32_t meta_sz = __le32_to_cpu(s->set->meta->meta_sz); - ldms_xprt_get(x); - pthread_mutex_lock(&x->lock); - ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE_META); + ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE_META, + s, cb, arg, 0, 0); if (!ctxt) { rc = ENOMEM; goto out; } - ctxt->update.s = s; - ctxt->update.cb = cb; - ctxt->update.arg = arg; - rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap), s->lmap, zap_map_addr(s->lmap), meta_sz, ctxt); - if (rc) + if (rc) { + x->zerrno = rc; __ldms_free_ctxt(x, ctxt); + } out: - pthread_mutex_unlock(&x->lock); - ldms_xprt_put(x); - return rc; + return zap_zerr2errno(rc); } static int do_read_data(ldms_t x, ldms_set_t s, int idx_from, int idx_to, @@ -1121,34 +1343,26 @@ static int do_read_data(ldms_t x, ldms_set_t s, int idx_from, int idx_to, size_t doff, dlen; TF(); - /* Prevent x being destroyed if DISCONNECTED is delivered in another thread */ - assert(x == s->xprt); - ldms_xprt_get(x); + ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE, + s, cb, arg, idx_from, idx_to); - pthread_mutex_lock(&x->lock); - ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE); if (!ctxt) { rc = ENOMEM; goto out; } - ctxt->update.s = s; - ctxt->update.cb = cb; - ctxt->update.arg = arg; - ctxt->update.idx_from = idx_from; - ctxt->update.idx_to = idx_to; data_sz = __le32_to_cpu(s->set->meta->data_sz); doff = (uint8_t *)s->set->data_array - (uint8_t *)s->set->meta + idx_from * data_sz; dlen = (idx_to - idx_from + 1) * data_sz; - rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap) + doff, s->lmap, zap_map_addr(s->lmap) + doff, dlen, ctxt); - if (rc) + if (rc) { + x->zerrno = rc; + rc = zap_zerr2errno(rc); __ldms_free_ctxt(x, ctxt); + } out: - pthread_mutex_unlock(&x->lock); - ldms_xprt_put(x); return rc; } @@ -1170,15 +1384,13 @@ int __ldms_remote_update(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) if (LDMS_XPRT_AUTH_GUARD(x)) return EPERM; - int rc; struct ldms_set *set = s->set; uint32_t meta_meta_gn = __le32_to_cpu(set->meta->meta_gn); uint32_t data_meta_gn = __le32_to_cpu(set->data->meta_gn); uint32_t n = __le32_to_cpu(set->meta->array_card); int idx_from, idx_to, idx_next, idx_curr; - - zap_get_ep(x->zap_ep); /* Released in handle_zap_read_complete() */ + zap_get_ep(x->zap_ep); /* Released in handle_zap_read_complete() */ if (meta_meta_gn == 0 || meta_meta_gn != data_meta_gn) { if (set->curr_idx == (n-1)) { /* We can update the metadata along with the data */ @@ -1236,9 +1448,13 @@ int ldms_xprt_recv_request(struct ldms_xprt *x, struct ldms_request *req) rc = process_auth_msg(x, req); if (rc) { x->auth_flag = LDMS_XPRT_AUTH_FAILED; + __sync_fetch_and_add(&xprt_auth_fail_count, 1); __ldms_xprt_term(x); } break; + case LDMS_CMD_SET_DELETE: + process_set_delete_request(x, req); + break; default: x->log("Unrecognized request %d\n", cmd); assert(0 == "Unrecognized LDMS_CMD request type"); @@ -1257,8 +1473,8 @@ void process_lookup_reply(struct ldms_xprt *x, struct ldms_reply *reply, x->log("WARNING: Receive lookup reply error with rc: 0\n"); goto out; } - if (ctxt->lookup.cb) - ctxt->lookup.cb(x, rc, 0, NULL, ctxt->lookup.cb_arg); + if (ctxt->lu_req.cb) + ctxt->lu_req.cb(x, rc, 0, NULL, ctxt->lu_req.cb_arg); out: zap_put_ep(x->zap_ep); /* Taken in __ldms_remote_lookup() */ @@ -1473,6 +1689,8 @@ void __process_dir_reply(struct ldms_xprt *x, struct ldms_reply *reply, dir->set_data[i].info_count = info_count; lset = __ldms_find_local_set(dir->set_data[i].inst_name); rc = __process_dir_set_info(lset, type, &dir->set_data[i], info_list); + if (lset) + ref_put(&lset->ref, "__ldms_find_local_set"); __ldms_set_tree_unlock(); if (rc) break; @@ -1539,7 +1757,7 @@ static void process_req_notify_reply(struct ldms_xprt *x, struct ldms_reply *rep ctxt->req_notify.cb((ldms_t)x, ctxt->req_notify.s, - event, ctxt->req_notify.arg); + event, ctxt->req_notify.cb_arg); } static void process_push_reply(struct ldms_xprt *x, struct ldms_reply *reply, @@ -1622,6 +1840,9 @@ static int ldms_xprt_recv_reply(struct ldms_xprt *x, struct ldms_reply *reply) case LDMS_CMD_REQ_NOTIFY_REPLY: process_req_notify_reply(x, reply, ctxt); break; + case LDMS_CMD_SET_DELETE_REPLY: + process_set_delete_reply(x, reply, ctxt); + break; default: x->log("Unrecognized reply %d\n", cmd); } @@ -1662,6 +1883,7 @@ void __ldms_passive_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) case LDMS_XPRT_EVENT_CONNECTED: break; case LDMS_XPRT_EVENT_DISCONNECTED: + __ldms_xprt_resource_free(x); ldms_xprt_put(x); break; case LDMS_XPRT_EVENT_RECV: @@ -1778,9 +2000,13 @@ static void __handle_update_data(ldms_t x, struct ldms_context *ctxt, assert(ctxt->update.cb); rc = LDMS_UPD_ERROR(ev->status); - if (rc) { + if (rc || (set == NULL)) { + x->zerrno = rc; + rc = zap_zerr2errno(rc); /* READ ERROR */ - ctxt->update.cb(x, s, rc, ctxt->update.arg); + if (!rc) + rc = ENOENT; + ctxt->update.cb(x, s, rc, ctxt->update.cb_arg); goto cleanup; } n = __le32_to_cpu(set->meta->array_card); @@ -1810,7 +2036,7 @@ static void __handle_update_data(ldms_t x, struct ldms_context *ctxt, } else { flags = LDMS_UPD_F_MORE; } - ctxt->update.cb(x, s, flags, ctxt->update.arg); + ctxt->update.cb(x, s, flags, ctxt->update.cb_arg); prev_data = data; } @@ -1819,9 +2045,9 @@ static void __handle_update_data(ldms_t x, struct ldms_context *ctxt, /* the updated set is not current */ rc = __ldms_remote_update(x, s, ctxt->update.cb, - ctxt->update.arg); + ctxt->update.cb_arg); if (rc) - ctxt->update.cb(x, s, LDMS_UPD_ERROR(rc), ctxt->update.arg); + ctxt->update.cb(x, s, LDMS_UPD_ERROR(rc), ctxt->update.cb_arg); cleanup: zap_put_ep(x->zap_ep); /* from __ldms_remote_update() */ @@ -1838,9 +2064,9 @@ static void __handle_update_meta(ldms_t x, struct ldms_context *ctxt, struct ldms_set *set = s->set; int idx = (set->curr_idx + 1) % __le32_to_cpu(set->meta->array_card); - rc = do_read_data(x, s, idx, idx, ctxt->update.cb, ctxt->update.arg); + rc = do_read_data(x, s, idx, idx, ctxt->update.cb, ctxt->update.cb_arg); if (rc) { - ctxt->update.cb(x, s, LDMS_UPD_ERROR(rc), ctxt->update.arg); + ctxt->update.cb(x, s, LDMS_UPD_ERROR(rc), ctxt->update.cb_arg); zap_put_ep(x->zap_ep); } /* do_read_data has its own context */ @@ -1853,25 +2079,29 @@ static void __handle_lookup(ldms_t x, struct ldms_context *ctxt, zap_event_t ev) { int status = 0; - if (!ctxt->lookup.cb) + if (!ctxt->lu_read.cb) goto ctxt_cleanup; if (ev->status != ZAP_ERR_OK) { status = EREMOTEIO; -#ifdef DEBUG - x->log("DEBUG: %s: lookup read error: zap error %d. ldms lookup status %d\n", - ldms_set_instance_name_get(ctxt->lookup.s), ev->status, status); +#if 1 // def DEBUG + x->log("DEBUG: %s: lookup read error: zap error %d. " + "ldms lookup status %d\n", + ldms_set_instance_name_get(ctxt->lu_read.s), + ev->status, status); #endif /* DEBUG */ /* - * The rbd is in the xprt list, and will be cleaned up by the - * transport. + * Destroy the set, the ctxt still has a reference on + * it, but that will be dropped when the ctxt is + * freed. */ - ctxt->lookup.s = NULL; + ldms_set_delete(ctxt->lu_read.s); } else { - ldms_set_publish(ctxt->lookup.s); + ldms_set_publish(ctxt->lu_read.s); } - ctxt->lookup.cb((ldms_t)x, status, ctxt->lookup.more, ctxt->lookup.s, - ctxt->lookup.cb_arg); - if (!ctxt->lookup.more) { + ctxt->lu_read.cb((ldms_t)x, status, ctxt->lu_read.more, + ev->status ? NULL : ctxt->lu_read.s, + ctxt->lu_read.cb_arg); + if (!ctxt->lu_read.more) { zap_put_ep(x->zap_ep); /* Taken in __ldms_remote_lookup() */ #ifdef DEBUG assert(x->active_lookup > 0); @@ -1900,7 +2130,7 @@ static void handle_zap_read_complete(zap_ep_t zep, zap_event_t ev) case LDMS_CONTEXT_UPDATE_META: __handle_update_meta(x, ctxt, ev); break; - case LDMS_CONTEXT_LOOKUP: + case LDMS_CONTEXT_LOOKUP_READ: __handle_lookup(x, ctxt, ev); break; default: @@ -1922,11 +2152,11 @@ int __is_lookup_name_good(struct ldms_xprt *x, int rc = 0; name = (ldms_name_t)lu->set_info; - if (!(ctxt->lookup.flags & LDMS_LOOKUP_BY_SCHEMA)) { + if (!(ctxt->lu_req.flags & LDMS_LOOKUP_BY_SCHEMA)) { name = (ldms_name_t)(&name->name[name->len]); } - if (ctxt->lookup.flags & LDMS_LOOKUP_RE) { - rc = regcomp(®ex, ctxt->lookup.path, REG_EXTENDED | REG_NOSUB); + if (ctxt->lu_req.flags & LDMS_LOOKUP_RE) { + rc = regcomp(®ex, ctxt->lu_req.path, REG_EXTENDED | REG_NOSUB); if (rc) { char errstr[512]; (void)regerror(rc, ®ex, errstr, sizeof(errstr)); @@ -1936,7 +2166,7 @@ int __is_lookup_name_good(struct ldms_xprt *x, rc = regexec(®ex, name->name, 0, NULL, 0); } else { - rc = strncmp(ctxt->lookup.path, name->name, name->len); + rc = strncmp(ctxt->lu_req.path, name->name, name->len); } return (rc == 0); @@ -2007,8 +2237,10 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, struct ldms_xprt *x, struct ldms_rendezvous_msg *lm) { + struct ldms_rendezvous_lookup_param *lu = &lm->lookup; struct ldms_context *ctxt = (void*)lm->hdr.xid; + struct ldms_context *rd_ctxt; ldms_set_t rbd = NULL; int rc; ldms_name_t schema_name, inst_name; @@ -2027,17 +2259,32 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, __ldms_set_tree_lock(); struct ldms_set *lset = __ldms_find_local_set(inst_name->name); - if (lset) { + __ldms_set_tree_unlock(); + if (!lset) { + lset = __ldms_create_set(inst_name->name, schema_name->name, + ntohl(lu->meta_len), ntohl(lu->data_len), + ntohl(lu->card), + ntohl(lu->array_card), + LDMS_SET_F_REMOTE); + if (!lset) { + rc = errno; + goto callback; + } + pthread_mutex_lock(&lset->lock); + (void)__process_lookup_set_info(lset, &inst_name->name[inst_name->len]); + pthread_mutex_unlock(&lset->lock); + } else { ldms_name_t lschema = get_schema_name(lset->meta); if (0 != strcmp(schema_name->name, lschema->name)) { /* Two sets have the same name but different schema */ rc = EINVAL; - goto unlock_out; + ref_put(&lset->ref, "__ldms_find_local_set"); + goto callback; } rbd = ldms_lookup_rbd(x, lset); if (rbd) { - if (!(ctxt->lookup.flags & LDMS_LOOKUP_SET_INFO)) { + if (!(ctxt->lu_req.flags & LDMS_LOOKUP_SET_INFO)) { rc = EEXIST; } else { pthread_mutex_lock(&lset->lock); @@ -2045,118 +2292,81 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, &inst_name->name[inst_name->len]); pthread_mutex_unlock(&lset->lock); } - goto unlock_out; - } - } else { - lset = __ldms_create_set(inst_name->name, schema_name->name, - ntohl(lu->meta_len), ntohl(lu->data_len), - ntohl(lu->card), - ntohl(lu->array_card), - LDMS_SET_F_REMOTE); - if (!lset) { - rc = errno; - goto unlock_out; + ref_put(&lset->ref, "__ldms_find_local_set"); + goto callback; } + ref_put(&lset->ref, "__ldms_find_local_set"); } - /* Need to lock the set in case that the set has already been looked up. */ - pthread_mutex_lock(&lset->lock); - rc = __process_lookup_set_info(lset, &inst_name->name[inst_name->len]); - pthread_mutex_unlock(&lset->lock); - if (rc) - goto unlock_out; - - __ldms_set_tree_unlock(); /* Bind this set to a new RBD. We will initiate RDMA_READ */ - rbd = __ldms_alloc_rbd(x, lset, LDMS_RBD_INITIATOR); + pthread_mutex_lock(&x->lock); + /* Dropped in process_set_delete_request() */ + rbd = __ldms_alloc_rbd(x, lset, LDMS_RBD_INITIATOR, "rendezvous_lookup"); if (!rbd) { rc = ENOMEM; - goto out_1; + goto callback_with_lock; } + ref_get(&rbd->set->ref, "set_new"); /* Dropped in ldms_set_delete() */ + ref_get(&rbd->ref, "set_new"); /* Dropped in ldms_set_delete() */ rbd->rmap = ev->map; rbd->remote_set_id = lm->lookup.set_id; - pthread_mutex_lock(&x->lock); - struct ldms_context *rd_ctxt; - if (lu->more) { - rd_ctxt = __ldms_alloc_ctxt(x, sizeof(*rd_ctxt), - LDMS_CONTEXT_LOOKUP); - if (!rd_ctxt) { - x->log("%s(): Out of memory\n", __func__); - rc = ENOMEM; - pthread_mutex_unlock(&x->lock); - goto out_1; - } - - rd_ctxt->sem = ctxt->sem; - rd_ctxt->sem_p = ctxt->sem_p; - rd_ctxt->rc = ctxt->rc; - rd_ctxt->type = ctxt->type; - rd_ctxt->lookup = ctxt->lookup; - rd_ctxt->lookup.path = strdup(ctxt->lookup.path); - if (!rd_ctxt->lookup.path) { - rc = ENOMEM; - __ldms_free_ctxt(x, rd_ctxt); - pthread_mutex_unlock(&x->lock); - goto out_1; - } - } else { - rd_ctxt = ctxt; + rd_ctxt = __ldms_alloc_ctxt(x, sizeof(*rd_ctxt), + LDMS_CONTEXT_LOOKUP_READ, + rbd, + ctxt->lu_req.cb, ctxt->lu_req.cb_arg, + htonl(lu->more), ctxt->lu_req.flags); + if (!rd_ctxt) { + x->log("%s(): Out of memory\n", __func__); + rc = ENOMEM; + pthread_mutex_unlock(&x->lock); + goto callback_with_lock; } - rd_ctxt->lookup.s = rbd; - rd_ctxt->lookup.more = ntohl(lu->more); + rd_ctxt->sem = ctxt->sem; + rd_ctxt->sem_p = ctxt->sem_p; + rd_ctxt->rc = ctxt->rc; pthread_mutex_unlock(&x->lock); - if (zap_read(zep, - rbd->rmap, zap_map_addr(rbd->rmap), - rbd->lmap, zap_map_addr(rbd->lmap), - __le32_to_cpu(rbd->set->meta->meta_sz), - rd_ctxt)) { - rc = EIO; - goto out_2; - } - return; - - unlock_out: - if (rc || (rbd && rbd->rmap != ev->map)) { - /* unmap ev->map if it is not used */ - zap_unmap(x->zap_ep, ev->map); - } - __ldms_set_tree_unlock(); - goto out; - out_2: - if (lu->more) { + rc = zap_read(zep, + rbd->rmap, zap_map_addr(rbd->rmap), + rbd->lmap, zap_map_addr(rbd->lmap), + __le32_to_cpu(rbd->set->meta->meta_sz), + rd_ctxt); + if (rc) { + x->zerrno = rc; + rc = zap_zerr2errno(rc); pthread_mutex_lock(&x->lock); - __ldms_free_ctxt(x, ctxt); - pthread_mutex_unlock(&x->lock); + goto callback_with_lock; } + __ldms_free_ctxt(x, ctxt); + return; - out_1: - if (rbd) { - ldms_set_delete(rbd); - rbd = NULL; - } - out: + callback: + pthread_mutex_lock(&x->lock); + callback_with_lock: #ifdef DEBUG x->log("DEBUG: %s: lookup error while ldms_xprt is processing the rendezvous " "with error %d. NOTE: error %d indicates that it is " "a synchronous error of zap_read\n", inst_name, rc, EIO); #endif /* DEBUG */ - if (ctxt->lookup.cb) - ctxt->lookup.cb(x, rc, 0, rbd, ctxt->lookup.cb_arg); - if (!lu->more) { - zap_put_ep(x->zap_ep); /* Taken in __ldms_remote_lookup() */ - pthread_mutex_lock(&x->lock); - __ldms_free_ctxt(x, ctxt); + if (rbd) + ref_get(&rbd->ref, "rendezvous_error"); + if (ctxt->lu_req.cb) + ctxt->lu_req.cb(x, rc, 0, rc ? NULL : rbd, ctxt->lu_req.cb_arg); + zap_put_ep(x->zap_ep); /* Taken in __ldms_remote_lookup() */ + __ldms_free_ctxt(x, ctxt); #ifdef DEBUG - assert(x->active_lookup); - x->active_lookup--; - x->log("DEBUG: rendezvous error: put ref %p: " - "active_lookup = %d\n", - x->zap_ep, x->active_lookup); + assert(x->active_lookup); + x->active_lookup--; + x->log("DEBUG: rendezvous error: put ref %p: " + "active_lookup = %d\n", + x->zap_ep, x->active_lookup); #endif /* DEBUG */ - pthread_mutex_unlock(&x->lock); + pthread_mutex_unlock(&x->lock); + if (rbd) { + ldms_set_delete(rbd); + ref_put(&rbd->ref, "rendezvous_error"); } } @@ -2171,7 +2381,7 @@ static void handle_rendezvous_push(zap_ep_t zep, zap_event_t ev, set = __ldms_set_by_id(push->lookup_set_id); if (!set) { /* - * The set has been deleted. + * The set has been deleted. */ return; } @@ -2182,18 +2392,20 @@ static void handle_rendezvous_push(zap_ep_t zep, zap_event_t ev, push_rbd = ldms_lookup_rbd(x, set); if (push_rbd) { if (push_rbd->push_flags & LDMS_RBD_F_PUSH) { + ref_get(&push_rbd->set->ref, "rendezvous_push"); /* Update the push flags, but otherwise, do nothing */ goto out; } } /* We will be the target of RDMA_WRITE */ - push_rbd = __ldms_alloc_rbd(x, set, LDMS_RBD_TARGET); + push_rbd = __ldms_alloc_rbd(x, set, LDMS_RBD_TARGET, "rendezvous_push"); if (!push_rbd) { struct ldms_xprt *x = zap_get_ucontext(zep); x->log("handle_rendezvous_push: __ldms_alloc_rbd out of memory\n"); return; } + push_rbd->rmap = ev->map; push_rbd->remote_set_id = push->push_set_id; out: @@ -2281,6 +2493,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) handle_zap_rendezvous(zep, ev); break; case ZAP_EVENT_CONNECT_REQUEST: + __sync_fetch_and_add(&xprt_connect_request_count, 1); if (0 != __ldms_conn_msg_verify(x, ev->data, ev->data_len, rej_msg, sizeof(rej_msg))) { zap_reject(zep, rej_msg, strlen(rej_msg)+1); @@ -2289,6 +2502,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) ldms_zap_handle_conn_req(zep); break; case ZAP_EVENT_REJECTED: + __sync_fetch_and_add(&xprt_reject_count, 1); event.type = LDMS_XPRT_EVENT_REJECTED; if (x->event_cb) x->event_cb(x, &event, x->event_cb_arg); @@ -2296,6 +2510,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) ldms_xprt_put(x); break; case ZAP_EVENT_CONNECTED: + __sync_fetch_and_add(&xprt_connect_count, 1); /* actively connected -- expecting conn_msg */ if (0 != __ldms_conn_msg_verify(x, ev->data, ev->data_len, rej_msg, sizeof(rej_msg))) { @@ -2313,6 +2528,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) ldms_xprt_put(x); break; case ZAP_EVENT_DISCONNECTED: + __sync_fetch_and_add(&xprt_disconnect_count, 1); /* deliver only if CONNECTED has been delivered. */ /* i.e. auth_flag == APPROVED */ switch (x->auth_flag) { @@ -2331,6 +2547,16 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) event.type = LDMS_XPRT_EVENT_ERROR; break; } + pthread_mutex_lock(&x->lock); + struct ldms_context *dir_ctxt = NULL; + if (x->local_dir_xid) { + dir_ctxt = (struct ldms_context *)(unsigned long) + x->local_dir_xid; + x->local_dir_xid = 0; + } + pthread_mutex_unlock(&x->lock); + if (dir_ctxt) + __ldms_free_ctxt(x, dir_ctxt); if (x->event_cb) x->event_cb(x, &event, x->event_cb_arg); #ifdef DEBUG @@ -2339,6 +2565,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) x, x->ref_count); #endif /* DEBUG */ /* Put the reference taken in ldms_xprt_connect() or accept() */ + __ldms_xprt_resource_free(x); ldms_xprt_put(x); break; default: @@ -2456,6 +2683,10 @@ void __ldms_xprt_init(struct ldms_xprt *x, const char *name, x->ruid = -1; x->rgid = -1; + ldms_xprt_ops_t op_e; + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) + x->stats.ops[op_e].min_us = LLONG_MAX; + x->log = log_fn; TAILQ_INIT(&x->ctxt_list); sem_init(&x->sem, 0, 0); @@ -2518,6 +2749,11 @@ ldms_t ldms_xprt_new_with_auth(const char *xprt_name, ldms_log_fn_t log_fn, return NULL; } +const char *ldms_xprt_type_name(ldms_t x) +{ + return x->name; +} + ldms_t ldms_xprt_new(const char *name, ldms_log_fn_t log_fn) { return ldms_xprt_new_with_auth(name, log_fn, "none", NULL); @@ -2575,6 +2811,20 @@ size_t format_req_notify_req(struct ldms_request *req, return len; } +size_t format_set_delete_req(struct ldms_request *req, uint64_t xid, const char *inst_name) +{ + int32_t name_len = strlen(inst_name) + 1; + size_t len = sizeof(struct ldms_request_hdr) + + sizeof(struct ldms_set_delete_cmd_param) + + name_len; + req->hdr.xid = xid; + req->hdr.cmd = htonl(LDMS_CMD_SET_DELETE); + req->hdr.len = htonl(len); + strcpy(req->set_delete.inst_name, inst_name); + req->set_delete.inst_name_len = htonl(name_len); + return len; +} + size_t format_cancel_notify_req(struct ldms_request *req, uint64_t xid, uint64_t set_id) { @@ -2587,29 +2837,6 @@ size_t format_cancel_notify_req(struct ldms_request *req, uint64_t xid, return len; } -/* - * This is the generic allocator for both the request buffer and the - * context buffer. A single buffer is allocated that is big enough to - * contain one structure. When the context is freed, the associated - * request buffer is freed as well. - * - * Caller must call with the ldms xprt lock held. - */ -static int alloc_req_ctxt(struct ldms_xprt *x, - struct ldms_request **req, - struct ldms_context **ctxt, - ldms_context_type_t type) -{ - struct ldms_context *ctxt_; - size_t sz = sizeof(struct ldms_request) + sizeof(struct ldms_context); - void *buf = __ldms_alloc_ctxt(x, sz, type); - if (!buf) - return 1; - *ctxt = ctxt_ = buf; - *req = (struct ldms_request *)(ctxt_+1); - return 0; -} - int ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) { struct ldms_xprt *x = _x; @@ -2657,7 +2884,7 @@ int ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) size_t ldms_xprt_msg_max(ldms_t x) { - return x->max_msg - (sizeof(struct ldms_request_hdr) + + return x->max_msg - (sizeof(struct ldms_request_hdr) + sizeof(struct ldms_send_cmd_param)); } @@ -2668,6 +2895,9 @@ int __ldms_remote_dir(ldms_t _x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags) struct ldms_context *ctxt; size_t len; + if (!zap_ep_connected(x->zap_ep)) + return ENOTCONN; + if (LDMS_XPRT_AUTH_GUARD(x)) return EPERM; @@ -2679,15 +2909,15 @@ int __ldms_remote_dir(ldms_t _x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags) /* Prevent x being destroyed if DISCONNECTED is delivered in another thread */ ldms_xprt_get(x); pthread_mutex_lock(&x->lock); - if (alloc_req_ctxt(x, &req, &ctxt, LDMS_CONTEXT_DIR)) { + ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt) + sizeof(*req), + LDMS_CONTEXT_DIR, cb, cb_arg); + if (!ctxt) { pthread_mutex_unlock(&x->lock); ldms_xprt_put(x); return ENOMEM; } - + req = (struct ldms_request *)(ctxt + 1); len = format_dir_req(req, (uint64_t)(unsigned long)ctxt, flags); - ctxt->dir.cb = cb; - ctxt->dir.cb_arg = cb_arg; if (flags) x->local_dir_xid = (uint64_t)ctxt; pthread_mutex_unlock(&x->lock); @@ -2699,8 +2929,8 @@ int __ldms_remote_dir(ldms_t _x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags) x->log("DEBUG: remote_dir. get ref %p. active_dir = %d. xid %p\n", x->zap_ep, x->active_dir, (void *)req->hdr.xid); #endif /* DEBUG */ - int rc = zap_send(x->zap_ep, req, len); - if (rc) { + zap_err_t zerr = zap_send(x->zap_ep, req, len); + if (zerr) { zap_put_ep(x->zap_ep); pthread_mutex_lock(&x->lock); __ldms_free_ctxt(x, ctxt); @@ -2714,7 +2944,7 @@ int __ldms_remote_dir(ldms_t _x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags) pthread_mutex_unlock(&x->lock); } ldms_xprt_put(x); - return rc; + return zap_zerr2errno(zerr); } /* This request has no reply */ @@ -2731,12 +2961,13 @@ int __ldms_remote_dir_cancel(ldms_t _x) /* Prevent x being destroyed if DISCONNECTED is delivered in another thread */ ldms_xprt_get(x); pthread_mutex_lock(&x->lock); - if (alloc_req_ctxt(x, &req, &ctxt, LDMS_CONTEXT_DIR_CANCEL)) { + ctxt = __ldms_alloc_ctxt(x, sizeof(*req) + sizeof(*ctxt), LDMS_CONTEXT_DIR_CANCEL); + if (!ctxt) { pthread_mutex_unlock(&x->lock); ldms_xprt_put(x); return ENOMEM; } - + req = (struct ldms_request *)(ctxt + 1); len = format_dir_cancel_req(req); zap_get_ep(x->zap_ep); @@ -2745,8 +2976,8 @@ int __ldms_remote_dir_cancel(ldms_t _x) #endif /* DEBUG */ pthread_mutex_unlock(&x->lock); - int rc = zap_send(x->zap_ep, req, len); - if (rc) { + zap_err_t zerr = zap_send(x->zap_ep, req, len); + if (zerr) { #ifdef DEBUG pthread_mutex_lock(&x->lock); x->active_dir_cancel--; @@ -2759,7 +2990,7 @@ int __ldms_remote_dir_cancel(ldms_t _x) __ldms_free_ctxt(x, ctxt); pthread_mutex_unlock(&x->lock); ldms_xprt_put(x); - return rc; + return zap_zerr2errno(zerr); } int __ldms_remote_lookup(ldms_t _x, const char *path, @@ -2771,13 +3002,16 @@ int __ldms_remote_lookup(ldms_t _x, const char *path, struct ldms_context *ctxt; struct ldms_rbuf_desc *rbd; size_t len; - int rc; + + if (!zap_ep_connected(x->zap_ep)) + return ENOTCONN; if (LDMS_XPRT_AUTH_GUARD(x)) return EPERM; __ldms_set_tree_lock(); struct ldms_set *set = __ldms_find_local_set(path); + __ldms_set_tree_unlock(); if (set) { rbd = ldms_lookup_rbd(x, set); if (rbd && !(flags & LDMS_LOOKUP_SET_INFO)) { @@ -2786,27 +3020,32 @@ int __ldms_remote_lookup(ldms_t _x, const char *path, * from the host corresponding to * the transport. */ - __ldms_set_tree_unlock(); + ref_put(&set->ref, "__ldms_find_local_set"); return EEXIST; } + ref_put(&set->ref, "__ldms_find_local_set"); } - __ldms_set_tree_unlock(); + + char *lu_path = strdup(path); + if (!lu_path) + return ENOMEM; /* Prevent x being destroyed if DISCONNECTED is delivered in another thread */ ldms_xprt_get(x); pthread_mutex_lock(&x->lock); - if (alloc_req_ctxt(x, &req, &ctxt, LDMS_CONTEXT_LOOKUP)) { + ctxt = __ldms_alloc_ctxt(x, + sizeof(struct ldms_request) + sizeof(*ctxt), + LDMS_CONTEXT_LOOKUP_REQ, + cb, arg, lu_path, flags); + if (!ctxt) { + free(lu_path); pthread_mutex_unlock(&x->lock); ldms_xprt_put(x); return ENOMEM; } + req = (struct ldms_request *)(ctxt + 1); len = format_lookup_req(req, flags, path, (uint64_t)(unsigned long)ctxt); - ctxt->lookup.s = NULL; - ctxt->lookup.cb = cb; - ctxt->lookup.cb_arg = arg; - ctxt->lookup.flags = flags; - ctxt->lookup.path = strdup(path); #ifdef DEBUG x->active_lookup++; @@ -2819,24 +3058,22 @@ int __ldms_remote_lookup(ldms_t _x, const char *path, x->log("DEBUG: remote_lookup: get ref %p: active_lookup = %d\n", x->zap_ep, x->active_lookup); #endif /* DEBUG */ - rc = zap_send(x->zap_ep, req, len); - if (rc) { + zap_err_t zerr = zap_send(x->zap_ep, req, len); + if (zerr) { zap_put_ep(x->zap_ep); - pthread_mutex_lock(&x->lock); __ldms_free_ctxt(x, ctxt); pthread_mutex_unlock(&x->lock); #ifdef DEBUG x->active_lookup--; - x->log("DEBUG: lookup_reply: error. put ref %p: " - "active_lookup = %d. path = %s\n", - x->zap_ep, x->active_lookup, - path); + x->log("DEBUG: lookup_reply: error %d. put ref %p: " + "active_lookup = %d. path = %s\n", + zerr, x->zap_ep, x->active_lookup, + path); #endif /* DEBUG */ - } ldms_xprt_put(x); - return rc; + return zap_zerr2errno(zerr); } static int send_req_notify(ldms_t _x, ldms_set_t s, uint32_t flags, @@ -2846,15 +3083,18 @@ static int send_req_notify(ldms_t _x, ldms_set_t s, uint32_t flags, struct ldms_request *req; struct ldms_context *ctxt; size_t len; - int rc; ldms_xprt_get(x); pthread_mutex_lock(&x->lock); - if (alloc_req_ctxt(x, &req, &ctxt, LDMS_CONTEXT_REQ_NOTIFY)) { + ctxt = __ldms_alloc_ctxt(x, sizeof(*req) + sizeof(*ctxt), + LDMS_CONTEXT_REQ_NOTIFY, + s, cb_fn, cb_arg); + if (!ctxt) { pthread_mutex_unlock(&x->lock); ldms_xprt_put(x); return ENOMEM; } + req = (struct ldms_request *)(ctxt + 1); if (s->local_notify_xid) { free((void *)(unsigned long)s->local_notify_xid); @@ -2862,19 +3102,17 @@ static int send_req_notify(ldms_t _x, ldms_set_t s, uint32_t flags, } len = format_req_notify_req(req, (uint64_t)(unsigned long)ctxt, s->remote_set_id, flags); - ctxt->req_notify.cb = cb_fn; - ctxt->req_notify.arg = cb_arg; - ctxt->req_notify.s = s; s->local_notify_xid = (uint64_t)ctxt; pthread_mutex_unlock(&x->lock); - rc = zap_send(x->zap_ep, req, len); - if (rc) { - x->log("%s(): x %p: error %d sending REQ_NOTIFY\n", - __func__, x, rc); + zap_err_t zerr = zap_send(x->zap_ep, req, len); + if (zerr) { + x->zerrno = zerr; + x->log("%s(): x %p: error %s sending REQ_NOTIFY\n", + __func__, x, zap_err_str(zerr)); } ldms_xprt_put(x); - return rc; + return zap_zerr2errno(zerr); } int ldms_register_notify_cb(ldms_t x, ldms_set_t s, int flags, @@ -2888,6 +3126,83 @@ int ldms_register_notify_cb(ldms_t x, ldms_set_t s, int flags, return -1; } +/* + * Tell all peers that have an RBD for this set that it is being + * deleted. When they all reply, we can delete the set. + */ +void ldms_xprt_set_delete(ldms_set_t s, ldms_set_delete_cb_t cb_fn, void *cb_arg) +{ + struct ldms_request *req; + struct ldms_rbuf_desc *rbd, *next_rbd; + ldms_t xprt; + struct ldms_context *ctxt; + struct ldms_set *set = s->set; + size_t len; + + /* + * The lock order for lookup_re, etc... is x->lock, then + * set->lock. Create a local list of all of the local RBD, + * after removing them from the set. + */ + LIST_HEAD(rbuf_desc_list, ldms_rbuf_desc) rbd_list; + LIST_INIT(&rbd_list); + + pthread_mutex_lock(&set->lock); + rbd = LIST_FIRST(&set->local_rbd_list); + while (rbd) { + xprt = rbd->xprt; + next_rbd = LIST_NEXT(rbd, set_link); + LIST_REMOVE(rbd, set_link); + if (!xprt) + goto next; + + ref_get(&rbd->ref, "xprt_set_delete"); + LIST_INSERT_HEAD(&rbd_list, rbd, set_link); + + next: + ref_put(&rbd->ref, "set_rbd_list"); + rbd = next_rbd; + } + pthread_mutex_unlock(&set->lock); + + /* Release the set->lock, and walk the local list */ + rbd = LIST_FIRST(&rbd_list); + while (rbd) { + xprt = rbd->xprt; + next_rbd = LIST_NEXT(rbd, set_link); + LIST_REMOVE(rbd, set_link); + + pthread_mutex_lock(&xprt->lock); + + /* Make certain we didn't lose a disconnect race */ + if (rbd->xprt) { + /* Destroy the map so we don't accept remote read requests */ + __ldms_rbd_xprt_release(rbd); + } else { + pthread_mutex_unlock(&xprt->lock); + goto next_1; + } + + ctxt = __ldms_alloc_ctxt + (xprt, + sizeof(struct ldms_request) + sizeof(struct ldms_context), + LDMS_CONTEXT_SET_DELETE, + rbd, cb_fn, cb_arg); + req = (struct ldms_request *)(ctxt + 1); + len = format_set_delete_req(req, (uint64_t)(unsigned long)ctxt, + ldms_set_instance_name_get(s)); + zap_err_t zerr = zap_send(xprt->zap_ep, req, len); + pthread_mutex_unlock(&xprt->lock); + if (zerr) { + xprt->zerrno = zerr; + __ldms_free_ctxt(xprt, ctxt); + } + next_1: + ref_put(&rbd->ref, "xprt_set_delete"); + rbd = next_rbd; + } +} + static int send_cancel_notify(ldms_t _x, ldms_set_t s) { struct ldms_xprt *x = _x; @@ -2899,7 +3214,11 @@ static int send_cancel_notify(ldms_t _x, ldms_set_t s) s->remote_set_id); s->local_notify_xid = 0; - return zap_send(x->zap_ep, &req, len); + zap_err_t zerr = zap_send(x->zap_ep, &req, len); + if (zerr) { + x->zerrno = zerr; + } + return zap_zerr2errno(zerr); } int ldms_cancel_notify(ldms_t t, ldms_set_t s) @@ -2919,7 +3238,7 @@ void ldms_notify(ldms_set_t s, ldms_notify_event_t e) return; if (!s->set) return; - pthread_mutex_lock(&xprt_list_lock); + pthread_mutex_lock(&s->set->lock); LIST_FOREACH(r, &s->set->local_rbd_list, set_link) { if (r->remote_notify_xid && (0 == r->notify_flags || (r->notify_flags & e->type))) { @@ -2928,7 +3247,7 @@ void ldms_notify(ldms_set_t s, ldms_notify_event_t e) e); } } - pthread_mutex_unlock(&xprt_list_lock); + pthread_mutex_unlock(&s->set->lock); } static int send_req_register_push(struct ldms_rbuf_desc *r, uint32_t push_change) @@ -2953,7 +3272,11 @@ static int send_req_register_push(struct ldms_rbuf_desc *r, uint32_t push_change req.push.flags = htonl(LDMS_RBD_F_PUSH); if (push_change) req.push.flags |= htonl(LDMS_RBD_F_PUSH_CHANGE); - rc = zap_share(x->zap_ep, r->lmap, (const char *)&req, len); + zap_err_t zerr = zap_share(x->zap_ep, r->lmap, (const char *)&req, len); + if (zerr) { + x->zerrno = zerr; + rc = zap_zerr2errno(zerr); + } out: ldms_xprt_put(x); return rc; @@ -2984,7 +3307,11 @@ static int send_req_cancel_push(struct ldms_rbuf_desc *r) req.hdr.cmd = htonl(LDMS_CMD_CANCEL_PUSH); req.hdr.len = htonl(len); req.cancel_push.set_id = r->remote_set_id; - rc = zap_send(x->zap_ep, &req, len); + zap_err_t zerr = zap_send(x->zap_ep, &req, len); + if (zerr) { + rc = zap_zerr2errno(zerr); + x->zerrno = zerr; + } ldms_xprt_put(x); return rc; } @@ -3005,38 +3332,20 @@ int __ldms_xprt_push(ldms_set_t s, int push_flags) uint32_t meta_meta_gn = __le32_to_cpu(set->meta->meta_gn); uint32_t meta_meta_sz = __le32_to_cpu(set->meta->meta_sz); uint32_t meta_data_sz = __le32_to_cpu(set->meta->data_sz); - struct ldms_rbuf_desc *rbd, *next_rbd; - - /* - * We have to lock all the transports since we are racing with - * disconnect and this function operates on more than one - * transport, i.e. the remote_rbd_list - */ - pthread_mutex_lock(&xprt_list_lock); + struct ldms_rbuf_desc *rbd; - /* Run through all RBD for the set and push to registered peers */ pthread_mutex_lock(&set->lock); rbd = LIST_FIRST(&set->remote_rbd_list); while (rbd) { + if ((rbd->xprt == NULL) || (0 == (rbd->push_flags & push_flags))) { + rbd = LIST_NEXT(rbd, set_link); + continue; + } rc = 0; - next_rbd = LIST_NEXT(rbd, set_link); ldms_t x = rbd->xprt; - if (!x) { -#ifdef PUSH_DEBUG - printf("DEBUG: Push set '%s'. Skipping RBD with no " - "transport %p. RBD type: %d\n", - ldms_set_instance_name_get(rbd), - rbd, rbd->type); -#endif /* PUSH_DEBUG */ - goto skip; - } pthread_mutex_lock(&x->lock); - if (LDMS_XPRT_AUTH_GUARD(x)) { - pthread_mutex_unlock(&x->lock); - goto skip; - } - if (!(rbd->push_flags & push_flags)) { + if (LDMS_XPRT_AUTH_GUARD(x)) { pthread_mutex_unlock(&x->lock); goto skip; } @@ -3066,7 +3375,6 @@ int __ldms_xprt_push(ldms_set_t s, int push_flags) ldms_set_instance_name_get(rbd), x->zap_ep); #endif /* PUSH_DEBUG */ - ldms_xprt_get(x); while (len) { size_t data_len; @@ -3094,23 +3402,15 @@ int __ldms_xprt_push(ldms_set_t s, int push_flags) doff += data_len; len -= data_len; } - pthread_mutex_unlock(&x->lock); - pthread_mutex_unlock(&set->lock); - pthread_mutex_unlock(&xprt_list_lock); - ldms_xprt_put(x); - pthread_mutex_lock(&xprt_list_lock); - pthread_mutex_lock(&set->lock); free(reply); skip: + pthread_mutex_unlock(&x->lock); #ifdef DEBUG x->active_push++; #endif /* DEBUG */ - if (rc) - break; - rbd = next_rbd; + rbd = LIST_NEXT(rbd, set_link); } pthread_mutex_unlock(&set->lock); - pthread_mutex_unlock(&xprt_list_lock); return rc; } @@ -3123,7 +3423,7 @@ int ldms_xprt_connect(ldms_t x, struct sockaddr *sa, socklen_t sa_len, ldms_event_cb_t cb, void *cb_arg) { int rc; - struct ldms_xprt *_x = x; + struct ldms_xprt *_x = x; struct ldms_conn_msg msg; __ldms_xprt_conn_msg_init(x, &msg); _x->event_cb = cb; @@ -3223,16 +3523,30 @@ int ldms_xprt_listen_by_name(ldms_t x, const char *host, const char *port_no, return rc; } +static void __destroy_rbd(void *v) +{ + struct ldms_rbuf_desc *rbd = v; + ref_put(&rbd->set->ref, "rbd_set"); + free(rbd); +} + +/* + * If 'x' is not NULL, must be called holding x->lock + * Cannot be called holding the set->lock + */ struct ldms_rbuf_desc * -__ldms_alloc_rbd(struct ldms_xprt *x, struct ldms_set *s, enum ldms_rbd_type type) +___ldms_alloc_rbd(struct ldms_xprt *x, struct ldms_set *s, enum ldms_rbd_type type, + const char *name, const char *func, int line) { zap_err_t zerr; struct ldms_rbuf_desc *rbd = calloc(1, sizeof(*rbd)); if (!rbd) goto err0; - rbd->xprt = x; + rbd->xprt = ldms_xprt_get(x); rbd->set = s; + ref_get(&s->ref, "rbd_set"); + _ref_init(&rbd->ref, name, __destroy_rbd, rbd, func, line); rbn_init(&rbd->xprt_rbn, s); size_t set_sz = __ldms_set_size_get(s); if (x) { @@ -3241,17 +3555,20 @@ __ldms_alloc_rbd(struct ldms_xprt *x, struct ldms_set *s, enum ldms_rbd_type typ if (zerr) goto err1; } - + ref_get(&s->ref, name); rbd->type = type; /* Add RBD to set list */ + ref_get(&rbd->ref, "set_rbd_list"); pthread_mutex_lock(&s->lock); if (type == LDMS_RBD_LOCAL) LIST_INSERT_HEAD(&s->local_rbd_list, rbd, set_link); else LIST_INSERT_HEAD(&s->remote_rbd_list, rbd, set_link); pthread_mutex_unlock(&s->lock); - if (x) + if (x) { rbt_ins(&x->rbd_rbt, &rbd->xprt_rbn); + ref_get(&rbd->ref, "xprt_rbd_tree"); + } goto out; @@ -3265,35 +3582,45 @@ __ldms_alloc_rbd(struct ldms_xprt *x, struct ldms_set *s, enum ldms_rbd_type typ void __ldms_rbd_xprt_release(struct ldms_rbuf_desc *rbd) { -#ifdef DEBUG + ldms_t xprt = rbd->xprt; if (rbd->lmap) { - rbd->xprt->log("DEBUG: zap %p: unmap local\n", rbd->xprt->zap_ep); - zap_unmap(rbd->xprt->zap_ep, rbd->lmap); +#ifdef DEBUG + xprt->log("DEBUG: zap %p: unmap local\n", rbd->xprt->zap_ep); +#endif + zap_unmap(xprt->zap_ep, rbd->lmap); + rbd->lmap = NULL; } - rbd->lmap = NULL; if (rbd->rmap) { - rbd->xprt->log("DEBUG: zap %p: unmap remote\n", rbd->xprt->zap_ep); +#ifdef DEBUG + xprt->log("DEBUG: zap %p: unmap remote\n", rbd->xprt->zap_ep); +#endif zap_unmap(rbd->xprt->zap_ep, rbd->rmap); + rbd->rmap = NULL; } - rbd->rmap = NULL; -#else - if (rbd->lmap) - zap_unmap(rbd->xprt->zap_ep, rbd->lmap); - rbd->lmap = NULL; - if (rbd->rmap) - zap_unmap(rbd->xprt->zap_ep, rbd->rmap); - rbd->rmap = NULL; -#endif /* DEBUG */ - rbt_del(&rbd->xprt->rbd_rbt, &rbd->xprt_rbn); + rbt_del(&xprt->rbd_rbt, &rbd->xprt_rbn); rbd->xprt = NULL; + ldms_xprt_put(xprt); + ref_put(&rbd->ref, "xprt_rbd_tree"); } -void __ldms_free_rbd(struct ldms_rbuf_desc *rbd) +/* + * Must be called holding rbd->set->lock + */ +void ___ldms_free_rbd(struct ldms_rbuf_desc *rbd, const char *name, const char *func, int line) { - if (rbd->xprt) + struct ldms_set *set = rbd->set; + struct ldms_xprt *x = rbd->xprt; + if (x) { + pthread_mutex_lock(&x->lock); __ldms_rbd_xprt_release(rbd); - LIST_REMOVE(rbd, set_link); - free(rbd); + pthread_mutex_unlock(&x->lock); + } + if (rbd->push_flags & LDMS_RBD_F_PUSH) { + _ref_put(&set->ref, "rendezvous_push", func, line); + _ref_put(&rbd->ref, "rendezvous_push", func, line); + } + _ref_put(&set->ref, name, func, line); + _ref_put(&rbd->ref, name, func, line); } static struct ldms_rbuf_desc *ldms_lookup_rbd(struct ldms_xprt *x, @@ -3316,6 +3643,25 @@ static struct ldms_rbuf_desc *ldms_lookup_rbd(struct ldms_xprt *x, return NULL; } +extern ldms_set_t ldms_xprt_set_by_name(ldms_t x, const char *set_name) +{ + struct ldms_set *set; + struct ldms_rbuf_desc *rbd; + + __ldms_set_tree_lock(); + set = __ldms_find_local_set(set_name); + __ldms_set_tree_unlock(); + if (!set) + return NULL; + pthread_mutex_lock(&x->lock); + rbd = ldms_lookup_rbd(x, set); + if (rbd) + ref_get(&rbd->ref, "ldms_xprt_set_by_name"); + pthread_mutex_unlock(&x->lock); + ref_put(&set->ref, "__ldms_find_local_set"); + return rbd; +} + void __ldms_xprt_term(struct ldms_xprt *x) { /* this is so that we call zap_close() exactly once */ @@ -3345,6 +3691,7 @@ static void __attribute__ ((constructor)) cs_init(void) { pthread_mutex_init(&xprt_list_lock, 0); pthread_mutex_init(&ldms_zap_list_lock, 0); + (void)clock_gettime(CLOCK_REALTIME, &xprt_start); } static void __attribute__ ((destructor)) cs_term(void) diff --git a/ldms/src/core/ldms_xprt.h b/ldms/src/core/ldms_xprt.h index 27759b87c..3dd5d4952 100644 --- a/ldms/src/core/ldms_xprt.h +++ b/ldms/src/core/ldms_xprt.h @@ -50,6 +50,7 @@ #ifndef __LDMS_XPRT_H__ #define __LDMS_XPRT_H__ +#include #include #include #include @@ -59,9 +60,36 @@ #include "ldms.h" #include "ldms_auth.h" +#include "ref.h" #pragma pack(4) +/* + * Callback function invoked when peer acknowledges a deleted set + * + * xprt The transport handle + * status 0 on success, errno if there was a problem sending the request + * set The set handle + * cb_arg The cb_arg parameter to the ldsm_xprt_set_delete + * function + */ +typedef void (*ldms_set_delete_cb_t)(ldms_t xprt, int status, ldms_set_t set, void *cb_arg); + +/* + * Notify a remote peers that this set is being deleted + * + * A remote peer is a client that has received a copy of this set via ldms_xprt_lookup. + * + * set The set handle + * cb_fn Pointer to the ldms_del_rem_set_cb function that will + * be called when the peer acknolwedges that receiept of + * this set delete request. + * cb_arg void * argument to pass to the callback function + */ +extern void ldms_xprt_set_delete(ldms_set_t set, + ldms_set_delete_cb_t cb_fn, + void *cb_arg); + /** * If set in the push_flags, the set changes will be automatically * pushed by ldms_transaction_end() @@ -71,6 +99,7 @@ #define LDMS_RBD_F_PUSH_CANCEL 4 /* cancel pending */ struct ldms_rbuf_desc { + struct ref_s ref; struct ldms_xprt *xprt; struct ldms_set *set; uint64_t remote_set_id; /* Remote set id returned by lookup */ @@ -125,6 +154,7 @@ enum ldms_request_cmd { LDMS_CMD_AUTH_MSG, LDMS_CMD_CANCEL_PUSH, LDMS_CMD_AUTH, + LDMS_CMD_SET_DELETE, LDMS_CMD_REPLY = 0x100, LDMS_CMD_DIR_REPLY, LDMS_CMD_DIR_CANCEL_REPLY, @@ -135,6 +165,7 @@ enum ldms_request_cmd { LDMS_CMD_AUTH_APPROVAL_REPLY, LDMS_CMD_PUSH_REPLY, LDMS_CMD_AUTH_REPLY, + LDMS_CMD_SET_DELETE_REPLY, /* Transport private requests set bit 32 */ LDMS_CMD_XPRT_PRIVATE = 0x80000000, }; @@ -165,6 +196,14 @@ struct ldms_dir_cmd_param { uint32_t flags; /*! Directory update flags */ }; +struct ldms_set_delete_cmd_param { +#ifdef SWIG +%immutable; +#endif + uint32_t inst_name_len; + char inst_name[OVIS_FLEX]; +}; + struct ldms_req_notify_cmd_param { uint64_t set_id; /*! The set we want notifications for */ uint32_t flags; /*! Events we want */ @@ -189,6 +228,7 @@ struct ldms_request { union { struct ldms_send_cmd_param send; struct ldms_dir_cmd_param dir; + struct ldms_set_delete_cmd_param set_delete; struct ldms_lookup_cmd_param lookup; struct ldms_req_notify_cmd_param req_notify; struct ldms_cancel_notify_cmd_param cancel_notify; @@ -283,18 +323,21 @@ struct ldms_reply { typedef enum ldms_context_type { LDMS_CONTEXT_DIR, LDMS_CONTEXT_DIR_CANCEL, - LDMS_CONTEXT_LOOKUP, + LDMS_CONTEXT_LOOKUP_REQ, + LDMS_CONTEXT_LOOKUP_READ, LDMS_CONTEXT_UPDATE, LDMS_CONTEXT_REQ_NOTIFY, LDMS_CONTEXT_SEND, LDMS_CONTEXT_PUSH, LDMS_CONTEXT_UPDATE_META, + LDMS_CONTEXT_SET_DELETE, } ldms_context_type_t; struct ldms_context { sem_t sem; sem_t *sem_p; int rc; + struct ldms_xprt *x; ldms_context_type_t type; union { struct { @@ -302,27 +345,37 @@ struct ldms_context { void *cb_arg; } dir; struct { + ldms_lookup_cb_t cb; + void *cb_arg; char *path; + enum ldms_lookup_flags flags; + } lu_req; + struct { + ldms_set_t s; ldms_lookup_cb_t cb; void *cb_arg; int more; enum ldms_lookup_flags flags; - ldms_set_t s; - zap_map_t remote_map; - } lookup; + } lu_read; struct { ldms_set_t s; ldms_update_cb_t cb; - void *arg; + void *cb_arg; int idx_from; int idx_to; } update; struct { ldms_set_t s; ldms_notify_cb_t cb; - void *arg; + void *cb_arg; } req_notify; + struct { + ldms_set_t s; + ldms_set_delete_cb_t cb; + void *cb_arg; + } set_delete; }; + struct timespec start; TAILQ_ENTRY(ldms_context) link; }; @@ -332,6 +385,9 @@ struct ldms_xprt { char name[LDMS_MAX_TRANSPORT_NAME_LEN]; uint32_t ref_count; pthread_mutex_t lock; + int disconnected; + zap_err_t zerrno; + struct ldms_xprt_stats stats; /* Semaphore and return code for synchronous xprt calls */ sem_t sem; diff --git a/ldms/src/core/ref.h b/ldms/src/core/ref.h index 4785386db..4baf953e1 100644 --- a/ldms/src/core/ref.h +++ b/ldms/src/core/ref.h @@ -132,24 +132,24 @@ static inline void _ref_init(ref_t r, const char *name, * suppress the `-Werror=unused-function` for this function. */ __attribute__((unused)) -static void ref_dump_no_lock(ref_t r, const char *name) +static void ref_dump_no_lock(ref_t r, const char *name, FILE *f) { #ifdef _REF_TRACK_ ref_inst_t inst; - fprintf(stderr, "... %s: ref %p free_fn %p free_arg %p ...\n", + fprintf(f, "... %s: ref %p free_fn %p free_arg %p ...\n", name, r, r->free_fn, r->free_arg); - fprintf(stderr, + fprintf(f, "%-16s %-8s %-32s %-32s\n", "Name", "Count", "Get Loc", "Put Loc"); fprintf(stderr, "---------------- -------- -------------------------------- " "--------------------------------\n"); LIST_FOREACH(inst, &r->head, entry) { - fprintf(stderr, + fprintf(f, "%-16s %8d %-23s/%8d %-23s/%8d\n", inst->name, inst->ref_count, inst->get_func, inst->get_line, inst->put_func, inst->put_line); } - fprintf(stderr, "%16s %8d\n", "Total", r->ref_count); + fprintf(f, "%16s %8d\n", "Total", r->ref_count); #endif } @@ -158,11 +158,11 @@ static void ref_dump_no_lock(ref_t r, const char *name) * suppress the `-Werror=unused-function` for this function. */ __attribute__((unused)) -static void ref_dump(ref_t r, const char *name) +static void ref_dump(ref_t r, const char *name, FILE *f) { #ifdef _REF_TRACK_ pthread_mutex_lock(&r->lock); - ref_dump_no_lock(r, name); + ref_dump_no_lock(r, name, f); pthread_mutex_unlock(&r->lock); #endif } diff --git a/ldms/src/ldmsd/ldmsd.c b/ldms/src/ldmsd/ldmsd.c index e3db1f5ab..f75d6032a 100644 --- a/ldms/src/ldmsd/ldmsd.c +++ b/ldms/src/ldmsd/ldmsd.c @@ -1169,7 +1169,7 @@ ldmsd_set_info_t ldmsd_set_info_get(const char *inst_name) } ldmsd_cfg_unlock(LDMSD_CFGOBJ_PRDCR); out: - + ldms_set_put(lset); return info; } diff --git a/ldms/src/ldmsd/ldmsd.h b/ldms/src/ldmsd/ldmsd.h index bcbb3e07e..1f9cf2833 100644 --- a/ldms/src/ldmsd/ldmsd.h +++ b/ldms/src/ldmsd/ldmsd.h @@ -808,6 +808,7 @@ extern ldmsctl_cmd_fn_t cmd_table[LDMSCTL_LAST_COMMAND + 1]; * Max length of error strings while ldmsd is being configured. */ #define LEN_ERRSTR 256 +#define LDMSD_ENOMEM_MSG "Memory allocation failure\n" void ldmsd_msg_logger(enum ldmsd_loglevel level, const char *fmt, ...); int ldmsd_logrotate(); diff --git a/ldms/src/ldmsd/ldmsd_cfgobj.c b/ldms/src/ldmsd/ldmsd_cfgobj.c index 991baf0f3..2fa16c793 100644 --- a/ldms/src/ldmsd/ldmsd_cfgobj.c +++ b/ldms/src/ldmsd/ldmsd_cfgobj.c @@ -50,6 +50,7 @@ #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif +#include #include #include #include @@ -198,8 +199,12 @@ ldmsd_cfgobj_t ldmsd_cfgobj_new(const char *name, ldmsd_cfgobj_type_t type, ldmsd_cfgobj_t ldmsd_cfgobj_get(ldmsd_cfgobj_t obj) { - if (obj) - __sync_fetch_and_add(&obj->ref_count, 1); + uint32_t ref_count; + if (obj) { + ref_count = __sync_fetch_and_add(&obj->ref_count, 1); + assert(ref_count >= 1); + } + return obj; } diff --git a/ldms/src/ldmsd/ldmsd_config.c b/ldms/src/ldmsd/ldmsd_config.c index 9b363db99..f20d87bae 100644 --- a/ldms/src/ldmsd/ldmsd_config.c +++ b/ldms/src/ldmsd/ldmsd_config.c @@ -985,8 +985,7 @@ int listen_on_ldms_xprt(ldmsd_listen_t listen) void ldmsd_cfg_ldms_init(ldmsd_cfg_xprt_t xprt, ldms_t ldms) { - ldms_xprt_get(ldms); - xprt->ldms.ldms = ldms; + xprt->ldms.ldms = ldms_xprt_get(ldms); xprt->send_fn = send_ldms_fn; xprt->max_msg = ldms_xprt_msg_max(ldms); xprt->cleanup_fn = ldmsd_cfg_ldms_xprt_cleanup; diff --git a/ldms/src/ldmsd/ldmsd_prdcr.c b/ldms/src/ldmsd/ldmsd_prdcr.c index df255a2a3..9643252c5 100644 --- a/ldms/src/ldmsd/ldmsd_prdcr.c +++ b/ldms/src/ldmsd/ldmsd_prdcr.c @@ -133,6 +133,7 @@ void __prdcr_set_del(ldmsd_prdcr_set_t set) free(set->schema_name); if (set->set) { + ref_put(&set->set->ref, "prdcr_set"); ldms_set_unpublish(set->set); ldms_set_delete(set->set); } @@ -201,15 +202,7 @@ void prdcr_hint_tree_update(ldmsd_prdcr_t prdcr, ldmsd_prdcr_set_t prd_set, } else if (op == UPDT_HINT_TREE_ADD) { if (!rbn) { list = malloc(sizeof(*list)); - if (!list) { - ldmsd_log(LDMSD_LERROR, "%s: out of memory.\n", __FUNCTION__); - return; - } hint_key = malloc(sizeof(*hint_key)); - if (!hint_key) { - ldmsd_log(LDMSD_LERROR, "%s: out of memory.\n", __FUNCTION__); - return; - } *hint_key = *hint; rbn_init(&list->rbn, hint_key); rbt_ins(&prdcr->hint_set_tree, &list->rbn); @@ -338,6 +331,8 @@ static void __update_set_info(ldmsd_prdcr_set_t set, ldms_dir_set_t dset) } } +extern void __ldmsd_prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status, + int more, ldms_set_t set, void *arg); static void _add_cb(ldms_t xprt, ldmsd_prdcr_t prdcr, ldms_dir_set_t dset) { ldmsd_prdcr_set_t set; @@ -356,9 +351,19 @@ static void _add_cb(ldms_t xprt, ldmsd_prdcr_t prdcr, ldms_dir_set_t dset) } set->prdcr = prdcr; rbt_ins(&prdcr->set_tree, &set->rbn); + rbt_verify(&prdcr->set_tree); + /* See if the ldms set is already there */ + ldms_set_t xs = ldms_xprt_set_by_name(xprt, dset->inst_name); + if (xs) { + assert(0 == "this should not happen"); + ldmsd_prdcr_set_ref_get(set); /* dropped in prdset_lookup_cb() */ + __ldmsd_prdset_lookup_cb(xprt, 0, 0, xs, set); + ref_put(&xs->ref, "ldms_xprt_set_by_name"); + } } else { - ldmsd_log(LDMSD_LCRITICAL, "Receive a duplicated dir_add update of " - "the set '%s'.\n", dset->inst_name); + ldmsd_log(LDMSD_LCRITICAL, "Received a dir_add update for " + "'%s', prdcr_set still present with refcount %d, and set " + "%p.\n", dset->inst_name, set->ref_count, set->set); return; } @@ -506,10 +511,52 @@ static int __prdcr_subscribe(ldmsd_prdcr_t prdcr) return rc; } +static void __prdcr_remote_set_delete(ldmsd_prdcr_t prdcr, ldms_set_t set) +{ + ldmsd_prdcr_set_t prdcr_set; + if (!set) + return; + prdcr_set = ldmsd_prdcr_set_find(prdcr, ldms_set_instance_name_get(set)); + pthread_mutex_lock(&prdcr_set->lock); + assert(prdcr_set->ref_count); + switch (prdcr_set->state) { + case LDMSD_PRDCR_SET_STATE_START: + ldmsd_log(LDMSD_LERROR, + "Deleting %s in the START state\n", + prdcr_set->inst_name); + break; + case LDMSD_PRDCR_SET_STATE_LOOKUP: + ldmsd_log(LDMSD_LERROR, + "Deleting %s in the LOOKUP state\n", + prdcr_set->inst_name); + break; + case LDMSD_PRDCR_SET_STATE_READY: + ldmsd_log(LDMSD_LERROR, + "Deleting %s in the READY state\n", + prdcr_set->inst_name); + break; + case LDMSD_PRDCR_SET_STATE_UPDATING: + ldmsd_log(LDMSD_LERROR, + "Deleting %s in the UPDATING state\n", + prdcr_set->inst_name); + break; + } + pthread_mutex_unlock(&prdcr_set->lock); + prdcr_reset_set(prdcr, prdcr_set); +} + static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) { ldmsd_prdcr_t prdcr = cb_arg; ldmsd_prdcr_lock(prdcr); + switch(e->type) { + case LDMS_XPRT_EVENT_DISCONNECTED: + x->disconnected = 1; + break; + default: + assert(x->disconnected == 0); + break; + } switch (e->type) { case LDMS_XPRT_EVENT_CONNECTED: ldmsd_log(LDMSD_LINFO, "Producer %s is connected (%s %s:%d)\n", @@ -517,7 +564,8 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) prdcr->host_name, (int)prdcr->port_no); prdcr->conn_state = LDMSD_PRDCR_STATE_CONNECTED; if (__prdcr_subscribe(prdcr)) { - ldmsd_log(LDMSD_LERROR, "Could not subscribe to stream data on producer %s\n", + ldmsd_log(LDMSD_LERROR, + "Could not subscribe to stream data on producer %s\n", prdcr->obj.name); } if (ldms_xprt_dir(prdcr->xprt, prdcr_dir_cb, prdcr, @@ -525,6 +573,12 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) ldms_xprt_close(prdcr->xprt); ldmsd_task_stop(&prdcr->task); break; + case LDMS_XPRT_EVENT_RECV: + ldmsd_recv_msg(x, e->data, e->data_len); + break; + case LDMS_XPRT_EVENT_SET_DELETE: + __prdcr_remote_set_delete(prdcr, e->set_delete.set); + break; case LDMS_XPRT_EVENT_REJECTED: ldmsd_log(LDMSD_LERROR, "Producer %s rejected the " "connection (%s %s:%d)\n", prdcr->obj.name, @@ -541,9 +595,6 @@ static void prdcr_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) prdcr->obj.name, prdcr->xprt_name, prdcr->host_name, (int)prdcr->port_no); goto reset_prdcr; - case LDMS_XPRT_EVENT_RECV: - ldmsd_recv_msg(x, e->data, e->data_len); - break; default: assert(0); } diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c index c33fef3da..7468ceb49 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -59,6 +59,7 @@ #include #include #include +#include #include "ldms.h" #include "ldmsd.h" #include "ldmsd_request.h" @@ -194,6 +195,7 @@ static int logrotate_handler(ldmsd_req_ctxt_t req_ctxt); static int exit_daemon_handler(ldmsd_req_ctxt_t req_ctxt); static int greeting_handler(ldmsd_req_ctxt_t req_ctxt); static int set_route_handler(ldmsd_req_ctxt_t req_ctxt); +static int xprt_stats_handler(ldmsd_req_ctxt_t req_ctxt); static int unimplemented_handler(ldmsd_req_ctxt_t req_ctxt); static int eperm_handler(ldmsd_req_ctxt_t req_ctxt); static int ebusy_handler(ldmsd_req_ctxt_t reqc); @@ -408,6 +410,11 @@ static struct request_handler_entry request_handler[] = { LDMSD_SET_ROUTE_REQ, set_route_handler, XUG }, + /* Transport Stats Request */ + [LDMSD_XPRT_STATS_REQ] = { + LDMSD_XPRT_STATS_REQ, xprt_stats_handler, XUG + }, + /* FAILOVER user commands */ [LDMSD_FAILOVER_CONFIG_REQ] = { LDMSD_FAILOVER_CONFIG_REQ, failover_config_handler, XUG, @@ -600,7 +607,6 @@ void req_ctxt_tree_unlock() static void free_cfg_xprt_ldms(ldmsd_cfg_xprt_t xprt) { ldms_xprt_put(xprt->ldms.ldms); - xprt->ldms.ldms = NULL; free(xprt); } @@ -669,10 +675,10 @@ void free_req_cmd_ctxt(ldmsd_req_cmd_t rcmd) { if (rcmd->org_reqc) req_ctxt_ref_put(rcmd->org_reqc); + if (rcmd->reqc && rcmd->reqc->xprt->cleanup_fn) + rcmd->reqc->xprt->cleanup_fn(rcmd->reqc->xprt); if (rcmd->reqc) req_ctxt_ref_put(rcmd->reqc); - if (rcmd->reqc->xprt->cleanup_fn) - rcmd->reqc->xprt->cleanup_fn(rcmd->reqc->xprt); free(rcmd); } @@ -5359,6 +5365,268 @@ static int set_route_handler(ldmsd_req_ctxt_t reqc) out: return rc; } +/* + * { + * "compute_time_us" : , + * "connect_rate_s" : , + * "connect_request_rate_s" : , + * "disconnect_rate_s" : , + * "reject_rate_s" : , + * "auth_fail_rate_s" : , + * "xprt_count" : , + * "open_count" : , + * "close_count" : , + * "lookup_req" : { + * "count" : + * "total_us" : + * "min_us" : + * "max_us" : + * "mean_us" : + * "max_xprt" : { "host" : , "xprt" : , "port" : }, + * "min_xprt" : { "host" : , "xprt" : , "port" : } + * }, + * "lookup_resp" : { + * "count" : + * "total_us" : + * "min_us" : + * "max_us" : + * "mean_us" : + * "max_xprt" : { "host" : , "xprt" : , "port" : }, + * "min_xprt" : { "host" : , "xprt" : , "port" : } + * }, + * "update_req" : { + * "count" : + * "total_us" : + * "min_us" : + * "max_us" : + * "mean_us" : + * "max_xprt" : { "host" : , "xprt" : , "port" : }, + * "min_xprt" : { "host" : , "xprt" : , "port" : } + * } + * "update_resp" : { + * "count" : + * "total_us" : + * "min_us" : + * "max_us" : + * "mean_us" : + * "max_xprt" : { "host" : , "xprt" : , "port" : }, + * "min_xprt" : { "host" : , "xprt" : , "port" : } + * } + * } + */ +struct op_summary { + uint64_t op_count; + uint64_t op_total_us; + uint64_t op_min_us; + struct ldms_xprt *op_min_xprt; + uint64_t op_max_us; + struct ldms_xprt *op_max_xprt; + uint64_t op_mean_us; +}; + +static int xprt_stats_handler(ldmsd_req_ctxt_t req) +{ + #define XPRT_BUFLEN 4096 + char *buff, *s; + size_t sz = XPRT_BUFLEN; + int rc, len; + uint32_t term; + const char *errmsg = NULL; + ldmsd_req_attr_t attr; + struct ldms_xprt *x; + struct ldms_xprt_stats xs; + struct op_summary op_sum[LDMS_XPRT_OP_COUNT]; + enum ldms_xprt_ops_e op_e; + int xprt_count = 0; + int xprt_connect_count = 0; + int xprt_connecting_count = 0; + int xprt_listen_count = 0; + int xprt_close_count = 0; + struct timespec start, end; + struct sockaddr_storage ss_local, ss_remote; + struct sockaddr_in *sin; + socklen_t socklen; + zap_err_t zerr; + char ip_str[32]; + char xprt_type[16]; + struct ldms_xprt_rate_data rate_data; + + ldms_xprt_rate_data(&rate_data); + + buff = malloc(sz); + if (!buff) { + rc = ENOMEM; + errmsg = LDMSD_ENOMEM_MSG; + goto err; + } + attr = (ldmsd_req_attr_t)buff; + attr->discrim = 1; + attr->attr_id = LDMSD_ATTR_JSON; + /* len will be assigned after the str is populated */ + s = buff + sizeof(*attr); + sz -= sizeof(*attr); + + #define __APPEND(...) do { \ + len = snprintf(s, sz, __VA_ARGS__); \ + if (len >= sz) { \ + uint64_t off = (uint64_t)s - (uint64_t)buff; \ + sz += XPRT_BUFLEN; \ + s = realloc(buff, sz); \ + if (!s) { \ + rc = ENOMEM; \ + errmsg = LDMSD_ENOMEM_MSG; \ + goto err; \ + } \ + buff = s; \ + s = &buff[off]; \ + continue; \ + } \ + s += len; \ + sz -= len; \ + break; \ + } while(1) + + memset(op_sum, 0, sizeof(op_sum)); + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) + op_sum[op_e].op_min_us = LLONG_MAX; + + /* Compute summary statistics across all of the transports */ + (void)clock_gettime(CLOCK_REALTIME, &start); + for (x = ldms_xprt_first(); x; x = ldms_xprt_next(x)) { + ldms_stats_entry_t op; + + ldms_xprt_stats(x, &xs); + xprt_count += 1; + zap_ep_state_t ep_state = zap_ep_state(x->zap_ep); + switch (ep_state) { + case ZAP_EP_LISTENING: + xprt_listen_count += 1; + break; + case ZAP_EP_ACCEPTING: + case ZAP_EP_CONNECTING: + xprt_connecting_count += 1; + break; + case ZAP_EP_CONNECTED: + xprt_connect_count += 1; + break; + case ZAP_EP_INIT: + case ZAP_EP_PEER_CLOSE: + case ZAP_EP_CLOSE: + case ZAP_EP_ERROR: + xprt_close_count += 1; + } + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + op = &xs.ops[op_e]; + if (!op->count) + continue; + op_sum[op_e].op_total_us += op->total_us; + op_sum[op_e].op_count += op->count; + if (op->min_us < op_sum[op_e].op_min_us) { + op_sum[op_e].op_min_us = op->min_us; + op_sum[op_e].op_min_xprt = ldms_xprt_get(x); + } + if (op->max_us > op_sum[op_e].op_max_us) { + op_sum[op_e].op_max_us = op->max_us; + op_sum[op_e].op_max_xprt = ldms_xprt_get(x); + } + } + assert(x->ref_count > 1); + ldms_xprt_put(x); + } + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + if (op_sum[op_e].op_count) { + op_sum[op_e].op_mean_us = + op_sum[op_e].op_total_us / op_sum[op_e].op_count; + } + } + + (void)clock_gettime(CLOCK_REALTIME, &end); + uint64_t compute_time = ldms_timespec_diff_us(&start, &end); + + __APPEND("{"); + __APPEND(" \"compute_time_us\": %ld,\n", compute_time); + __APPEND(" \"connect_rate_s\": %f,\n", rate_data.connect_rate_s); + __APPEND(" \"connect_request_rate_s\": %f,\n", rate_data.connect_request_rate_s); + __APPEND(" \"disconnect_rate_s\": %f,\n", rate_data.disconnect_rate_s); + __APPEND(" \"reject_rate_s\": %f,\n", rate_data.reject_rate_s); + __APPEND(" \"auth_fail_rate_s\": %f,\n", rate_data.auth_fail_rate_s); + __APPEND(" \"xprt_count\": %d,\n", xprt_count); + __APPEND(" \"connect_count\": %d,\n", xprt_connect_count); + __APPEND(" \"connecting_count\": %d,\n", xprt_connecting_count); + __APPEND(" \"listen_count\": %d,\n", xprt_listen_count); + __APPEND(" \"close_count\": %d,\n", xprt_close_count); + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + struct op_summary *op; + op = &op_sum[op_e]; + __APPEND(" \"%s\" : {\n", ldms_xprt_op_names[op_e]); + __APPEND(" \"count\": %ld,\n", op->op_count); + __APPEND(" \"total_us\": %ld,\n", op->op_total_us); + __APPEND(" \"min_us\": %ld,\n", (op->op_count ? op->op_min_us: 0)); + sin = (struct sockaddr_in *)&ss_remote; + memset(&ss_remote, 0, sizeof(ss_remote)); + strncpy(ip_str, "0.0.0.0:0", sizeof(ip_str)); + strncpy(xprt_type, "????", sizeof(xprt_type)); + if (op->op_min_xprt && op->op_min_xprt->zap_ep) { + zerr = zap_get_name(op->op_min_xprt->zap_ep, + (struct sockaddr *)&ss_local, + (struct sockaddr *)&ss_remote, + &socklen); + strncpy(xprt_type, ldms_xprt_type_name(op->op_min_xprt), + sizeof(xprt_type)); + inet_ntop(sin->sin_family, &sin->sin_addr, ip_str, sizeof(ip_str)); + } + if (op->op_min_xprt) + ldms_xprt_put(op->op_min_xprt); + __APPEND(" \"min_peer\": \"%s:%hu\"\n,", ip_str, ntohs(sin->sin_port)); + __APPEND(" \"min_peer_type\": \"%s\"\n,", xprt_type); + + __APPEND(" \"max_us\": %ld,\n", (op->op_count ? op->op_max_us : 0)); + memset(&ss_remote, 0, sizeof(ss_remote)); + + if (op->op_max_xprt && op->op_max_xprt->zap_ep) { + zerr = zap_get_name(op->op_max_xprt->zap_ep, + (struct sockaddr *)&ss_local, + (struct sockaddr *)&ss_remote, + &socklen); + strncpy(xprt_type, ldms_xprt_type_name(op->op_max_xprt), + sizeof(xprt_type)); + inet_ntop(sin->sin_family, &sin->sin_addr, ip_str, sizeof(ip_str)); + } + if (op->op_max_xprt) + ldms_xprt_put(op->op_max_xprt); + __APPEND(" \"max_peer\": \"%s:%hu\"\n,", ip_str, ntohs(sin->sin_port)); + __APPEND(" \"max_peer_type\": \"%s\"\n,", xprt_type); + __APPEND(" \"mean_us\": %ld\n", op->op_mean_us); + if (op_e < LDMS_XPRT_OP_COUNT - 1) + __APPEND(" },\n"); + else + __APPEND(" }\n"); + } + __APPEND("}"); + sz = s - buff + 1; + attr->attr_len = sz - sizeof(*attr); + ldmsd_hton_req_attr(attr); + rc = ldmsd_append_reply(req, buff, sz, LDMSD_REQ_SOM_F); + if (rc) { + errmsg = "append reply error"; + goto err; + } + term = 0; + rc = ldmsd_append_reply(req, (void*)&term, sizeof(term), + LDMSD_REQ_EOM_F); + if (rc) { + errmsg = "append reply error"; + goto err; + } + free(buff); + return 0; +err: + if (buff) + free(buff); + req->errcode = rc; + ldmsd_send_req_response(req, errmsg); + return rc; +} static int stream_publish_handler(ldmsd_req_ctxt_t reqc) { diff --git a/ldms/src/ldmsd/ldmsd_request.h b/ldms/src/ldmsd/ldmsd_request.h index 804dee7be..d0f1449a3 100644 --- a/ldms/src/ldmsd/ldmsd_request.h +++ b/ldms/src/ldmsd/ldmsd_request.h @@ -115,6 +115,7 @@ enum ldmsd_request { LDMSD_EXIT_DAEMON_REQ, LDMSD_RECORD_LEN_ADVICE_REQ, LDMSD_SET_ROUTE_REQ, + LDMSD_XPRT_STATS_REQ, /* command-line options */ LDMSD_LISTEN_REQ, diff --git a/ldms/src/ldmsd/ldmsd_updtr.c b/ldms/src/ldmsd/ldmsd_updtr.c index 5f9c4c42a..ccb0c998c 100644 --- a/ldms/src/ldmsd/ldmsd_updtr.c +++ b/ldms/src/ldmsd/ldmsd_updtr.c @@ -330,8 +330,8 @@ __grp_iter_cb(ldms_set_t grp, const char *name, void *arg) return 0; } -static void prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status, - int more, ldms_set_t set, void *arg); +void __ldmsd_prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status, + int more, ldms_set_t set, void *arg); static int schedule_set_updates(ldmsd_prdcr_set_t prd_set, ldmsd_updtr_task_t task) { int rc = 0; @@ -380,9 +380,9 @@ static int schedule_set_updates(ldmsd_prdcr_set_t prd_set, ldmsd_updtr_task_t ta * Thus, do the lookup here. */ rc = ldms_xprt_lookup(pset->prdcr->xprt, - pset->inst_name, - LDMS_LOOKUP_BY_INSTANCE, - prdset_lookup_cb, pset); + pset->inst_name, + LDMS_LOOKUP_BY_INSTANCE, + __ldmsd_prdset_lookup_cb, pset); if (rc) goto out; } @@ -402,11 +402,6 @@ static int schedule_set_updates(ldmsd_prdcr_set_t prd_set, ldmsd_updtr_task_t ta } } else if (0 == (prd_set->push_flags & LDMSD_PRDCR_SET_F_PUSH_REG)) { op_s = "Registering push for"; - // this doesn't work because it's taken after lookup - // which may fail or the updater is never started at - // all. since the flags don't tell yuou one way or the - // other this is just broken - // ldmsd_prdcr_set_ref_get(prd_set); if (updtr->push_flags & LDMSD_UPDTR_F_PUSH_CHANGE) push_flags = LDMS_XPRT_PUSH_F_CHANGE; rc = ldms_xprt_register_push(prd_set->set, push_flags, @@ -509,8 +504,9 @@ static int __setgrp_members_lookup(ldmsd_prdcr_set_t setgrp) rc = ldms_xprt_lookup(setgrp->prdcr->xprt, pset->inst_name, LDMS_LOOKUP_BY_INSTANCE, - prdset_lookup_cb, pset); + __ldmsd_prdset_lookup_cb, pset); if (rc) { + pset->state = LDMSD_PRDCR_SET_STATE_START; ldmsd_log(LDMSD_LINFO, "Synchronous error %d " "from ldms_lookup\n", rc); @@ -530,14 +526,15 @@ static int __setgrp_members_lookup(ldmsd_prdcr_set_t setgrp) return rc; } -static void prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status, - int more, ldms_set_t set, void *arg) +void __ldmsd_prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status, + int more, ldms_set_t set, void *arg) { ldmsd_prdcr_set_t prd_set = arg; int ready = 0; int flags; pthread_mutex_lock(&prd_set->lock); if (status != LDMS_LOOKUP_OK) { + assert(NULL == set); status = (status < 0 ? -status : status); if (status == ENOMEM) { ldmsd_log(LDMSD_LERROR, @@ -558,7 +555,10 @@ static void prdset_lookup_cb(ldms_t xprt, enum ldms_lookup_status status, } if (!prd_set->set) { /* This is the first lookup of the set. */ + ref_get(&set->ref, "prdcr_set"); prd_set->set = set; + } else { + assert(0 == "multiple lookup on the same prdcr_set"); } flags = ldmsd_group_check(prd_set->set); if (flags & LDMSD_GROUP_IS_GROUP) { @@ -623,10 +623,14 @@ static void schedule_prdcr_updates(ldmsd_updtr_task_t task, ldmsd_prdcr_set_ref_get(prd_set); /* It will be put back in lookup_cb */ /* Lookup the set */ prd_set->state = LDMSD_PRDCR_SET_STATE_LOOKUP; + assert(prd_set->set == NULL); rc = ldms_xprt_lookup(prdcr->xprt, prd_set->inst_name, LDMS_LOOKUP_BY_INSTANCE, - prdset_lookup_cb, prd_set); + __ldmsd_prdset_lookup_cb, prd_set); if (rc) { + /* If the error is EEXIST, the set is already in the set tree. */ + assert(rc != EEXIST); + prd_set->state = LDMSD_PRDCR_SET_STATE_START; ldmsd_log(LDMSD_LINFO, "Synchronous error %d from ldms_lookup\n", rc); ldmsd_prdcr_set_ref_put(prd_set); } @@ -679,7 +683,7 @@ static void cancel_prdcr_updates(ldmsd_updtr_t updtr, cancel_set_updates(prd_set, updtr); continue; } - + rc = 1; if (match->selector == LDMSD_NAME_MATCH_INST_NAME) str = prd_set->inst_name; else