From fbeee6b12268b9fea06bdae93463eec5deabc68f Mon Sep 17 00:00:00 2001 From: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Date: Sun, 29 Sep 2024 20:42:17 +0300 Subject: [PATCH] Misc mqtt related code cleanup (#18622) * Remove pthread locks / use spinlocks * Remove redundant checks as mallocz, callocz can't return NULL * Change logging More code cleanup * Change random number generation Set Origin to empty Use BCryptGenRandom * More cleanup Deduplicate base64_encode/decode * Address review comments --- CMakeLists.txt | 4 +- src/aclk/aclk.c | 40 +- src/aclk/aclk_otp.c | 42 +- src/aclk/aclk_tx_msgs.c | 2 +- src/aclk/aclk_util.c | 43 +- src/aclk/aclk_util.h | 11 +- src/aclk/https_client.c | 3 +- src/aclk/mqtt_websockets/mqtt_ng.c | 491 +++++++++------------ src/aclk/mqtt_websockets/mqtt_ng.h | 1 - src/aclk/mqtt_websockets/mqtt_wss_client.c | 316 ++++--------- src/aclk/mqtt_websockets/mqtt_wss_client.h | 47 +- src/aclk/mqtt_websockets/mqtt_wss_log.c | 126 ------ src/aclk/mqtt_websockets/mqtt_wss_log.h | 39 -- src/aclk/mqtt_websockets/ws_client.c | 247 +++++------ src/aclk/mqtt_websockets/ws_client.h | 11 +- src/libnetdata/c_rhash/c_rhash.c | 3 - src/libnetdata/libnetdata.c | 129 ++++-- src/libnetdata/libnetdata.h | 3 +- src/libnetdata/ringbuffer/ringbuffer.c | 3 - src/libnetdata/socket/security.h | 1 + 20 files changed, 546 insertions(+), 1016 deletions(-) delete mode 100644 src/aclk/mqtt_websockets/mqtt_wss_log.c delete mode 100644 src/aclk/mqtt_websockets/mqtt_wss_log.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 1847b738fd5301..da13eb46d809c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1555,8 +1555,6 @@ endif() set(MQTT_WEBSOCKETS_FILES src/aclk/mqtt_websockets/mqtt_wss_client.c src/aclk/mqtt_websockets/mqtt_wss_client.h - src/aclk/mqtt_websockets/mqtt_wss_log.c - src/aclk/mqtt_websockets/mqtt_wss_log.h src/aclk/mqtt_websockets/ws_client.c src/aclk/mqtt_websockets/ws_client.h src/aclk/mqtt_websockets/mqtt_ng.c @@ -1721,7 +1719,7 @@ target_include_directories(libnetdata BEFORE PUBLIC ${CONFIG_H_DIR} ${CMAKE_SOUR target_link_libraries(libnetdata PUBLIC "$<$>:atomic>" "$<$,$>:pthread;rt>" - "$<$:kernel32;advapi32;winmm;rpcrt4>" + "$<$:kernel32;advapi32;winmm;rpcrt4;bcrypt>" "$<$:m>" "${SYSTEMD_LDFLAGS}") diff --git a/src/aclk/aclk.c b/src/aclk/aclk.c index b1136b78177906..41f26ded5904b9 100644 --- a/src/aclk/aclk.c +++ b/src/aclk/aclk.c @@ -226,30 +226,6 @@ static int wait_till_agent_claim_ready() return 1; } -void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str) -{ - switch(log_type) { - case MQTT_WSS_LOG_ERROR: - case MQTT_WSS_LOG_FATAL: - nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str); - return; - - case MQTT_WSS_LOG_WARN: - nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str); - return; - - case MQTT_WSS_LOG_INFO: - nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str); - return; - - case MQTT_WSS_LOG_DEBUG: - return; - - default: - nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss"); - } -} - static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos) { UNUSED(qos); @@ -362,7 +338,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client) aclk_rcvd_cloud_msgs = 0; aclk_connection_counter++; - aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER; + size_t iter = 0; while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL) mqtt_wss_set_topic_alias(client, topic); @@ -768,7 +744,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client) */ void *aclk_main(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + struct netdata_static_thread *static_thread = ptr; ACLK_PROXY_TYPE proxy_type; aclk_get_proxy(&proxy_type); @@ -783,7 +759,7 @@ void *aclk_main(void *ptr) if (wait_till_agent_claim_ready()) goto exit; - if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) { + if (!((mqttwss_client = mqtt_wss_new(msg_callback, puback_callback)))) { netdata_log_error("Couldn't initialize MQTT_WSS network library"); goto exit; } @@ -1025,22 +1001,22 @@ char *aclk_state(void) } buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_online() ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No"); - if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) { + if (last_conn_time_mqtt && ((tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf); } - if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) { + if (last_conn_time_appl && ((tmptr = localtime_r(&last_conn_time_appl, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf); } - if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) { + if (last_disconnect_time && ((tmptr = localtime_r(&last_disconnect_time, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf); } - if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) { + if (!aclk_connected && next_connection_attempt && ((tmptr = localtime_r(&next_connection_attempt, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value); @@ -1107,7 +1083,7 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host) static json_object *timestamp_to_json(const time_t *t) { struct tm *tmptr, tmbuf; - if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) { + if (*t && ((tmptr = gmtime_r(t, &tmbuf))) ) { char timebuf[26]; strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr); return json_object_new_string(timebuf); diff --git a/src/aclk/aclk_otp.c b/src/aclk/aclk_otp.c index d3aade76e178d6..b2b8ab5a4a0086 100644 --- a/src/aclk/aclk_otp.c +++ b/src/aclk/aclk_otp.c @@ -267,40 +267,8 @@ static int aclk_parse_otp_error(const char *json_str) { } #endif -#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 -static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) -{ - EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx)); - - if (ctx != NULL) { - memset(ctx, 0, sizeof(*ctx)); - } - return ctx; -} -static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx) -{ - OPENSSL_free(ctx); - return; -} -#endif - #define CHALLENGE_LEN 256 #define CHALLENGE_LEN_BASE64 344 -inline static int base64_decode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) -{ - unsigned char remaining_data[CHALLENGE_LEN]; - EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); - EVP_DecodeInit(ctx); - EVP_DecodeUpdate(ctx, out, outl, in, in_len); - int remainder = 0; - EVP_DecodeFinal(ctx, remaining_data, &remainder); - EVP_ENCODE_CTX_free(ctx); - if (remainder) { - netdata_log_error("Unexpected data at EVP_DecodeFinal"); - return 1; - } - return 0; -} #define OTP_URL_PREFIX "/api/v1/auth/node/" int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **challenge, int *challenge_bytes, bool *fallback_ipv4) @@ -347,7 +315,7 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char ** goto cleanup_json; } const char *challenge_base64; - if (!(challenge_base64 = json_object_get_string(challenge_json))) { + if (!((challenge_base64 = json_object_get_string(challenge_json)))) { netdata_log_error("Failed to extract challenge from JSON object"); goto cleanup_json; } @@ -356,8 +324,9 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char ** goto cleanup_json; } - *challenge = mallocz((CHALLENGE_LEN_BASE64 / 4) * 3); - base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64)); + *challenge = mallocz(CHALLENGE_LEN); + *challenge_bytes = netdata_base64_decode(*challenge, (const unsigned char *) challenge_base64, CHALLENGE_LEN_BASE64); + if (*challenge_bytes != CHALLENGE_LEN) { netdata_log_error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN); freez(*challenge); @@ -375,7 +344,6 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char ** int aclk_send_otp_response(const char *agent_id, const unsigned char *response, int response_bytes, url_t *target, struct auth_data *mqtt_auth, bool *fallback_ipv4) { - int len; int rc = 1; https_req_t req = HTTPS_REQ_T_INITIALIZER; https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER; @@ -387,7 +355,7 @@ int aclk_send_otp_response(const char *agent_id, const unsigned char *response, unsigned char base64[CHALLENGE_LEN_BASE64 + 1]; memset(base64, 0, CHALLENGE_LEN_BASE64 + 1); - base64_encode_helper(base64, &len, response, response_bytes); + (void) netdata_base64_encode(base64, response, response_bytes); BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20, &netdata_buffers_statistics.buffers_aclk); BUFFER *resp_json = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20, &netdata_buffers_statistics.buffers_aclk); diff --git a/src/aclk/aclk_tx_msgs.c b/src/aclk/aclk_tx_msgs.c index 8319e93654433b..2d256279e16db8 100644 --- a/src/aclk/aclk_tx_msgs.c +++ b/src/aclk/aclk_tx_msgs.c @@ -77,7 +77,7 @@ static short aclk_send_message_with_bin_payload(mqtt_wss_client client, json_obj int rc = mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id); - if (rc == MQTT_WSS_ERR_TOO_BIG_FOR_SERVER) + if (rc == MQTT_WSS_ERR_MSG_TOO_BIG) return HTTP_RESP_CONTENT_TOO_LONG; return 0; diff --git a/src/aclk/aclk_util.c b/src/aclk/aclk_util.c index 215782e507817c..16f57fe9aea325 100644 --- a/src/aclk/aclk_util.c +++ b/src/aclk/aclk_util.c @@ -309,7 +309,7 @@ const char *aclk_get_topic(enum aclk_topics topic) * having to resort to callbacks. */ -const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter) +const char *aclk_topic_cache_iterate(size_t *iter) { if (!aclk_topic_cache) { netdata_log_error("Topic cache not initialized when %s was called.", __FUNCTION__); @@ -434,44 +434,3 @@ void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt freez(proxy); } - -#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 -static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) -{ - EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx)); - - if (ctx != NULL) { - memset(ctx, 0, sizeof(*ctx)); - } - return ctx; -} -static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx) -{ - OPENSSL_free(ctx); - return; -} -#endif - -int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) -{ - int len; - unsigned char *str = out; - EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); - EVP_EncodeInit(ctx); - EVP_EncodeUpdate(ctx, str, outl, in, in_len); - str += *outl; - EVP_EncodeFinal(ctx, str, &len); - *outl += len; - - str = out; - while(*str) { - if (*str != 0x0D && *str != 0x0A) - *out++ = *str++; - else - str++; - } - *out = 0; - - EVP_ENCODE_CTX_free(ctx); - return 0; -} diff --git a/src/aclk/aclk_util.h b/src/aclk/aclk_util.h index 3ab6f6f2edc380..24e17996431722 100644 --- a/src/aclk/aclk_util.h +++ b/src/aclk/aclk_util.h @@ -93,15 +93,10 @@ enum aclk_topics { ACLK_TOPICID_CTXS_UPDATED = 20 }; -typedef size_t aclk_topic_cache_iter_t; -#define ACLK_TOPIC_CACHE_ITER_T_INITIALIZER (0) - const char *aclk_get_topic(enum aclk_topics topic); -int aclk_generate_topic_cache(struct json_object *json); +int aclk_generate_topic_cache(json_object *json); void free_topic_cache(void); -const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter); -// TODO -// aclk_topics_reload //when claim id changes +const char *aclk_topic_cache_iterate(size_t *iter); #ifdef ACLK_LOG_CONVERSATION_DIR extern volatile int aclk_conversation_log_counter; @@ -113,6 +108,4 @@ unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, un void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt_wss_proxy_type *type); -int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len); - #endif /* ACLK_UTIL_H */ diff --git a/src/aclk/https_client.c b/src/aclk/https_client.c index 5c99fad5724661..f144eaf15d5942 100644 --- a/src/aclk/https_client.c +++ b/src/aclk/https_client.c @@ -556,7 +556,7 @@ static int handle_http_request(https_req_ctx_t *ctx) { // we remove those but during encoding we need that space in the buffer creds_base64_len += (1+(creds_base64_len/64)) * strlen("\n"); char *creds_base64 = callocz(1, creds_base64_len + 1); - base64_encode_helper((unsigned char*)creds_base64, &creds_base64_len, (unsigned char*)creds_plain, creds_plain_len); + (void) netdata_base64_encode((unsigned char *)creds_base64, (unsigned char *)creds_plain, creds_plain_len); buffer_sprintf(hdr, "Proxy-Authorization: Basic %s\x0D\x0A", creds_base64); freez(creds_plain); } @@ -584,7 +584,6 @@ static int handle_http_request(https_req_ctx_t *ctx) { if (ctx->parse_ctx.chunked_response) freez(ctx->parse_ctx.chunked_response); rc = 4; - goto err_exit; } err_exit: diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c index 96099aa687cd64..daf7931151bdec 100644 --- a/src/aclk/mqtt_websockets/mqtt_ng.c +++ b/src/aclk/mqtt_websockets/mqtt_ng.c @@ -8,16 +8,8 @@ #include "common_internal.h" #include "mqtt_constants.h" -#include "mqtt_wss_log.h" #include "mqtt_ng.h" -#define UNIT_LOG_PREFIX "mqtt_client: " -#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) - #define SMALL_STRING_DONT_FRAGMENT_LIMIT 128 #define LOCK_HDR_BUFFER(buffer) spinlock_lock(&((buffer)->spinlock)) @@ -216,7 +208,7 @@ struct topic_aliases_data { c_rhash stoi_dict; uint32_t idx_max; uint32_t idx_assigned; - pthread_rwlock_t rwlock; + SPINLOCK spinlock; }; struct mqtt_ng_client { @@ -226,8 +218,6 @@ struct mqtt_ng_client { mqtt_msg_data connect_msg; - mqtt_wss_log_ctx_t log; - mqtt_ng_send_fnc_t send_fnc_ptr; void *user_ctx; @@ -245,7 +235,7 @@ struct mqtt_ng_client { unsigned int ping_pending:1; struct mqtt_ng_stats stats; - pthread_mutex_t stats_mutex; + SPINLOCK stats_spinlock; struct topic_aliases_data tx_topic_aliases; c_rhash rx_aliases; @@ -399,7 +389,7 @@ enum memory_mode { CALLER_RESPONSIBLE }; -static inline enum memory_mode ptr2memory_mode(void * ptr) { +static enum memory_mode ptr2memory_mode(void * ptr) { if (ptr == NULL) return MEMCPY; if (ptr == CALLER_RESPONSIBILITY) @@ -484,15 +474,8 @@ static void buffer_rebuild(struct header_buffer *buf) } while(frag); } -static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx) +static void buffer_garbage_collect(struct header_buffer *buf) { -#if !defined(MQTT_DEBUG_VERBOSE) && !defined(ADDITIONAL_CHECKS) - (void) log_ctx; -#endif -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Buffer Garbage Collection!"); -#endif - struct buffer_fragment *frag = BUFFER_FIRST_FRAG(buf); while (frag) { if (!frag_is_marked_for_gc(frag)) @@ -503,12 +486,8 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t frag = frag->next; } - if (frag == BUFFER_FIRST_FRAG(buf)) { -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Buffer Garbage Collection! No Space Reclaimed!"); -#endif + if (frag == BUFFER_FIRST_FRAG(buf)) return; - } if (!frag) { buf->tail_frag = NULL; @@ -527,21 +506,17 @@ static void buffer_garbage_collect(struct header_buffer *buf, mqtt_wss_log_ctx_t buffer_rebuild(buf); } -static void transaction_buffer_garbage_collect(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx) +static void transaction_buffer_garbage_collect(struct transaction_buffer *buf) { -#ifdef MQTT_DEBUG_VERBOSE - mws_debug(log_ctx, "Transaction Buffer Garbage Collection! %s", buf->sending_frag == NULL ? "NULL" : "in flight message"); -#endif - // Invalidate the cached sending fragment // as we will move data around if (buf->sending_frag != &ping_frag) buf->sending_frag = NULL; - buffer_garbage_collect(&buf->hdr_buffer, log_ctx); + buffer_garbage_collect(&buf->hdr_buffer); } -static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ctx_t log_ctx, float rate, size_t max) +static int transaction_buffer_grow(struct transaction_buffer *buf, float rate, size_t max) { if (buf->hdr_buffer.size >= max) return 0; @@ -557,29 +532,25 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_ void *ret = reallocz(buf->hdr_buffer.data, buf->hdr_buffer.size); if (ret == NULL) { - mws_warn(log_ctx, "Buffer growth failed (realloc)"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "Buffer growth failed (realloc)"); return 1; } - mws_debug(log_ctx, "Message metadata buffer was grown"); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Message metadata buffer was grown"); buf->hdr_buffer.data = ret; buffer_rebuild(&buf->hdr_buffer); return 0; } -inline static int transaction_buffer_init(struct transaction_buffer *to_init, size_t size) +inline static void transaction_buffer_init(struct transaction_buffer *to_init, size_t size) { spinlock_init(&to_init->spinlock); to_init->hdr_buffer.size = size; to_init->hdr_buffer.data = mallocz(size); - if (to_init->hdr_buffer.data == NULL) - return 1; - to_init->hdr_buffer.tail = to_init->hdr_buffer.data; to_init->hdr_buffer.tail_frag = NULL; - return 0; } static void transaction_buffer_destroy(struct transaction_buffer *to_init) @@ -620,54 +591,30 @@ void transaction_buffer_transaction_rollback(struct transaction_buffer *buf, str struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings) { struct mqtt_ng_client *client = callocz(1, sizeof(struct mqtt_ng_client)); - if (client == NULL) - return NULL; - if (transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE)) - goto err_free_client; + transaction_buffer_init(&client->main_buffer, HEADER_BUFFER_SIZE); client->rx_aliases = RX_ALIASES_INITIALIZE(); - if (client->rx_aliases == NULL) - goto err_free_trx_buf; - if (pthread_mutex_init(&client->stats_mutex, NULL)) - goto err_free_rx_alias; + spinlock_init(&client->stats_spinlock); + spinlock_init(&client->tx_topic_aliases.spinlock); client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE(); - if (client->tx_topic_aliases.stoi_dict == NULL) - goto err_free_stats_mutex; client->tx_topic_aliases.idx_max = UINT16_MAX; - if (pthread_rwlock_init(&client->tx_topic_aliases.rwlock, NULL)) - goto err_free_tx_alias; - // TODO just embed the struct into mqtt_ng_client client->parser.received_data = settings->data_in; client->send_fnc_ptr = settings->data_out_fnc; client->user_ctx = settings->user_ctx; - client->log = settings->log; - client->puback_callback = settings->puback_callback; client->connack_callback = settings->connack_callback; client->msg_callback = settings->msg_callback; return client; - -err_free_tx_alias: - c_rhash_destroy(client->tx_topic_aliases.stoi_dict); -err_free_stats_mutex: - pthread_mutex_destroy(&client->stats_mutex); -err_free_rx_alias: - c_rhash_destroy(client->rx_aliases); -err_free_trx_buf: - transaction_buffer_destroy(&client->main_buffer); -err_free_client: - freez(client); - return NULL; } -static inline uint8_t get_control_packet_type(uint8_t first_hdr_byte) +static uint8_t get_control_packet_type(uint8_t first_hdr_byte) { return first_hdr_byte >> 4; } @@ -699,33 +646,27 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash) void mqtt_ng_destroy(struct mqtt_ng_client *client) { transaction_buffer_destroy(&client->main_buffer); - pthread_mutex_destroy(&client->stats_mutex); mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict); - pthread_rwlock_destroy(&client->tx_topic_aliases.rwlock); mqtt_ng_destroy_rx_alias_hash(client->rx_aliases); freez(client); } -int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc) +int frag_set_external_data(struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc) { if (frag->len) { // TODO?: This could potentially be done in future if we set rule // external data always follows in buffer data // could help reduce fragmentation in some messages but // currently not worth it considering time is tight - mws_fatal(log, UNIT_LOG_PREFIX "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "INTERNAL ERROR: Cannot set external data to fragment already containing in buffer data!"); return 1; } switch (ptr2memory_mode(data_free_fnc)) { case MEMCPY: frag->data = mallocz(data_len); - if (frag->data == NULL) { - mws_error(log, UNIT_LOG_PREFIX "OOM while malloc @_optimized_add"); - return 1; - } memcpy(frag->data, data, data_len); break; case EXTERNAL_FREE_AFTER_USE: @@ -807,18 +748,18 @@ static size_t mqtt_ng_connect_size(struct mqtt_auth_properties *auth, #define PACK_2B_INT(buffer, integer, frag) { *(uint16_t *)WRITE_POS(frag) = htobe16((integer)); \ DATA_ADVANCE(buffer, sizeof(uint16_t), frag); } -static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag) +static int _optimized_add(struct header_buffer *buf, void *data, size_t data_len, free_fnc_t data_free_fnc, struct buffer_fragment **frag) { if (data_len > SMALL_STRING_DONT_FRAGMENT_LIMIT) { buffer_frag_flag_t flags = BUFFER_FRAG_DATA_EXTERNAL; if ((*frag)->flags & BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND) flags |= BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND; if( (*frag = buffer_new_frag(buf, flags)) == NULL ) { - mws_error(log_ctx, "Out of buffer space while generating the message"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Out of buffer space while generating the message"); return 1; } - if (frag_set_external_data(log_ctx, *frag, data, data_len, data_free_fnc)) { - mws_error(log_ctx, "Error adding external data to newly created fragment"); + if (frag_set_external_data(*frag, data, data_len, data_free_fnc)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Error adding external data to newly created fragment"); return 1; } // we dont want to write to this fragment anymore @@ -833,31 +774,30 @@ static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx, return 0; } -#define TRY_GENERATE_MESSAGE(generator_function, client, ...) \ - int rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ +#define TRY_GENERATE_MESSAGE(generator_function, ...) \ + int rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) { \ LOCK_HDR_BUFFER(&client->main_buffer); \ - transaction_buffer_garbage_collect((&client->main_buffer), client->log); \ + transaction_buffer_garbage_collect((&client->main_buffer)); \ UNLOCK_HDR_BUFFER(&client->main_buffer); \ - rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ + rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && client->max_mem_bytes) { \ LOCK_HDR_BUFFER(&client->main_buffer); \ - transaction_buffer_grow((&client->main_buffer), client->log, GROWTH_FACTOR, client->max_mem_bytes); \ + transaction_buffer_grow((&client->main_buffer),GROWTH_FACTOR, client->max_mem_bytes); \ UNLOCK_HDR_BUFFER(&client->main_buffer); \ - rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \ + rc = generator_function(&client->main_buffer, ##__VA_ARGS__); \ } \ if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) \ - mws_error(client->log, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \ + nd_log(NDLS_DAEMON, NDLP_ERR, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \ } \ if (rc == MQTT_NG_MSGGEN_OK) { \ - pthread_mutex_lock(&client->stats_mutex); \ + spinlock_lock(&client->stats_spinlock); \ client->stats.tx_messages_queued++; \ - pthread_mutex_unlock(&client->stats_mutex); \ + spinlock_unlock(&client->stats_spinlock); \ } \ return rc; mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, - mqtt_wss_log_ctx_t log_ctx, struct mqtt_auth_properties *auth, struct mqtt_lwt_properties *lwt, uint8_t clean_start, @@ -865,7 +805,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, { // Sanity Checks First (are given parameters correct and up to MQTT spec) if (!auth->client_id) { - mws_error(log_ctx, "ClientID must be set. [MQTT-3.1.3-3]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "ClientID must be set. [MQTT-3.1.3-3]"); return NULL; } @@ -876,29 +816,29 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // however server MUST allow ClientIDs between 1-23 bytes [MQTT-3.1.3-5] // so we will warn client server might not like this and he is using it // at his own risk! - mws_warn(log_ctx, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is empty string. This might not be allowed by server [MQTT-3.1.3-6]"); } if(len > MQTT_MAX_CLIENT_ID) { // [MQTT-3.1.3-5] server MUST allow client_id length 1-32 // server MAY allow longer client_id, if user provides longer client_id // warn them he is doing so at his own risk! - mws_warn(log_ctx, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "client_id provided is longer than 23 bytes, server might not allow that [MQTT-3.1.3-5]"); } if (lwt) { if (lwt->will_message && lwt->will_message_size > 65535) { - mws_error(log_ctx, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Will message cannot be longer than 65535 bytes due to MQTT protocol limitations [MQTT-3.1.3-4] and [MQTT-1.5.6]"); return NULL; } if (!lwt->will_topic) { //TODO topic given with strlen==0 ? check specs - mws_error(log_ctx, "If will message is given will topic must also be given [MQTT-3.1.3.3]"); + nd_log(NDLS_DAEMON, NDLP_ERR, "If will message is given will topic must also be given [MQTT-3.1.3.3]"); return NULL; } if (lwt->will_qos > MQTT_MAX_QOS) { // refer to [MQTT-3-1.2-12] - mws_error(log_ctx, "QOS for LWT message is bigger than max"); + nd_log(NDLS_DAEMON, NDLP_ERR, "QOS for LWT message is bigger than max"); return NULL; } } @@ -932,8 +872,10 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, *connect_flags = 0; if (auth->username) *connect_flags |= MQTT_CONNECT_FLAG_USERNAME; + if (auth->password) *connect_flags |= MQTT_CONNECT_FLAG_PASSWORD; + if (lwt) { *connect_flags |= MQTT_CONNECT_FLAG_LWT; *connect_flags |= lwt->will_qos << MQTT_CONNECT_FLAG_QOS_BITSHIFT; @@ -957,7 +899,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // [MQTT-3.1.3.1] Client identifier CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->client_id), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->client_id, strlen(auth->client_id), auth->client_id_free, &frag)) goto fail_rollback; if (lwt != NULL) { @@ -971,7 +913,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, // Will Topic [MQTT-3.1.3.3] CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(lwt->will_topic), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_topic, strlen(lwt->will_topic), lwt->will_topic_free, &frag)) goto fail_rollback; // Will Payload [MQTT-3.1.3.4] @@ -979,7 +921,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, lwt->will_message_size, frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, lwt->will_message, lwt->will_message_size, lwt->will_topic_free, &frag)) goto fail_rollback; } } @@ -989,7 +931,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->username), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->username, strlen(auth->username), auth->username_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->username, strlen(auth->username), auth->username_free, &frag)) goto fail_rollback; } @@ -998,7 +940,7 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf, BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->password), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, auth->password, strlen(auth->password), auth->password_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, auth->password, strlen(auth->password), auth->password_free, &frag)) goto fail_rollback; } trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL; @@ -1024,28 +966,23 @@ int mqtt_ng_connect(struct mqtt_ng_client *client, buffer_purge(&client->main_buffer.hdr_buffer); UNLOCK_HDR_BUFFER(&client->main_buffer); - pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); // according to MQTT spec topic aliases should not be persisted // even if clean session is true mqtt_ng_destroy_tx_alias_hash(client->tx_topic_aliases.stoi_dict); + client->tx_topic_aliases.stoi_dict = TX_ALIASES_INITIALIZE(); - if (client->tx_topic_aliases.stoi_dict == NULL) { - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - return 1; - } client->tx_topic_aliases.idx_assigned = 0; - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); mqtt_ng_destroy_rx_alias_hash(client->rx_aliases); client->rx_aliases = RX_ALIASES_INITIALIZE(); - if (client->rx_aliases == NULL) - return 1; - client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, client->log, auth, lwt, clean_start, keep_alive); + client->connect_msg = mqtt_ng_generate_connect(&client->main_buffer, auth, lwt, clean_start, keep_alive); if (client->connect_msg == NULL) return 1; - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); if (clean_start) client->stats.tx_messages_queued = 1; else @@ -1053,7 +990,7 @@ int mqtt_ng_connect(struct mqtt_ng_client *client, client->stats.tx_messages_sent = 0; client->stats.rx_messages_rcvd = 0; - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); client->client_state = CONNECT_PENDING; return 0; @@ -1065,15 +1002,16 @@ uint16_t get_unused_packet_id() { return packet_id ? packet_id : ++packet_id; } -static inline size_t mqtt_ng_publish_size(const char *topic, - size_t msg_len, - uint16_t topic_id) +static size_t mqtt_ng_publish_size( + const char *topic, + size_t msg_len, + uint16_t topic_id) { - size_t retval = 2 /* Topic Name Length */ - + (topic == NULL ? 0 : strlen(topic)) - + 2 /* Packet identifier */ - + 1 /* Properties Length TODO for now fixed to 1 property */ - + msg_len; + size_t retval = 2 + + (topic == NULL ? 0 : strlen(topic)) /* Topic Name Length */ + + 2 /* Packet identifier */ + + 1 /* Properties Length for now fixed to 1 property */ + + msg_len; if (topic_id) retval += 3; @@ -1082,7 +1020,6 @@ static inline size_t mqtt_ng_publish_size(const char *topic, } int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, - mqtt_wss_log_ctx_t log_ctx, char *topic, free_fnc_t topic_free, void *msg, @@ -1121,7 +1058,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, // [MQTT-3.3.2.1] PACK_2B_INT(&trx_buf->hdr_buffer, topic == NULL ? 0 : strlen(topic), frag); if (topic != NULL) { - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, topic, strlen(topic), topic_free, &frag)) goto fail_rollback; BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); } @@ -1145,7 +1082,7 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf, if( (frag = buffer_new_frag(&trx_buf->hdr_buffer, BUFFER_FRAG_DATA_EXTERNAL)) == NULL ) goto fail_rollback; - if (frag_set_external_data(log_ctx, frag, msg, msg_len, msg_free)) + if (frag_set_external_data(frag, msg, msg_len, msg_free)) goto fail_rollback; trx_buf->hdr_buffer.tail_frag->flags |= BUFFER_FRAG_MQTT_PACKET_TAIL; @@ -1169,9 +1106,9 @@ int mqtt_ng_publish(struct mqtt_ng_client *client, uint16_t *packet_id) { struct topic_alias_data *alias = NULL; - pthread_rwlock_rdlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); c_rhash_get_ptr_by_str(client->tx_topic_aliases.stoi_dict, topic, (void**)&alias); - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); uint16_t topic_id = 0; @@ -1185,14 +1122,14 @@ int mqtt_ng_publish(struct mqtt_ng_client *client, } if (client->max_msg_size && PUBLISH_SP_SIZE + mqtt_ng_publish_size(topic, msg_len, topic_id) > client->max_msg_size) { - mws_error(client->log, "Message too big for server: %zu", msg_len); + nd_log(NDLS_DAEMON, NDLP_ERR, "Message too big for server: %zu", msg_len); return MQTT_NG_MSGGEN_MSG_TOO_BIG; } - TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, client, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id); } -static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count) +static size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count) { size_t len = 2 /* Packet Identifier */ + 1 /* Properties Length TODO for now fixed 0 */; len += sub_count * (2 /* topic filter string length */ + 1 /* [MQTT-3.8.3.1] Subscription Options Byte */); @@ -1203,7 +1140,7 @@ static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_co return len; } -int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, struct mqtt_sub *subs, size_t sub_count) +int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, struct mqtt_sub *subs, size_t sub_count) { // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1238,7 +1175,7 @@ int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ for (size_t i = 0; i < sub_count; i++) { BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); PACK_2B_INT(&trx_buf->hdr_buffer, strlen(subs[i].topic), frag); - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag)) + if (_optimized_add(&trx_buf->hdr_buffer, subs[i].topic, strlen(subs[i].topic), subs[i].topic_free, &frag)) goto fail_rollback; BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback); *WRITE_POS(frag) = subs[i].options; @@ -1255,12 +1192,11 @@ int mqtt_ng_generate_subscribe(struct transaction_buffer *trx_buf, mqtt_wss_log_ int mqtt_ng_subscribe(struct mqtt_ng_client *client, struct mqtt_sub *subs, size_t sub_count) { - TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, client, subs, sub_count); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, subs, sub_count); } -int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint8_t reason_code) +int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, uint8_t reason_code) { - (void) log_ctx; // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1299,12 +1235,11 @@ int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, mqtt_wss_log int mqtt_ng_disconnect(struct mqtt_ng_client *client, uint8_t reason_code) { - TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, client, reason_code); + TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, reason_code); } -static int mqtt_generate_puback(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint16_t packet_id, uint8_t reason_code) +static int mqtt_generate_puback(struct transaction_buffer *trx_buf, uint16_t packet_id, uint8_t reason_code) { - (void) log_ctx; // >> START THE RODEO << transaction_buffer_transaction_start(trx_buf); @@ -1344,7 +1279,7 @@ static int mqtt_generate_puback(struct transaction_buffer *trx_buf, mqtt_wss_log static int mqtt_ng_puback(struct mqtt_ng_client *client, uint16_t packet_id, uint8_t reason_code) { - TRY_GENERATE_MESSAGE(mqtt_generate_puback, client, packet_id, reason_code); + TRY_GENERATE_MESSAGE(mqtt_generate_puback, packet_id, reason_code); } int mqtt_ng_ping(struct mqtt_ng_client *client) @@ -1361,7 +1296,6 @@ int mqtt_ng_ping(struct mqtt_ng_client *client) #define MQTT_NG_CLIENT_PROTOCOL_ERROR -1 #define MQTT_NG_CLIENT_SERVER_RETURNED_ERROR -2 #define MQTT_NG_CLIENT_NOT_IMPL_YET -3 -#define MQTT_NG_CLIENT_OOM -4 #define MQTT_NG_CLIENT_INTERNAL_ERROR -5 #define BUF_READ_CHECK_AT_LEAST(buf, x) \ @@ -1370,10 +1304,10 @@ int mqtt_ng_ping(struct mqtt_ng_client *client) #define vbi_parser_reset_ctx(ctx) memset(ctx, 0, sizeof(struct mqtt_vbi_parser_ctx)) -static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log) +static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data) { if (ctx->bytes > MQTT_VBI_MAXBYTES - 1) { - mws_error(log, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer can't be longer than %d bytes", MQTT_VBI_MAXBYTES); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } if (!ctx->bytes || ctx->data[ctx->bytes-1] & MQTT_VBI_CONTINUATION_FLAG) { @@ -1385,7 +1319,7 @@ static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_w } if (mqtt_vbi_to_uint32(ctx->data, &ctx->result)) { - mws_error(log, "MQTT Variable Byte Integer failed to be parsed."); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT Variable Byte Integer failed to be parsed."); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } @@ -1471,12 +1405,12 @@ struct mqtt_property *get_property_by_id(struct mqtt_property *props, uint8_t pr } // Parses [MQTT-2.2.2] -static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log) +static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data) { int rc; switch (ctx->state) { case PROPERTIES_LENGTH: - rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log); + rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { ctx->properties_length = ctx->vbi_parser_ctx.result; ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes; @@ -1525,7 +1459,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_TYPE_STR_BIN_LEN; break; default: - mws_error(log, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } break; @@ -1543,7 +1477,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_TYPE_STR; break; default: - mws_error(log, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type); return MQTT_NG_CLIENT_INTERNAL_ERROR; } break; @@ -1568,7 +1502,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t ctx->state = PROPERTY_NEXT; break; case PROPERTY_TYPE_VBI: - rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log); + rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { ctx->tail->data.uint32 = ctx->vbi_parser_ctx.result; ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes; @@ -1618,9 +1552,9 @@ static int parse_connack_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); break; case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for connack varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1644,9 +1578,9 @@ static int parse_disconnect_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); break; case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for connack varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for connack varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1682,9 +1616,9 @@ static int parse_puback_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - return parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + return parse_properties_array(&parser->properties_parser, parser->received_data); default: - ERROR("invalid state for puback varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for puback varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1707,7 +1641,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client) mqtt_properties_parser_ctx_reset(&parser->properties_parser); /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + rc = parse_properties_array(&parser->properties_parser, parser->received_data); if (rc != MQTT_NG_CLIENT_PARSE_DONE) return rc; parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed; @@ -1728,7 +1662,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client) return MQTT_NG_CLIENT_NEED_MORE_BYTES; default: - ERROR("invalid state for suback varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for suback varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1752,8 +1686,6 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) break; } publish->topic = callocz(1, publish->topic_len + 1 /* add 0x00 */); - if (publish->topic == NULL) - return MQTT_NG_CLIENT_OOM; parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME; /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_TOPICNAME: @@ -1779,7 +1711,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) parser->mqtt_parsed_len += 2; /* FALLTHROUGH */ case MQTT_PARSE_VARHDR_PROPS: - rc = parse_properties_array(&parser->properties_parser, parser->received_data, client->log); + rc = parse_properties_array(&parser->properties_parser, parser->received_data); if (rc != MQTT_NG_CLIENT_PARSE_DONE) return rc; parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed; @@ -1789,7 +1721,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) if (parser->mqtt_fixed_hdr_remaining_length < parser->mqtt_parsed_len) { freez(publish->topic); publish->topic = NULL; - ERROR("Error parsing PUBLISH message"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error parsing PUBLISH message"); return MQTT_NG_CLIENT_PROTOCOL_ERROR; } publish->data_len = parser->mqtt_fixed_hdr_remaining_length - parser->mqtt_parsed_len; @@ -1800,18 +1732,12 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client) BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->data_len); publish->data = mallocz(publish->data_len); - if (publish->data == NULL) { - freez(publish->topic); - publish->topic = NULL; - return MQTT_NG_CLIENT_OOM; - } - rbuf_pop(parser->received_data, publish->data, publish->data_len); parser->mqtt_parsed_len += publish->data_len; return MQTT_NG_CLIENT_PARSE_DONE; default: - ERROR("invalid state for publish varhdr parser"); + nd_log(NDLS_DAEMON, NDLP_ERR, "invalid state for publish varhdr parser"); return MQTT_NG_CLIENT_INTERNAL_ERROR; } return MQTT_NG_CLIENT_OK_CALL_AGAIN; @@ -1831,7 +1757,7 @@ static int parse_data(struct mqtt_ng_client *client) parser->state = MQTT_PARSE_FIXED_HEADER_LEN; break; case MQTT_PARSE_FIXED_HEADER_LEN: - rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data, client->log); + rc = vbi_parser_parse(&parser->vbi_parser, parser->received_data); if (rc == MQTT_NG_CLIENT_PARSE_DONE) { parser->mqtt_fixed_hdr_remaining_length = parser->vbi_parser.result; parser->state = MQTT_PARSE_VARIABLE_HEADER; @@ -1874,7 +1800,7 @@ static int parse_data(struct mqtt_ng_client *client) return rc; case MQTT_CPT_PINGRESP: if (parser->mqtt_fixed_hdr_remaining_length) { - ERROR ("PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1] + nd_log(NDLS_DAEMON, NDLP_ERR, "PINGRESP has to be 0 Remaining Length."); // [MQTT-3.13.1] return MQTT_NG_CLIENT_PROTOCOL_ERROR; } parser->state = MQTT_PARSE_MQTT_PACKET_DONE; @@ -1887,7 +1813,7 @@ static int parse_data(struct mqtt_ng_client *client) } return rc; default: - ERROR("Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type)); + nd_log(NDLS_DAEMON, NDLP_ERR, "Parsing Control Packet Type %" PRIu8 " not implemented yet.", get_control_packet_type(parser->mqtt_control_packet_type)); rbuf_bump_tail(parser->received_data, parser->mqtt_fixed_hdr_remaining_length); parser->state = MQTT_PARSE_MQTT_PACKET_DONE; return MQTT_NG_CLIENT_NOT_IMPL_YET; @@ -1950,7 +1876,7 @@ static int send_fragment(struct mqtt_ng_client *client) { if (bytes) processed = client->send_fnc_ptr(client->user_ctx, ptr, bytes); else - WARN("This fragment was fully sent already. This should not happen!"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "This fragment was fully sent already. This should not happen!"); frag->sent += processed; if (frag->sent != frag->len) @@ -1958,11 +1884,11 @@ static int send_fragment(struct mqtt_ng_client *client) { if (frag->flags & BUFFER_FRAG_MQTT_PACKET_TAIL) { client->time_of_last_send = time(NULL); - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); if (client->main_buffer.sending_frag != &ping_frag) client->stats.tx_messages_queued--; client->stats.tx_messages_sent++; - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); client->main_buffer.sending_frag = NULL; return 1; } @@ -1986,7 +1912,7 @@ static void try_send_all(struct mqtt_ng_client *client) { } while(send_all_message_fragments(client) >= 0); } -static inline void mark_message_for_gc(struct buffer_fragment *frag) +static void mark_message_for_gc(struct buffer_fragment *frag) { while (frag) { frag->flags |= BUFFER_FRAG_GARBAGE_COLLECT; @@ -2004,7 +1930,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) while (frag) { if ( (frag->flags & BUFFER_FRAG_MQTT_PACKET_HEAD) && frag->packet_id == packet_id) { if (!frag->sent) { - ERROR("Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") belongs to MQTT packet which was not yet sent!", packet_id); UNLOCK_HDR_BUFFER(&client->main_buffer); return 1; } @@ -2014,7 +1940,7 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) } frag = frag->next; } - ERROR("Received packet_id (%" PRIu16 ") is unknown!", packet_id); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received packet_id (%" PRIu16 ") is unknown!", packet_id); UNLOCK_HDR_BUFFER(&client->main_buffer); return 1; } @@ -2022,110 +1948,113 @@ static int mark_packet_acked(struct mqtt_ng_client *client, uint16_t packet_id) int handle_incoming_traffic(struct mqtt_ng_client *client) { int rc; + while ((rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN) { + ; + } + if (rc != MQTT_NG_CLIENT_MQTT_PACKET_DONE) + return rc; + struct mqtt_publish *pub; - while( (rc = parse_data(client)) == MQTT_NG_CLIENT_OK_CALL_AGAIN ); - if ( rc == MQTT_NG_CLIENT_MQTT_PACKET_DONE ) { - struct mqtt_property *prop; -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("MQTT Packet Parsed Successfully!"); -#endif - pthread_mutex_lock(&client->stats_mutex); - client->stats.rx_messages_rcvd++; - pthread_mutex_unlock(&client->stats_mutex); - - switch (get_control_packet_type(client->parser.mqtt_control_packet_type)) { - case MQTT_CPT_CONNACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received CONNACK"); -#endif - LOCK_HDR_BUFFER(&client->main_buffer); - mark_message_for_gc(client->connect_msg); - UNLOCK_HDR_BUFFER(&client->main_buffer); - client->connect_msg = NULL; - if (client->client_state != CONNECTING) { - ERROR("Received unexpected CONNACK"); - client->client_state = ERROR; - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - } - if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) { - INFO("MQTT server limits message size to %" PRIu32, prop->data.uint32); - client->max_msg_size = prop->data.uint32; - } - if (client->connack_callback) - client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code); - if (!client->parser.mqtt_packet.connack.reason_code) { - INFO("MQTT Connection Accepted By Server"); - client->client_state = CONNECTED; - break; - } + struct mqtt_property *prop; + spinlock_lock(&client->stats_spinlock); + client->stats.rx_messages_rcvd++; + spinlock_unlock(&client->stats_spinlock); + + uint8_t ctrl_packet_type = get_control_packet_type(client->parser.mqtt_control_packet_type); + switch (ctrl_packet_type) { + case MQTT_CPT_CONNACK: + LOCK_HDR_BUFFER(&client->main_buffer); + mark_message_for_gc(client->connect_msg); + UNLOCK_HDR_BUFFER(&client->main_buffer); + + client->connect_msg = NULL; + + if (client->client_state != CONNECTING) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Received unexpected CONNACK"); client->client_state = ERROR; - return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR; - case MQTT_CPT_PUBACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received PUBACK %" PRIu16, client->parser.mqtt_packet.puback.packet_id); -#endif - if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id)) - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - if (client->puback_callback) - client->puback_callback(client->parser.mqtt_packet.puback.packet_id); - break; - case MQTT_CPT_PINGRESP: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received PINGRESP"); -#endif - break; - case MQTT_CPT_SUBACK: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Received SUBACK %" PRIu16, client->parser.mqtt_packet.suback.packet_id); -#endif - if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id)) - return MQTT_NG_CLIENT_PROTOCOL_ERROR; + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + } + + if ((prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_MAX_PKT_SIZE)) != NULL) { + nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT server limits message size to %" PRIu32, prop->data.uint32); + client->max_msg_size = prop->data.uint32; + } + + if (client->connack_callback) + client->connack_callback(client->user_ctx, client->parser.mqtt_packet.connack.reason_code); + if (!client->parser.mqtt_packet.connack.reason_code) { + nd_log(NDLS_DAEMON, NDLP_INFO, "MQTT Connection Accepted By Server"); + client->client_state = CONNECTED; break; - case MQTT_CPT_PUBLISH: -#ifdef MQTT_DEBUG_VERBOSE - DEBUG("Recevied PUBLISH"); -#endif - pub = &client->parser.mqtt_packet.publish; - if (pub->qos > 1) { - freez(pub->topic); - freez(pub->data); - return MQTT_NG_CLIENT_NOT_IMPL_YET; - } - if ( pub->qos == 1 && (rc = mqtt_ng_puback(client, pub->packet_id, 0)) ) { - client->client_state = ERROR; - ERROR("Error generating PUBACK reply for PUBLISH"); - return rc; - } - if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) { - // Topic Alias property was sent from server - void *topic_ptr; - if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) { - if (pub->topic != NULL) { - ERROR("We do not yet support topic alias reassignment"); - return MQTT_NG_CLIENT_NOT_IMPL_YET; - } - pub->topic = topic_ptr; - } else { - if (pub->topic == NULL) { - ERROR("Topic alias with id %d unknown and topic not set by server!", prop->data.uint8); - return MQTT_NG_CLIENT_PROTOCOL_ERROR; - } - c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic); + } + client->client_state = ERROR; + return MQTT_NG_CLIENT_SERVER_RETURNED_ERROR; + + case MQTT_CPT_PUBACK: + if (mark_packet_acked(client, client->parser.mqtt_packet.puback.packet_id)) + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + if (client->puback_callback) + client->puback_callback(client->parser.mqtt_packet.puback.packet_id); + break; + + case MQTT_CPT_PINGRESP: + break; + + case MQTT_CPT_SUBACK: + if (mark_packet_acked(client, client->parser.mqtt_packet.suback.packet_id)) + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + break; + + case MQTT_CPT_PUBLISH: + pub = &client->parser.mqtt_packet.publish; + + if (pub->qos > 1) { + freez(pub->topic); + freez(pub->data); + return MQTT_NG_CLIENT_NOT_IMPL_YET; + } + + if ( pub->qos == 1 && ((rc = mqtt_ng_puback(client, pub->packet_id, 0))) ) { + client->client_state = ERROR; + nd_log(NDLS_DAEMON, NDLP_ERR, "Error generating PUBACK reply for PUBLISH"); + return rc; + } + + if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) { + // Topic Alias property was sent from server + void *topic_ptr; + if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) { + if (pub->topic != NULL) { + nd_log(NDLS_DAEMON, NDLP_ERR, "We do not yet support topic alias reassignment"); + return MQTT_NG_CLIENT_NOT_IMPL_YET; } + pub->topic = topic_ptr; + } else { + if (pub->topic == NULL) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Topic alias with id %d unknown and topic not set by server!", prop->data.uint8); + return MQTT_NG_CLIENT_PROTOCOL_ERROR; + } + c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic); } - if (client->msg_callback) - client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos); - // in case we have property topic alias and we have topic we take over the string - // and add pointer to it into topic alias list - if (prop == NULL) - freez(pub->topic); - freez(pub->data); - return MQTT_NG_CLIENT_WANT_WRITE; - case MQTT_CPT_DISCONNECT: - INFO ("Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code); - client->client_state = DISCONNECTED; - break; - } + } + + if (client->msg_callback) + client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos); + // in case we have property topic alias and we have topic we take over the string + // and add pointer to it into topic alias list + if (prop == NULL) + freez(pub->topic); + freez(pub->data); + return MQTT_NG_CLIENT_WANT_WRITE; + + case MQTT_CPT_DISCONNECT: + nd_log(NDLS_DAEMON, NDLP_INFO, "Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code); + client->client_state = DISCONNECTED; + break; + + default: + nd_log(NDLS_DAEMON, NDLP_INFO, "Got unknown control packet %u from server", ctrl_packet_type); + break; } return rc; @@ -2173,9 +2102,9 @@ void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes) void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats) { - pthread_mutex_lock(&client->stats_mutex); + spinlock_lock(&client->stats_spinlock); memcpy(stats, &client->stats, sizeof(struct mqtt_ng_stats)); - pthread_mutex_unlock(&client->stats_mutex); + spinlock_unlock(&client->stats_spinlock); stats->tx_bytes_queued = 0; stats->tx_buffer_reclaimable = 0; @@ -2198,11 +2127,11 @@ void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stat int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) { uint16_t idx; - pthread_rwlock_wrlock(&client->tx_topic_aliases.rwlock); + spinlock_lock(&client->tx_topic_aliases.spinlock); if (client->tx_topic_aliases.idx_assigned >= client->tx_topic_aliases.idx_max) { - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - mws_error(client->log, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute."); + spinlock_unlock(&client->tx_topic_aliases.spinlock); + nd_log(NDLS_DAEMON, NDLP_ERR, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute."); return 0; //0 is not a valid topic alias } @@ -2211,8 +2140,8 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) // this is not a problem for library but might be helpful to warn user // as it might indicate bug in their program (but also might be expected) idx = alias->idx; - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); - mws_debug(client->log, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic); + spinlock_unlock(&client->tx_topic_aliases.spinlock); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "%s topic \"%s\" already has alias set. Ignoring.", __FUNCTION__, topic); return idx; } @@ -2223,6 +2152,6 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic) c_rhash_insert_str_ptr(client->tx_topic_aliases.stoi_dict, topic, (void*)alias); - pthread_rwlock_unlock(&client->tx_topic_aliases.rwlock); + spinlock_unlock(&client->tx_topic_aliases.spinlock); return idx; } diff --git a/src/aclk/mqtt_websockets/mqtt_ng.h b/src/aclk/mqtt_websockets/mqtt_ng.h index 8bd7434d58dd36..1661f540e2c795 100644 --- a/src/aclk/mqtt_websockets/mqtt_ng.h +++ b/src/aclk/mqtt_websockets/mqtt_ng.h @@ -67,7 +67,6 @@ int mqtt_ng_ping(struct mqtt_ng_client *client); typedef ssize_t (*mqtt_ng_send_fnc_t)(void *user_ctx, const void* buf, size_t len); struct mqtt_ng_init { - mqtt_wss_log_ctx_t log; rbuf_t data_in; mqtt_ng_send_fnc_t data_out_fnc; void *user_ctx; diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.c b/src/aclk/mqtt_websockets/mqtt_wss_client.c index 9b478b806a12b2..2b2c972bb7fe5d 100644 --- a/src/aclk/mqtt_websockets/mqtt_wss_client.c +++ b/src/aclk/mqtt_websockets/mqtt_wss_client.c @@ -57,6 +57,8 @@ char *util_openssl_ret_err(int err) return "SSL_ERROR_SYSCALL"; case SSL_ERROR_SSL: return "SSL_ERROR_SSL"; + default: + break; } return "UNKNOWN"; } @@ -64,8 +66,6 @@ char *util_openssl_ret_err(int err) struct mqtt_wss_client_struct { ws_client *ws_client; - mqtt_wss_log_ctx_t log; - // immediate connection (e.g. proxy server) char *host; int port; @@ -117,69 +117,49 @@ static void mws_connack_callback_ng(void *user_ctx, int code) switch(code) { case 0: client->mqtt_connected = 1; - return; + break; //TODO manual labor: all the CONNACK error codes with some nice error message default: - mws_error(client->log, "MQTT CONNACK returned error %d", code); - return; + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT CONNACK returned error %d", code); + break; } } static ssize_t mqtt_send_cb(void *user_ctx, const void* buf, size_t len) { mqtt_wss_client client = user_ctx; -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "mqtt_pal_sendall(len=%d)", len); -#endif int ret = ws_client_send(client->ws_client, WS_OP_BINARY_FRAME, buf, len); - if (ret >= 0 && (size_t)ret != len) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Not complete message sent (Msg=%d,Sent=%d). Need to arm POLLOUT!", len, ret); -#endif + if (ret >= 0 && (size_t)ret != len) client->mqtt_didnt_finish_write = 1; - } return ret; } -mqtt_wss_client mqtt_wss_new(const char *log_prefix, - mqtt_wss_log_callback_t log_callback, - msg_callback_fnc_t msg_callback, - void (*puback_callback)(uint16_t packet_id)) +mqtt_wss_client mqtt_wss_new( + msg_callback_fnc_t msg_callback, + void (*puback_callback)(uint16_t packet_id)) { - mqtt_wss_log_ctx_t log; - - log = mqtt_wss_log_ctx_create(log_prefix, log_callback); - if(!log) - return NULL; - SSL_library_init(); SSL_load_error_strings(); mqtt_wss_client client = callocz(1, sizeof(struct mqtt_wss_client_struct)); - if (!client) { - mws_error(log, "OOM alocating mqtt_wss_client"); - goto fail; - } spinlock_init(&client->stat_lock); client->msg_callback = msg_callback; client->puback_callback = puback_callback; - client->ws_client = ws_client_new(0, &client->target_host, log); + client->ws_client = ws_client_new(0, &client->target_host); if (!client->ws_client) { - mws_error(log, "Error creating ws_client"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error creating ws_client"); goto fail_1; } - client->log = log; - #ifdef __APPLE__ if (pipe(client->write_notif_pipe)) { #else if (pipe2(client->write_notif_pipe, O_CLOEXEC /*| O_DIRECT*/)) { #endif - mws_error(log, "Couldn't create pipe"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Couldn't create pipe"); goto fail_2; } @@ -189,7 +169,6 @@ mqtt_wss_client mqtt_wss_new(const char *log_prefix, client->poll_fds[POLLFD_SOCKET].events = POLLIN; struct mqtt_ng_init settings = { - .log = log, .data_in = client->ws_client->buf_to_mqtt, .data_out_fnc = &mqtt_send_cb, .user_ctx = client, @@ -197,22 +176,14 @@ mqtt_wss_client mqtt_wss_new(const char *log_prefix, .puback_callback = puback_callback, .msg_callback = msg_callback }; - if ( (client->mqtt = mqtt_ng_init(&settings)) == NULL ) { - mws_error(log, "Error initializing internal MQTT client"); - goto fail_3; - } + client->mqtt = mqtt_ng_init(&settings); return client; -fail_3: - close(client->write_notif_pipe[PIPE_WRITE_END]); - close(client->write_notif_pipe[PIPE_READ_END]); fail_2: ws_client_destroy(client->ws_client); fail_1: freez(client); -fail: - mqtt_wss_log_ctx_destroy(log); return NULL; } @@ -253,30 +224,25 @@ void mqtt_wss_destroy(mqtt_wss_client client) if (client->sockfd > 0) close(client->sockfd); - mqtt_wss_log_ctx_destroy(client->log); freez(client); } static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) { - SSL *ssl; - X509 *err_cert; - mqtt_wss_client client; - int err = 0, depth; - char *err_str; + int err = 0; - ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); - client = SSL_get_ex_data(ssl, 0); + SSL* ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()); + mqtt_wss_client client = SSL_get_ex_data(ssl, 0); // TODO handle depth as per https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html if (!preverify_ok) { err = X509_STORE_CTX_get_error(ctx); - depth = X509_STORE_CTX_get_error_depth(ctx); - err_cert = X509_STORE_CTX_get_current_cert(ctx); - err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0); + int depth = X509_STORE_CTX_get_error_depth(ctx); + X509* err_cert = X509_STORE_CTX_get_current_cert(ctx); + char* err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0); - mws_error(client->log, "verify error:num=%d:%s:depth=%d:%s", err, + nd_log(NDLS_DAEMON, NDLP_ERR, "verify error:num=%d:%s:depth=%d:%s", err, X509_verify_cert_error_string(err), depth, err_str); freez(err_str); @@ -286,7 +252,7 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) client->ssl_flags & MQTT_WSS_SSL_ALLOW_SELF_SIGNED) { preverify_ok = 1; - mws_error(client->log, "Self Signed Certificate Accepted as the connection was " + nd_log(NDLS_DAEMON, NDLP_ERR, "Self Signed Certificate Accepted as the connection was " "requested with MQTT_WSS_SSL_ALLOW_SELF_SIGNED"); } @@ -300,16 +266,14 @@ static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx) #define HTTP_HDR_TERMINATOR "\x0D\x0A\x0D\x0A" #define HTTP_CODE_LEN 4 #define HTTP_REASON_MAX_LEN 512 -static int http_parse_reply(mqtt_wss_client client, rbuf_t buf) +static int http_parse_reply(rbuf_t buf) { - char *ptr; char http_code_s[4]; - int http_code; int idx; if (rbuf_memcmp_n(buf, PROXY_HTTP, strlen(PROXY_HTTP))) { if (rbuf_memcmp_n(buf, PROXY_HTTP10, strlen(PROXY_HTTP10))) { - mws_error(client->log, "http_proxy expected reply with \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\""); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy expected reply with \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\""); return 1; } } @@ -317,39 +281,37 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf) rbuf_bump_tail(buf, strlen(PROXY_HTTP)); if (!rbuf_pop(buf, http_code_s, 1) || http_code_s[0] != 0x20) { - mws_error(client->log, "http_proxy missing space after \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\""); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy missing space after \"" PROXY_HTTP "\" or \"" PROXY_HTTP10 "\""); return 2; } if (!rbuf_pop(buf, http_code_s, HTTP_CODE_LEN)) { - mws_error(client->log, "http_proxy missing HTTP code"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy missing HTTP code"); return 3; } for (int i = 0; i < HTTP_CODE_LEN - 1; i++) if (http_code_s[i] > 0x39 || http_code_s[i] < 0x30) { - mws_error(client->log, "http_proxy HTTP code non numeric"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy HTTP code non numeric"); return 4; } http_code_s[HTTP_CODE_LEN - 1] = 0; - http_code = atoi(http_code_s); + int http_code = str2i(http_code_s); // TODO check if we ever have more headers here rbuf_find_bytes(buf, HTTP_ENDLINE, strlen(HTTP_ENDLINE), &idx); if (idx >= HTTP_REASON_MAX_LEN) { - mws_error(client->log, "http_proxy returned reason that is too long"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy returned reason that is too long"); return 5; } if (http_code != 200) { - ptr = mallocz(idx + 1); - if (!ptr) - return 6; + char *ptr = mallocz(idx + 1); rbuf_pop(buf, ptr, idx); ptr[idx] = 0; - mws_error(client->log, "http_proxy returned error code %d \"%s\"", http_code, ptr); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy returned error code %d \"%s\"", http_code, ptr); freez(ptr); return 7; }/* else @@ -362,52 +324,11 @@ static int http_parse_reply(mqtt_wss_client client, rbuf_t buf) rbuf_bump_tail(buf, strlen(HTTP_HDR_TERMINATOR)); if (rbuf_bytes_available(buf)) { - mws_error(client->log, "http_proxy unexpected trailing bytes after end of HTTP hdr"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy unexpected trailing bytes after end of HTTP hdr"); return 8; } - mws_debug(client->log, "http_proxy CONNECT succeeded"); - return 0; -} - -#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 -static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) -{ - EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx)); - - if (ctx != NULL) { - memset(ctx, 0, sizeof(*ctx)); - } - return ctx; -} -static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx) -{ - OPENSSL_free(ctx); - return; -} -#endif - -inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len) -{ - int len; - unsigned char *str = out; - EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); - EVP_EncodeInit(ctx); - EVP_EncodeUpdate(ctx, str, outl, in, in_len); - str += *outl; - EVP_EncodeFinal(ctx, str, &len); - *outl += len; - - str = out; - while(*str) { - if (*str != 0x0D && *str != 0x0A) - *out++ = *str++; - else - str++; - } - *out = 0; - - EVP_ENCODE_CTX_free(ctx); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "http_proxy CONNECT succeeded"); return 0; } @@ -418,13 +339,12 @@ static int http_proxy_connect(mqtt_wss_client client) rbuf_t r_buf = rbuf_create(4096); if (!r_buf) return 1; - char *r_buf_ptr; size_t r_buf_linear_insert_capacity; poll_fd.fd = client->sockfd; poll_fd.events = POLLIN; - r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); + char *r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"%s %s:%d %s" HTTP_ENDLINE "Host: %s" HTTP_ENDLINE, PROXY_CONNECT, client->target_host, client->target_port, PROXY_HTTP, client->target_host); write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr)); @@ -433,7 +353,7 @@ static int http_proxy_connect(mqtt_wss_client client) size_t creds_plain_len = strlen(client->proxy_uname) + strlen(client->proxy_passwd) + 2; char *creds_plain = mallocz(creds_plain_len); if (!creds_plain) { - mws_error(client->log, "OOM creds_plain"); + nd_log(NDLS_DAEMON, NDLP_ERR, "OOM creds_plain"); rc = 6; goto cleanup; } @@ -444,7 +364,7 @@ static int http_proxy_connect(mqtt_wss_client client) char *creds_base64 = mallocz(creds_base64_len + 1); if (!creds_base64) { freez(creds_plain); - mws_error(client->log, "OOM creds_base64"); + nd_log(NDLS_DAEMON, NDLP_ERR, "OOM creds_base64"); rc = 6; goto cleanup; } @@ -454,8 +374,7 @@ static int http_proxy_connect(mqtt_wss_client client) *ptr++ = ':'; strcpy(ptr, client->proxy_passwd); - int b64_len; - base64_encode_helper((unsigned char*)creds_base64, &b64_len, (unsigned char*)creds_plain, strlen(creds_plain)); + (void) netdata_base64_encode((unsigned char*)creds_base64, (unsigned char*)creds_plain, strlen(creds_plain)); freez(creds_plain); r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); @@ -470,13 +389,13 @@ static int http_proxy_connect(mqtt_wss_client client) // or timeout while ((rc = poll(&poll_fd, 1, 1000)) >= 0) { if (!rc) { - mws_error(client->log, "http_proxy timeout waiting reply from proxy server"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy timeout waiting reply from proxy server"); rc = 2; goto cleanup; } r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity); if (!r_buf_ptr) { - mws_error(client->log, "http_proxy read ring buffer full"); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy read ring buffer full"); rc = 3; goto cleanup; } @@ -484,20 +403,20 @@ static int http_proxy_connect(mqtt_wss_client client) if (errno == EWOULDBLOCK || errno == EAGAIN) { continue; } - mws_error(client->log, "http_proxy error reading from socket \"%s\"", strerror(errno)); + nd_log(NDLS_DAEMON, NDLP_ERR, "http_proxy error reading from socket \"%s\"", strerror(errno)); rc = 4; goto cleanup; } rbuf_bump_head(r_buf, rc); if (rbuf_find_bytes(r_buf, HTTP_HDR_TERMINATOR, strlen(HTTP_HDR_TERMINATOR), &rc)) { rc = 0; - if (http_parse_reply(client, r_buf)) + if (http_parse_reply(r_buf)) rc = 5; goto cleanup; } } - mws_error(client->log, "proxy negotiation poll error \"%s\"", strerror(errno)); + nd_log(NDLS_DAEMON, NDLP_ERR, "proxy negotiation poll error \"%s\"", strerror(errno)); rc = 5; cleanup: rbuf_free(r_buf); @@ -510,11 +429,11 @@ int mqtt_wss_connect( int port, struct mqtt_connect_params *mqtt_params, int ssl_flags, - struct mqtt_wss_proxy *proxy, + const struct mqtt_wss_proxy *proxy, bool *fallback_ipv4) { if (!mqtt_params) { - mws_error(client->log, "mqtt_params can't be null!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_params can't be null!"); return -1; } @@ -571,7 +490,7 @@ int mqtt_wss_connect( struct timeval timeout = { .tv_sec = 10, .tv_usec = 0 }; int fd = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, client->host, 0, port_str, &timeout, fallback_ipv4); if (fd < 0) { - mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, port); + nd_log(NDLS_DAEMON, NDLP_ERR, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, port); return -3; } @@ -586,12 +505,12 @@ int mqtt_wss_connect( int flag = 1; int result = setsockopt(client->sockfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int)); if (result < 0) - mws_error(client->log, "Could not dissable NAGLE"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Could not dissable NAGLE"); client->poll_fds[POLLFD_SOCKET].fd = client->sockfd; if (fcntl(client->sockfd, F_SETFL, fcntl(client->sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) { - mws_error(client->log, "Error setting O_NONBLOCK to TCP socket. \"%s\"", strerror(errno)); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error setting O_NONBLOCK to TCP socket. \"%s\"", strerror(errno)); return -8; } @@ -607,7 +526,7 @@ int mqtt_wss_connect( SSL_library_init(); #else if (OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, NULL) != 1) { - mws_error(client->log, "Failed to initialize SSL"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to initialize SSL"); return -1; }; #endif @@ -624,7 +543,7 @@ int mqtt_wss_connect( SSL_CTX_set_default_verify_paths(client->ssl_ctx); SSL_CTX_set_verify(client->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback); } else - mws_error(client->log, "SSL Certificate checking completely disabled!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL Certificate checking completely disabled!!!"); #ifdef MQTT_WSS_DEBUG if(client->ssl_ctx_keylog_cb) @@ -634,7 +553,7 @@ int mqtt_wss_connect( client->ssl = SSL_new(client->ssl_ctx); if (!(client->ssl_flags & MQTT_WSS_SSL_DONT_CHECK_CERTS)) { if (!SSL_set_ex_data(client->ssl, 0, client)) { - mws_error(client->log, "Could not SSL_set_ex_data"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Could not SSL_set_ex_data"); return -4; } } @@ -642,27 +561,27 @@ int mqtt_wss_connect( SSL_set_connect_state(client->ssl); if (!SSL_set_tlsext_host_name(client->ssl, client->target_host)) { - mws_error(client->log, "Error setting TLS SNI host"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error setting TLS SNI host"); return -7; } result = SSL_connect(client->ssl); if (result != -1 && result != 1) { - mws_error(client->log, "SSL could not connect"); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL could not connect"); return -5; } if (result == -1) { int ec = SSL_get_error(client->ssl, result); if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) { - mws_error(client->log, "Failed to start SSL connection"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to start SSL connection"); return -6; } } client->mqtt_keepalive = (mqtt_params->keep_alive ? mqtt_params->keep_alive : 400); - mws_info(client->log, "Going to connect using internal MQTT 5 implementation"); + nd_log(NDLS_DAEMON, NDLP_INFO, "Going to connect using internal MQTT 5 implementation"); struct mqtt_auth_properties auth; auth.client_id = (char*)mqtt_params->clientid; auth.client_id_free = NULL; @@ -682,7 +601,7 @@ int mqtt_wss_connect( int ret = mqtt_ng_connect(client->mqtt, &auth, mqtt_params->will_msg ? &lwt : NULL, 1, client->mqtt_keepalive); if (ret) { - mws_error(client->log, "Error generating MQTT connect"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error generating MQTT connect"); return 1; } @@ -691,7 +610,7 @@ int mqtt_wss_connect( // wait till MQTT connection is established while (!client->mqtt_connected) { if(mqtt_wss_service(client, -1)) { - mws_error(client->log, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port); + nd_log(NDLS_DAEMON, NDLP_ERR, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port); return 2; } } @@ -704,14 +623,14 @@ int mqtt_wss_connect( #define NSEC_PER_MSEC 1000000ULL #define NSEC_PER_SEC 1000000000ULL -static inline uint64_t boottime_usec(mqtt_wss_client client) { +static uint64_t boottime_usec(void) { struct timespec ts; #if defined(__APPLE__) || defined(__FreeBSD__) if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) { #else if (clock_gettime(CLOCK_BOOTTIME, &ts) == -1) { #endif - mws_error(client->log, "clock_gettimte failed"); + nd_log(NDLS_DAEMON, NDLP_ERR, "clock_gettimte failed"); return 0; } return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC; @@ -720,7 +639,7 @@ static inline uint64_t boottime_usec(mqtt_wss_client client) { #define MWS_TIMED_OUT 1 #define MWS_ERROR 2 #define MWS_OK 0 -static inline const char *mqtt_wss_error_tos(int ec) +static const char *mqtt_wss_error_tos(int ec) { switch(ec) { case MWS_TIMED_OUT: @@ -733,13 +652,12 @@ static inline const char *mqtt_wss_error_tos(int ec) } -static inline int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms) +static int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms) { - uint64_t exit_by = boottime_usec(client) + (timeout_ms * NSEC_PER_MSEC); - uint64_t now; + uint64_t exit_by = boottime_usec() + (timeout_ms * NSEC_PER_MSEC); client->poll_fds[POLLFD_SOCKET].events |= POLLOUT; // TODO when entering mwtt_wss_service use out buffer size to arm POLLOUT while (rbuf_bytes_available(client->ws_client->buf_write)) { - now = boottime_usec(client); + const uint64_t now = boottime_usec(); if (now >= exit_by) return MWS_TIMED_OUT; if (mqtt_wss_service(client, exit_by - now)) @@ -750,15 +668,13 @@ static inline int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms) void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms) { - int ret; - // block application from sending more MQTT messages client->mqtt_disconnecting = 1; // send whatever was left at the time of calling this function - ret = mqtt_wss_service_all(client, timeout_ms / 4); + int ret = mqtt_wss_service_all(client, timeout_ms / 4); if(ret) - mws_error(client->log, + nd_log(NDLS_DAEMON, NDLP_ERR, "Error while trying to send all remaining data in an attempt " "to gracefully disconnect! EC=%d Desc:\"%s\"", ret, @@ -770,7 +686,7 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms) ret = mqtt_wss_service_all(client, timeout_ms / 4); if(ret) - mws_error(client->log, + nd_log(NDLS_DAEMON, NDLP_ERR, "Error while trying to send MQTT disconnect message in an attempt " "to gracefully disconnect! EC=%d Desc:\"%s\"", ret, @@ -783,7 +699,7 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms) if(ret) { // Some MQTT/WSS servers will close socket on receipt of MQTT disconnect and // do not wait for WebSocket to be closed properly - mws_warn(client->log, + nd_log(NDLS_DAEMON, NDLP_WARNING, "Error while trying to send WebSocket disconnect message in an attempt " "to gracefully disconnect! EC=%d Desc:\"%s\".", ret, @@ -798,22 +714,19 @@ void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms) client->sockfd = -1; } -static inline void mqtt_wss_wakeup(mqtt_wss_client client) +static void mqtt_wss_wakeup(mqtt_wss_client client) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "mqtt_wss_wakup - forcing wake up of main loop"); -#endif write(client->write_notif_pipe[PIPE_WRITE_END], " ", 1); } #define THROWAWAY_BUF_SIZE 32 char throwaway[THROWAWAY_BUF_SIZE]; -static inline void util_clear_pipe(int fd) +static void util_clear_pipe(int fd) { (void)read(fd, throwaway, THROWAWAY_BUF_SIZE); } -static inline void set_socket_pollfds(mqtt_wss_client client, int ssl_ret) { +static void set_socket_pollfds(mqtt_wss_client client, int ssl_ret) { if (ssl_ret == SSL_ERROR_WANT_WRITE) client->poll_fds[POLLFD_SOCKET].events |= POLLOUT; if (ssl_ret == SSL_ERROR_WANT_READ) @@ -824,7 +737,7 @@ static int handle_mqtt_internal(mqtt_wss_client client) { int rc = mqtt_ng_sync(client->mqtt); if (rc) { - mws_error(client->log, "mqtt_ng_sync returned %d != 0", rc); + nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_ng_sync returned %d != 0", rc); client->mqtt_connected = 0; return 1; } @@ -832,7 +745,7 @@ static int handle_mqtt_internal(mqtt_wss_client client) } #define SEC_TO_MSEC 1000 -static inline long long int t_till_next_keepalive_ms(mqtt_wss_client client) +static long long int t_till_next_keepalive_ms(mqtt_wss_client client) { time_t last_send = mqtt_ng_last_send_time(client->mqtt); long long int next_mqtt_keep_alive = (last_send * SEC_TO_MSEC) @@ -841,10 +754,10 @@ static inline long long int t_till_next_keepalive_ms(mqtt_wss_client client) } #ifdef MQTT_WSS_CPUSTATS -static inline uint64_t mqtt_wss_now_usec(mqtt_wss_client client) { +static uint64_t mqtt_wss_now_usec(void) { struct timespec ts; if(clock_gettime(CLOCK_MONOTONIC, &ts) == -1) { - mws_error(client->log, "clock_gettime(CLOCK_MONOTONIC, ×pec) failed."); + nd_log(NDLS_DAEMON, NDLP_ERR, "clock_gettime(CLOCK_MONOTONIC, ×pec) failed."); return 0; } return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC; @@ -859,61 +772,39 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) int send_keepalive = 0; #ifdef MQTT_WSS_CPUSTATS - uint64_t t1,t2; - t1 = mqtt_wss_now_usec(client); -#endif - -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, ">>>>> mqtt_wss_service <<<<<"); - mws_debug(client->log, "Waiting for events: %s%s%s", - (client->poll_fds[POLLFD_SOCKET].events & POLLIN) ? "SOCKET_POLLIN " : "", - (client->poll_fds[POLLFD_SOCKET].events & POLLOUT) ? "SOCKET_POLLOUT " : "", - (client->poll_fds[POLLFD_PIPE].events & POLLIN) ? "PIPE_POLLIN" : "" ); + uint64_t t2; + uint64_t t1 = mqtt_wss_now_usec(); #endif // Check user requested TO doesn't interfere with MQTT keep alives long long int till_next_keep_alive = t_till_next_keepalive_ms(client); if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) { - #ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Shortening Timeout requested %d to %lld to ensure keep-alive can be sent", timeout_ms, till_next_keep_alive); - #endif timeout_ms = till_next_keep_alive; send_keepalive = 1; } #ifdef MQTT_WSS_CPUSTATS - t2 = mqtt_wss_now_usec(client); + t2 = mqtt_wss_now_usec(); client->stats.time_keepalive += t2 - t1; #endif if ((ret = poll(client->poll_fds, 2, timeout_ms >= 0 ? timeout_ms : -1)) < 0) { if (errno == EINTR) { - mws_warn(client->log, "poll interrupted by EINTR"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "poll interrupted by EINTR"); return 0; } - mws_error(client->log, "poll error \"%s\"", strerror(errno)); + nd_log(NDLS_DAEMON, NDLP_ERR, "poll error \"%s\"", strerror(errno)); return -2; } -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Poll events happened: %s%s%s%s", - (client->poll_fds[POLLFD_SOCKET].revents & POLLIN) ? "SOCKET_POLLIN " : "", - (client->poll_fds[POLLFD_SOCKET].revents & POLLOUT) ? "SOCKET_POLLOUT " : "", - (client->poll_fds[POLLFD_PIPE].revents & POLLIN) ? "PIPE_POLLIN " : "", - (!ret) ? "POLL_TIMEOUT" : ""); -#endif - #ifdef MQTT_WSS_CPUSTATS - t1 = mqtt_wss_now_usec(client); + t1 = mqtt_wss_now_usec(); #endif if (ret == 0) { if (send_keepalive) { // otherwise we shortened the timeout ourselves to take care of // MQTT keep alives -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Forcing MQTT Ping/keep-alive"); -#endif mqtt_ng_ping(client->mqtt); } else { // if poll timed out and user requested timeout was being used @@ -923,7 +814,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) } #ifdef MQTT_WSS_CPUSTATS - t2 = mqtt_wss_now_usec(client); + t2 = mqtt_wss_now_usec(); client->stats.time_keepalive += t2 - t1; #endif @@ -931,9 +822,6 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) if ((ptr = rbuf_get_linear_insert_range(client->ws_client->buf_read, &size))) { if((ret = SSL_read(client->ssl, ptr, size)) > 0) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "SSL_Read: Read %d.", ret); -#endif spinlock_lock(&client->stat_lock); client->stats.bytes_rx += ret; spinlock_unlock(&client->stat_lock); @@ -941,22 +829,19 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) } else { int errnobkp = errno; ret = SSL_get_error(client->ssl, ret); -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Read Err: %s", util_openssl_ret_err(ret)); -#endif set_socket_pollfds(client, ret); if (ret != SSL_ERROR_WANT_READ && ret != SSL_ERROR_WANT_WRITE) { - mws_error(client->log, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret)); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret)); if (ret == SSL_ERROR_SYSCALL) - mws_error(client->log, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp)); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp)); return MQTT_WSS_ERR_CONN_DROP; } } } #ifdef MQTT_WSS_CPUSTATS - t1 = mqtt_wss_now_usec(client); + t1 = mqtt_wss_now_usec(); client->stats.time_read_socket += t1 - t2; #endif @@ -964,18 +849,20 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) switch(ret) { case WS_CLIENT_PROTOCOL_ERROR: return MQTT_WSS_ERR_PROTO_WS; + case WS_CLIENT_NEED_MORE_BYTES: -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "WSCLIENT WANT READ"); -#endif client->poll_fds[POLLFD_SOCKET].events |= POLLIN; break; + case WS_CLIENT_CONNECTION_CLOSED: return MQTT_WSS_ERR_CONN_DROP; + + default: + return MQTT_WSS_ERR_PROTO_WS; } #ifdef MQTT_WSS_CPUSTATS - t2 = mqtt_wss_now_usec(client); + t2 = mqtt_wss_now_usec(); client->stats.time_process_websocket += t2 - t1; #endif @@ -990,18 +877,12 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) } #ifdef MQTT_WSS_CPUSTATS - t1 = mqtt_wss_now_usec(client); + t1 = mqtt_wss_now_usec(); client->stats.time_process_mqtt += t1 - t2; #endif if ((ptr = rbuf_get_linear_read_range(client->ws_client->buf_write, &size))) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Have data to write to SSL"); -#endif if ((ret = SSL_write(client->ssl, ptr, size)) > 0) { -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "SSL_Write: Written %d of avail %d.", ret, size); -#endif spinlock_lock(&client->stat_lock); client->stats.bytes_tx += ret; spinlock_unlock(&client->stat_lock); @@ -1009,15 +890,12 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) } else { int errnobkp = errno; ret = SSL_get_error(client->ssl, ret); -#ifdef DEBUG_ULTRA_VERBOSE - mws_debug(client->log, "Write Err: %s", util_openssl_ret_err(ret)); -#endif set_socket_pollfds(client, ret); if (ret != SSL_ERROR_WANT_READ && ret != SSL_ERROR_WANT_WRITE) { - mws_error(client->log, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret)); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret)); if (ret == SSL_ERROR_SYSCALL) - mws_error(client->log, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp)); + nd_log(NDLS_DAEMON, NDLP_ERR, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp)); return MQTT_WSS_ERR_CONN_DROP; } } @@ -1027,7 +905,7 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms) util_clear_pipe(client->write_notif_pipe[PIPE_READ_END]); #ifdef MQTT_WSS_CPUSTATS - t2 = mqtt_wss_now_usec(client); + t2 = mqtt_wss_now_usec(); client->stats.time_write_socket += t2 - t1; #endif @@ -1044,12 +922,12 @@ int mqtt_wss_publish5(mqtt_wss_client client, uint16_t *packet_id) { if (client->mqtt_disconnecting) { - mws_error(client->log, "mqtt_wss is disconnecting can't publish"); + nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_wss is disconnecting can't publish"); return 1; } if (!client->mqtt_connected) { - mws_error(client->log, "MQTT is offline. Can't send message."); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT is offline. Can't send message."); return 1; } uint8_t mqtt_flags = 0; @@ -1060,7 +938,7 @@ int mqtt_wss_publish5(mqtt_wss_client client, int rc = mqtt_ng_publish(client->mqtt, topic, topic_free, msg, msg_free, msg_len, mqtt_flags, packet_id); if (rc == MQTT_NG_MSGGEN_MSG_TOO_BIG) - return MQTT_WSS_ERR_TOO_BIG_FOR_SERVER; + return MQTT_WSS_ERR_MSG_TOO_BIG; mqtt_wss_wakeup(client); @@ -1071,12 +949,12 @@ int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level) { (void)max_qos_level; //TODO now hardcoded if (!client->mqtt_connected) { - mws_error(client->log, "MQTT is offline. Can't subscribe."); + nd_log(NDLS_DAEMON, NDLP_ERR, "MQTT is offline. Can't subscribe."); return 1; } if (client->mqtt_disconnecting) { - mws_error(client->log, "mqtt_wss is disconnecting can't subscribe"); + nd_log(NDLS_DAEMON, NDLP_ERR, "mqtt_wss is disconnecting can't subscribe"); return 1; } diff --git a/src/aclk/mqtt_websockets/mqtt_wss_client.h b/src/aclk/mqtt_websockets/mqtt_wss_client.h index 2f1c15954d86cc..2fd94075d6bbe5 100644 --- a/src/aclk/mqtt_websockets/mqtt_wss_client.h +++ b/src/aclk/mqtt_websockets/mqtt_wss_client.h @@ -3,49 +3,34 @@ #ifndef MQTT_WSS_CLIENT_H #define MQTT_WSS_CLIENT_H -#include "mqtt_wss_log.h" #include "common_public.h" -// All OK call me at your earliest convinience -#define MQTT_WSS_OK 0 -/* All OK, poll timeout you requested when calling mqtt_wss_service expired - you might want to know if timeout - * happened or we got some data or handle same as MQTT_WSS_OK - */ -#define MQTT_WSS_OK_TO 1 -// Connection was closed by remote -#define MQTT_WSS_ERR_CONN_DROP -1 -// Error in MQTT protocol (e.g. malformed packet) -#define MQTT_WSS_ERR_PROTO_MQTT -2 -// Error in WebSocket protocol (e.g. malformed packet) -#define MQTT_WSS_ERR_PROTO_WS -3 - -#define MQTT_WSS_ERR_TX_BUF_TOO_SMALL -4 -#define MQTT_WSS_ERR_RX_BUF_TOO_SMALL -5 - -#define MQTT_WSS_ERR_TOO_BIG_FOR_SERVER -6 -// if client was initialized with MQTT 3 but MQTT 5 feature -// was requested by user of library -#define MQTT_WSS_ERR_CANT_DO -8 + +#define MQTT_WSS_OK 0 // All OK call me at your earliest convinience +#define MQTT_WSS_OK_TO 1 // All OK, poll timeout you requested when calling mqtt_wss_service expired + //you might want to know if timeout + //happened or we got some data or handle same as MQTT_WSS_OK +#define MQTT_WSS_ERR_CONN_DROP -1 // Connection was closed by remote +#define MQTT_WSS_ERR_PROTO_MQTT -2 // Error in MQTT protocol (e.g. malformed packet) +#define MQTT_WSS_ERR_PROTO_WS -3 // Error in WebSocket protocol (e.g. malformed packet) +#define MQTT_WSS_ERR_MSG_TOO_BIG -6 // Message size too big for server +#define MQTT_WSS_ERR_CANT_DO -8 // if client was initialized with MQTT 3 but MQTT 5 feature + // was requested by user of library typedef struct mqtt_wss_client_struct *mqtt_wss_client; typedef void (*msg_callback_fnc_t)(const char *topic, const void *msg, size_t msglen, int qos); + /* Creates new instance of MQTT over WSS. Doesn't start connection. - * @param log_prefix this is prefix to be used when logging to discern between multiple - * mqtt_wss instances. Can be NULL. - * @param log_callback is function pointer to fnc to be called when mqtt_wss wants - * to log. This allows plugging this library into your own logging system/solution. - * If NULL STDOUT/STDERR will be used. * @param msg_callback is function pointer to function which will be called * when application level message arrives from broker (for subscribed topics). * Can be NULL if you are not interested about incoming messages. * @param puback_callback is function pointer to function to be called when QOS1 Publish * is acknowledged by server */ -mqtt_wss_client mqtt_wss_new(const char *log_prefix, - mqtt_wss_log_callback_t log_callback, - msg_callback_fnc_t msg_callback, - void (*puback_callback)(uint16_t packet_id)); +mqtt_wss_client mqtt_wss_new( + msg_callback_fnc_t msg_callback, + void (*puback_callback)(uint16_t packet_id)); void mqtt_wss_set_max_buf_size(mqtt_wss_client client, size_t size); @@ -71,7 +56,7 @@ int mqtt_wss_connect( int port, struct mqtt_connect_params *mqtt_params, int ssl_flags, - struct mqtt_wss_proxy *proxy, + const struct mqtt_wss_proxy *proxy, bool *fallback_ipv4); int mqtt_wss_service(mqtt_wss_client client, int timeout_ms); void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms); diff --git a/src/aclk/mqtt_websockets/mqtt_wss_log.c b/src/aclk/mqtt_websockets/mqtt_wss_log.c deleted file mode 100644 index def2236af63937..00000000000000 --- a/src/aclk/mqtt_websockets/mqtt_wss_log.c +++ /dev/null @@ -1,126 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "libnetdata/libnetdata.h" - -#include "mqtt_wss_log.h" - -struct mqtt_wss_log_ctx { - mqtt_wss_log_callback_t extern_log_fnc; - char *ctx_prefix; - char *buffer; - char *buffer_w_ptr; - size_t buffer_bytes_avail; -}; - -#define LOG_BUFFER_SIZE 1024 * 4 -#define LOG_CTX_PREFIX_SEV_STR " : " -#define LOG_CTX_PREFIX_LIMIT 15 -#define LOG_CTX_PREFIX_LIMIT_STR (LOG_CTX_PREFIX_LIMIT - (2 + strlen(LOG_CTX_PREFIX_SEV_STR))) // with [] characters and affixed ' ' it is total 15 chars -#if (LOG_CTX_PREFIX_LIMIT * 10) > LOG_BUFFER_SIZE -#error "LOG_BUFFER_SIZE too small" -#endif -mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_callback_t log_callback) -{ - mqtt_wss_log_ctx_t ctx = callocz(1, sizeof(struct mqtt_wss_log_ctx)); - if(!ctx) - return NULL; - - if(log_callback) { - ctx->extern_log_fnc = log_callback; - ctx->buffer = callocz(1, LOG_BUFFER_SIZE); - if(!ctx->buffer) - goto cleanup; - - ctx->buffer_w_ptr = ctx->buffer; - if(ctx_prefix) { - *(ctx->buffer_w_ptr++) = '['; - strncpy(ctx->buffer_w_ptr, ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR); - ctx->buffer_w_ptr += strnlen(ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR); - *(ctx->buffer_w_ptr++) = ']'; - } - strcpy(ctx->buffer_w_ptr, LOG_CTX_PREFIX_SEV_STR); - ctx->buffer_w_ptr += strlen(LOG_CTX_PREFIX_SEV_STR); - // no term '\0' -> calloc is used - - ctx->buffer_bytes_avail = LOG_BUFFER_SIZE - strlen(ctx->buffer); - - return ctx; - } - - if(ctx_prefix) { - ctx->ctx_prefix = strndup(ctx_prefix, LOG_CTX_PREFIX_LIMIT_STR); - if(!ctx->ctx_prefix) - goto cleanup; - } - - return ctx; - -cleanup: - freez(ctx); - return NULL; -} - -void mqtt_wss_log_ctx_destroy(mqtt_wss_log_ctx_t ctx) -{ - freez(ctx->ctx_prefix); - freez(ctx->buffer); - freez(ctx); -} - -static inline char severity_to_c(int severity) -{ - switch (severity) { - case MQTT_WSS_LOG_FATAL: - return 'F'; - case MQTT_WSS_LOG_ERROR: - return 'E'; - case MQTT_WSS_LOG_WARN: - return 'W'; - case MQTT_WSS_LOG_INFO: - return 'I'; - case MQTT_WSS_LOG_DEBUG: - return 'D'; - default: - return '?'; - } -} - -void mws_log(int severity, mqtt_wss_log_ctx_t ctx, const char *fmt, va_list args) -{ - size_t size; - - if(ctx->extern_log_fnc) { - size = vsnprintf(ctx->buffer_w_ptr, ctx->buffer_bytes_avail, fmt, args); - *(ctx->buffer_w_ptr - 3) = severity_to_c(severity); - - ctx->extern_log_fnc(severity, ctx->buffer); - - if(size >= ctx->buffer_bytes_avail) - mws_error(ctx, "Last message of this type was truncated! Consider what you log or increase LOG_BUFFER_SIZE if really needed."); - - return; - } - - if(ctx->ctx_prefix) - printf("[%s] ", ctx->ctx_prefix); - - printf("%c: ", severity_to_c(severity)); - - vprintf(fmt, args); - putchar('\n'); -} - -#define DEFINE_MWS_SEV_FNC(severity_fncname, severity) \ -void mws_ ## severity_fncname(mqtt_wss_log_ctx_t ctx, const char *fmt, ...) \ -{ \ - va_list args; \ - va_start(args, fmt); \ - mws_log(severity, ctx, fmt, args); \ - va_end(args); \ -} - -DEFINE_MWS_SEV_FNC(fatal, MQTT_WSS_LOG_FATAL) -DEFINE_MWS_SEV_FNC(error, MQTT_WSS_LOG_ERROR) -DEFINE_MWS_SEV_FNC(warn, MQTT_WSS_LOG_WARN ) -DEFINE_MWS_SEV_FNC(info, MQTT_WSS_LOG_INFO ) -DEFINE_MWS_SEV_FNC(debug, MQTT_WSS_LOG_DEBUG) diff --git a/src/aclk/mqtt_websockets/mqtt_wss_log.h b/src/aclk/mqtt_websockets/mqtt_wss_log.h deleted file mode 100644 index 6ae60d870b4b83..00000000000000 --- a/src/aclk/mqtt_websockets/mqtt_wss_log.h +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright: SPDX-License-Identifier: GPL-3.0-only - -#ifndef MQTT_WSS_LOG_H -#define MQTT_WSS_LOG_H - -typedef enum mqtt_wss_log_type { - MQTT_WSS_LOG_DEBUG = 0x01, - MQTT_WSS_LOG_INFO = 0x02, - MQTT_WSS_LOG_WARN = 0x03, - MQTT_WSS_LOG_ERROR = 0x81, - MQTT_WSS_LOG_FATAL = 0x88 -} mqtt_wss_log_type_t; - -typedef void (*mqtt_wss_log_callback_t)(mqtt_wss_log_type_t, const char*); - -typedef struct mqtt_wss_log_ctx *mqtt_wss_log_ctx_t; - -/** Creates logging context with optional prefix and optional callback - * @param ctx_prefix String to be prefixed to every log message. - * This is useful if multiple clients are instantiated to be able to - * know which one this message belongs to. Can be `NULL` for no prefix. - * @param log_callback Callback to be called instead of logging to - * `STDOUT` or `STDERR` (if debug enabled otherwise silent). Callback has to be - * pointer to function of `void function(mqtt_wss_log_type_t, const char*)` type. - * If `NULL` default will be used (silent or STDERR/STDOUT). - * @return mqtt_wss_log_ctx_t or `NULL` on error */ -mqtt_wss_log_ctx_t mqtt_wss_log_ctx_create(const char *ctx_prefix, mqtt_wss_log_callback_t log_callback); - -/** Destroys logging context and cleans up the memory - * @param ctx Context to destroy */ -void mqtt_wss_log_ctx_destroy(mqtt_wss_log_ctx_t ctx); - -void mws_fatal(mqtt_wss_log_ctx_t ctx, const char *fmt, ...); -void mws_error(mqtt_wss_log_ctx_t ctx, const char *fmt, ...); -void mws_warn (mqtt_wss_log_ctx_t ctx, const char *fmt, ...); -void mws_info (mqtt_wss_log_ctx_t ctx, const char *fmt, ...); -void mws_debug(mqtt_wss_log_ctx_t ctx, const char *fmt, ...); - -#endif /* MQTT_WSS_LOG_H */ diff --git a/src/aclk/mqtt_websockets/ws_client.c b/src/aclk/mqtt_websockets/ws_client.c index ed39967ce88a61..14c3f19b2fe05d 100644 --- a/src/aclk/mqtt_websockets/ws_client.c +++ b/src/aclk/mqtt_websockets/ws_client.c @@ -5,78 +5,54 @@ #include "ws_client.h" #include "common_internal.h" -#define UNIT_LOG_PREFIX "ws_client: " -#define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) -#define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__) +#ifdef OS_WINDOWS +#include +#include // For BCryptGenRandom +#endif + +static uint32_t generate_random_32bit(void) { + uint32_t random_number = 0; + + if (RAND_bytes((unsigned char *)&random_number, sizeof(random_number)) != 1) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to generate a random uint32 mask"); + } + + return random_number; +} const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A" "Host: %s\x0D\x0A" "Upgrade: websocket\x0D\x0A" "Connection: Upgrade\x0D\x0A" "Sec-WebSocket-Key: %s\x0D\x0A" - "Origin: http://example.com\x0D\x0A" + "Origin: \x0D\x0A" "Sec-WebSocket-Protocol: mqtt\x0D\x0A" "Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A"; const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; #define DEFAULT_RINGBUFFER_SIZE (1024*128) -#define ENTROPY_SOURCE "/dev/urandom" -ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log) -{ - ws_client *client; +ws_client *ws_client_new(size_t buf_size, char **host) +{ if(!host) return NULL; - client = callocz(1, sizeof(ws_client)); - if (!client) - return NULL; - + ws_client *client = callocz(1, sizeof(ws_client)); client->host = host; - client->log = log; - client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_read) - goto cleanup; - client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_write) - goto cleanup_1; - client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE); - if (!client->buf_to_mqtt) - goto cleanup_2; - - client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY | O_CLOEXEC); - if (client->entropy_fd < 1) { - ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno)); - goto cleanup_3; - } return client; - -cleanup_3: - rbuf_free(client->buf_to_mqtt); -cleanup_2: - rbuf_free(client->buf_write); -cleanup_1: - rbuf_free(client->buf_read); -cleanup: - freez(client); - return NULL; } void ws_client_free_headers(ws_client *client) { struct http_header *ptr = client->hs.headers; - struct http_header *tmp; while (ptr) { - tmp = ptr; + struct http_header *tmp = ptr; ptr = ptr->next; freez(tmp); } @@ -91,7 +67,6 @@ void ws_client_destroy(ws_client *client) ws_client_free_headers(client); freez(client->hs.nonce_reply); freez(client->hs.http_reply_msg); - close(client->entropy_fd); rbuf_free(client->buf_read); rbuf_free(client->buf_write); rbuf_free(client->buf_to_mqtt); @@ -120,7 +95,7 @@ void ws_client_reset(ws_client *client) int ws_client_add_http_header(ws_client *client, struct http_header *hdr) { if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) { - ERROR("Too many HTTP response header fields"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Too many HTTP response header fields"); return -1; } @@ -135,7 +110,7 @@ int ws_client_add_http_header(ws_client *client, struct http_header *hdr) return 0; } -int ws_client_want_write(ws_client *client) +int ws_client_want_write(const ws_client *client) { return rbuf_bytes_available(client->buf_write); } @@ -144,78 +119,92 @@ int ws_client_want_write(ws_client *client) #define TEMP_BUF_SIZE 4096 int ws_client_start_handshake(ws_client *client) { - nd_uuid_t nonce; + unsigned char nonce[WEBSOCKET_NONCE_SIZE]; char nonce_b64[256]; char second[TEMP_BUF_SIZE]; unsigned int md_len; - unsigned char *digest; + unsigned char digest[EVP_MAX_MD_SIZE]; // EVP_MAX_MD_SIZE ensures enough space EVP_MD_CTX *md_ctx; const EVP_MD *md; + int rc = 1; if(!client->host || !*client->host) { - ERROR("Hostname has not been set. We should not be able to come here!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Hostname has not been set. We should not be able to come here!"); return 1; } - uuid_generate_random(nonce); - EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE); - snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64); - - if(rbuf_bytes_free(client->buf_write) < strlen(second)) { - ERROR("Write buffer capacity too low."); + // Generate a random 16-byte nonce + if (!RAND_bytes(nonce, WEBSOCKET_NONCE_SIZE)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to generate nonce"); return 1; } - rbuf_push(client->buf_write, second, strlen(second)); - client->state = WS_HANDSHAKE; - - //Calculating expected Sec-WebSocket-Accept reply - snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid); - + // Initialize the digest context #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) md_ctx = EVP_MD_CTX_create(); #else md_ctx = EVP_MD_CTX_new(); #endif if (md_ctx == NULL) { - ERROR("Cant create EVP_MD Context"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Can't create EVP_MD context"); return 1; } - md = EVP_get_digestbyname("sha1"); + md = EVP_sha1(); // Use SHA-1 for WebSocket handshake if (!md) { - ERROR("Unknown message digest"); - return 1; + nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown message digest SHA-1"); + goto exit_with_error; } - if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) { - ERROR("Cant alloc digest"); - return 1; + (void) netdata_base64_encode((unsigned char *) nonce_b64, nonce, WEBSOCKET_NONCE_SIZE); + + // Format and push the upgrade header to the write buffer + size_t bytes = snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr, *client->host, nonce_b64); + if(rbuf_bytes_free(client->buf_write) < bytes) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Write buffer capacity too low."); + goto exit_with_error; + } + rbuf_push(client->buf_write, second, bytes); + + client->state = WS_HANDSHAKE; + + // Create the expected Sec-WebSocket-Accept value + bytes = snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid); + + if (!EVP_DigestInit_ex(md_ctx, md, NULL)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to initialize digest context"); + goto exit_with_error; + } + + if (!EVP_DigestUpdate(md_ctx, second, bytes)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to update digest"); + goto exit_with_error; } - EVP_DigestInit_ex(md_ctx, md, NULL); - EVP_DigestUpdate(md_ctx, second, strlen(second)); - EVP_DigestFinal_ex(md_ctx, digest, &md_len); + if (!EVP_DigestFinal_ex(md_ctx, digest, &md_len)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Failed to finalize digest"); + goto exit_with_error; + } - EVP_EncodeBlock((unsigned char *)nonce_b64, digest, (int) md_len); + (void) netdata_base64_encode((unsigned char *) nonce_b64, digest, md_len); freez(client->hs.nonce_reply); client->hs.nonce_reply = strdupz(nonce_b64); + rc = 0; - OPENSSL_free(digest); - +exit_with_error: #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) EVP_MD_CTX_destroy(md_ctx); #else EVP_MD_CTX_free(md_ctx); #endif - return 0; + return rc; } #define BUF_READ_MEMCMP_CONST(const, err) \ if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) { \ - ERROR(err); \ + nd_log(NDLS_DAEMON, NDLP_ERR, err); \ rbuf_flush(client->buf_read); \ return WS_CLIENT_PROTOCOL_ERROR; \ } @@ -241,7 +230,7 @@ int ws_client_start_handshake(ws_client *client) #define HTTP_HDR_LINE_CHECK_LIMIT(x) \ if ((x) >= MAX_HTTP_LINE_LENGTH) { \ - ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \ return WS_CLIENT_PROTOCOL_ERROR; \ } @@ -264,13 +253,13 @@ int ws_client_parse_handshake_resp(ws_client *client) BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH); if (buf[HTTP_SC_LENGTH - 1] != 0x20) { - ERROR("HTTP status code received is not terminated by space (0x20)"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received is not terminated by space (0x20)"); return WS_CLIENT_PROTOCOL_ERROR; } buf[HTTP_SC_LENGTH - 1] = 0; client->hs.http_code = atoi(buf); if (client->hs.http_code < 100 || client->hs.http_code >= 600) { - ERROR("HTTP status code received not in valid range 100-600"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP status code received not in valid range 100-600"); return WS_CLIENT_PROTOCOL_ERROR; } client->hs.hdr_state = WS_HDR_ENDLINE; @@ -309,16 +298,16 @@ int ws_client_parse_handshake_resp(ws_client *client) ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep); if (!ptr || idx_sep > idx_crlf) { - ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line"); return WS_CLIENT_PROTOCOL_ERROR; } if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) { - ERROR("HTTP Header value cannot be empty"); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP Header value cannot be empty"); return WS_CLIENT_PROTOCOL_ERROR; } if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) { - ERROR("HTTP header too long (%d)", idx_sep); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP header too long (%d)", idx_sep); return WS_CLIENT_PROTOCOL_ERROR; } @@ -326,23 +315,21 @@ int ws_client_parse_handshake_resp(ws_client *client) hdr->key = ((char*)hdr) + sizeof(struct http_header); hdr->value = hdr->key + idx_sep + 1; - bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep); + rbuf_pop(client->buf_read, hdr->key, idx_sep); rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR)); - bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR)); + rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR)); rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE)); for (int i = 0; hdr->key[i]; i++) hdr->key[i] = tolower(hdr->key[i]); -// DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value); - if (ws_client_add_http_header(client, hdr)) return WS_CLIENT_PROTOCOL_ERROR; if (!strcmp(hdr->key, WS_CONN_ACCEPT)) { if (strcmp(client->hs.nonce_reply, hdr->value)) { - ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply); + nd_log(NDLS_DAEMON, NDLP_ERR, "Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply); return WS_CLIENT_PROTOCOL_ERROR; } client->hs.nonce_matched = 1; @@ -352,21 +339,21 @@ int ws_client_parse_handshake_resp(ws_client *client) case WS_HDR_PARSE_DONE: if (!client->hs.nonce_matched) { - ERROR("Missing " WS_CONN_ACCEPT " header"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Missing " WS_CONN_ACCEPT " header"); return WS_CLIENT_PROTOCOL_ERROR; } if (client->hs.http_code != 101) { - ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg); + nd_log(NDLS_DAEMON, NDLP_ERR, "HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg); return WS_CLIENT_PROTOCOL_ERROR; } client->state = WS_ESTABLISHED; client->hs.hdr_state = WS_HDR_ALL_DONE; - INFO("Websocket Connection Accepted By Server"); + nd_log(NDLS_DAEMON, NDLP_INFO, "Websocket Connection Accepted By Server"); return WS_CLIENT_PARSING_DONE; case WS_HDR_ALL_DONE: - FATAL("This is error we should never come here!"); + nd_log(NDLS_DAEMON, NDLP_CRIT, "This is error we should never come here!"); return WS_CLIENT_PROTOCOL_ERROR; } return 0; @@ -376,7 +363,7 @@ int ws_client_parse_handshake_resp(ws_client *client) #define WS_FINAL_FRAG BYTE_MSB #define WS_PAYLOAD_MASKED BYTE_MSB -static inline size_t get_ws_hdr_size(size_t payload_size) +static size_t get_ws_hdr_size(size_t payload_size) { size_t hdr_len = 2 + 4 /*mask*/; if(payload_size > 125) @@ -387,7 +374,7 @@ static inline size_t get_ws_hdr_size(size_t payload_size) } #define MAX_POSSIBLE_HDR_LEN 14 -int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size) +int ws_client_send(const ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size) { // TODO maybe? implement fragmenting, it is not necessary though // as both tested MQTT brokers have no reuirement of one MQTT envelope @@ -395,24 +382,16 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch // one big MQTT message as single fragmented WebSocket envelope char hdr[MAX_POSSIBLE_HDR_LEN]; char *ptr = hdr; - char *mask; int size_written = 0; size_t j = 0; size_t w_buff_free = rbuf_bytes_free(client->buf_write); size_t hdr_len = get_ws_hdr_size(size); - if (w_buff_free < hdr_len * 2) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("Write buffer full. Can't write requested %d size.", size); -#endif + if (w_buff_free < hdr_len * 2) return 0; - } if (w_buff_free < (hdr_len + size)) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len); -#endif size = w_buff_free - hdr_len; hdr_len = get_ws_hdr_size(size); // the actual needed header size might decrease if we cut number of bytes @@ -438,12 +417,14 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch ptr += sizeof(be); } else *ptr++ |= size; - - mask = ptr; - if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) { - ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\""); + + char *mask = ptr; + uint32_t mask32 = generate_random_32bit(); + if (!mask32) { + nd_log(NDLS_DAEMON, NDLP_ERR, "Unable to get mask to XOR websocket payload"); return -2; } + memcpy(mask, &mask32, sizeof(mask32)); rbuf_push(client->buf_write, hdr, hdr_len); @@ -469,7 +450,7 @@ int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const ch return size_written; } -static int check_opcode(ws_client *client,enum websocket_opcode oc) +static int check_opcode(enum websocket_opcode oc) { switch(oc) { case WS_OP_BINARY_FRAME: @@ -477,34 +458,34 @@ static int check_opcode(ws_client *client,enum websocket_opcode oc) case WS_OP_PING: return 0; case WS_OP_CONTINUATION_FRAME: - FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!"); return 0; case WS_OP_TEXT_FRAME: - FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!"); return 0; case WS_OP_PONG: - FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WS_OP_PONG NOT IMPLEMENTED YET!!!!"); return 0; default: return WS_CLIENT_PROTOCOL_ERROR; } } -static inline void ws_client_rx_post_hdr_state(ws_client *client) +static void ws_client_rx_post_hdr_state(ws_client *client) { switch(client->rx.opcode) { case WS_OP_BINARY_FRAME: client->rx.parse_state = WS_PAYLOAD_DATA; - return; + break; case WS_OP_CONNECTION_CLOSE: client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE; - return; + break; case WS_OP_PING: client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD; - return; + break; default: client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD; - return; + break; } } @@ -520,15 +501,15 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.opcode = buf[0] & (char)~BYTE_MSB; if (!(buf[0] & (char)~WS_FINAL_FRAG)) { - ERROR("Not supporting fragmented messages yet!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Not supporting fragmented messages yet!"); return WS_CLIENT_PROTOCOL_ERROR; } - if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR) + if (check_opcode(client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR) return WS_CLIENT_PROTOCOL_ERROR; if (buf[1] & (char)WS_PAYLOAD_MASKED) { - ERROR("Mask is not allowed in Server->Client Websocket direction."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Mask is not allowed in Server->Client Websocket direction."); return WS_CLIENT_PROTOCOL_ERROR; } @@ -563,12 +544,8 @@ int ws_client_process_rx_ws(ws_client *client) if (!rbuf_bytes_available(client->buf_read)) return WS_CLIENT_NEED_MORE_BYTES; char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size); - if (!insert) { -#ifdef DEBUG_ULTRA_VERBOSE - DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining); -#endif + if (!insert) return WS_CLIENT_BUFFER_FULL; - } size = (size > remaining) ? remaining : size; size = rbuf_pop(client->buf_read, insert, size); rbuf_bump_head(client->buf_to_mqtt, size); @@ -582,11 +559,11 @@ int ws_client_process_rx_ws(ws_client *client) // b) 2byte reason code // c) 2byte reason code followed by message if (client->rx.payload_length == 1) { - ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1"); + nd_log(NDLS_DAEMON, NDLP_ERR, "WebScoket CONNECTION_CLOSE can't have payload of size 1"); return WS_CLIENT_PROTOCOL_ERROR; } if (!client->rx.payload_length) { - INFO("WebSocket server closed the connection without giving reason."); + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection without giving reason."); client->rx.parse_state = WS_PACKET_DONE; break; } @@ -600,7 +577,7 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.payload_processed += sizeof(uint16_t); if(client->rx.payload_processed == client->rx.payload_length) { - INFO("WebSocket server closed the connection with EC=%d. Without message.", + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d. Without message.", client->rx.specific_data.op_close.ec); client->rx.parse_state = WS_PACKET_DONE; break; @@ -619,7 +596,7 @@ int ws_client_process_rx_ws(ws_client *client) client->rx.payload_length - client->rx.payload_processed); } client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0; - INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"", + nd_log(NDLS_DAEMON, NDLP_INFO, "WebSocket server closed the connection with EC=%d and reason \"%s\"", client->rx.specific_data.op_close.ec, client->rx.specific_data.op_close.reason); freez(client->rx.specific_data.op_close.reason); @@ -628,14 +605,14 @@ int ws_client_process_rx_ws(ws_client *client) break; case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD: BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); - WARN("Skipping Websocket Packet of unsupported/unknown type"); + nd_log(NDLS_DAEMON, NDLP_WARNING, "Skipping Websocket Packet of unsupported/unknown type"); if (client->rx.payload_length) rbuf_bump_tail(client->buf_read, client->rx.payload_length); client->rx.parse_state = WS_PACKET_DONE; return WS_CLIENT_PARSING_DONE; case WS_PAYLOAD_PING_REQ_PAYLOAD: if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) { - ERROR("Ping arrived with payload which is too big!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Ping arrived with payload which is too big!"); return WS_CLIENT_INTERNAL_ERROR; } BUF_READ_CHECK_AT_LEAST(client->rx.payload_length); @@ -645,7 +622,7 @@ int ws_client_process_rx_ws(ws_client *client) // then attempt to send as soon as buffer space clears up size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length); if (size != client->rx.payload_length) { - ERROR("Unable to send the PONG as one packet back. Closing connection."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unable to send the PONG as one packet back. Closing connection."); return WS_CLIENT_PROTOCOL_ERROR; } client->rx.parse_state = WS_PACKET_DONE; @@ -657,7 +634,7 @@ int ws_client_process_rx_ws(ws_client *client) return WS_CLIENT_CONNECTION_CLOSED; return WS_CLIENT_PARSING_DONE; default: - FATAL("Unknown parse state"); + nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown parse state"); return WS_CLIENT_INTERNAL_ERROR; } return 0; @@ -690,6 +667,8 @@ int ws_client_process(ws_client *client) case WS_CLIENT_CONNECTION_CLOSED: client->state = WS_CONN_CLOSED_GRACEFUL; break; + default: + break; } // if ret == 0 we can continue parsing // if ret == WS_CLIENT_PARSING_DONE we processed @@ -698,13 +677,13 @@ int ws_client_process(ws_client *client) } while (!ret || ret == WS_CLIENT_PARSING_DONE); break; case WS_ERROR: - ERROR("ws_client is in error state. Restart the connection!"); + nd_log(NDLS_DAEMON, NDLP_ERR, "ws_client is in error state. Restart the connection!"); return WS_CLIENT_PROTOCOL_ERROR; case WS_CONN_CLOSED_GRACEFUL: - ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again."); + nd_log(NDLS_DAEMON, NDLP_ERR, "Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again."); return WS_CLIENT_CONNECTION_CLOSED; default: - FATAL("Unknown connection state! Probably memory corruption."); + nd_log(NDLS_DAEMON, NDLP_CRIT, "Unknown connection state! Probably memory corruption."); return WS_CLIENT_INTERNAL_ERROR; } return ret; diff --git a/src/aclk/mqtt_websockets/ws_client.h b/src/aclk/mqtt_websockets/ws_client.h index 75e780697b802e..67e5835a257025 100644 --- a/src/aclk/mqtt_websockets/ws_client.h +++ b/src/aclk/mqtt_websockets/ws_client.h @@ -3,8 +3,6 @@ #ifndef WS_CLIENT_H #define WS_CLIENT_H -#include "mqtt_wss_log.h" - #define WS_CLIENT_NEED_MORE_BYTES 0x10 #define WS_CLIENT_PARSING_DONE 0x11 #define WS_CLIENT_CONNECTION_CLOSED 0x12 @@ -94,23 +92,20 @@ typedef struct websocket_client { // memory usage and remove one more memcpy buf_read->buf_to_mqtt rbuf_t buf_to_mqtt; // RAW data for MQTT lib - int entropy_fd; - // careful host is borrowed, don't free char **host; - mqtt_wss_log_ctx_t log; } ws_client; -ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log); +ws_client *ws_client_new(size_t buf_size, char **host); void ws_client_destroy(ws_client *client); void ws_client_reset(ws_client *client); int ws_client_start_handshake(ws_client *client); -int ws_client_want_write(ws_client *client); +int ws_client_want_write(const ws_client *client); int ws_client_process(ws_client *client); -int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size); +int ws_client_send(const ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size); #endif /* WS_CLIENT_H */ diff --git a/src/libnetdata/c_rhash/c_rhash.c b/src/libnetdata/c_rhash/c_rhash.c index 0ab25d5d427321..ec2c061a28240e 100644 --- a/src/libnetdata/c_rhash/c_rhash.c +++ b/src/libnetdata/c_rhash/c_rhash.c @@ -8,9 +8,6 @@ c_rhash c_rhash_new(size_t bin_count) { bin_count = 1000; c_rhash hash = callocz(1, sizeof(struct c_rhash_s) + (bin_count * sizeof(struct bin_ll*)) ); - if (hash == NULL) - return NULL; - hash->bin_count = bin_count; hash->bins = (c_rhash_bin *)((char*)hash + sizeof(struct c_rhash_s)); diff --git a/src/libnetdata/libnetdata.c b/src/libnetdata/libnetdata.c index 17dbd85428e4f5..84aa6e339f4f67 100644 --- a/src/libnetdata/libnetdata.c +++ b/src/libnetdata/libnetdata.c @@ -1570,52 +1570,93 @@ bool rrdr_relative_window_to_absolute_query(time_t *after, time_t *before, time_ return (absolute_period_requested != 1); } -int netdata_base64_decode(const char *encoded, char *decoded, size_t decoded_size) { - static const unsigned char base64_table[256] = { - ['A'] = 0, ['B'] = 1, ['C'] = 2, ['D'] = 3, ['E'] = 4, ['F'] = 5, ['G'] = 6, ['H'] = 7, - ['I'] = 8, ['J'] = 9, ['K'] = 10, ['L'] = 11, ['M'] = 12, ['N'] = 13, ['O'] = 14, ['P'] = 15, - ['Q'] = 16, ['R'] = 17, ['S'] = 18, ['T'] = 19, ['U'] = 20, ['V'] = 21, ['W'] = 22, ['X'] = 23, - ['Y'] = 24, ['Z'] = 25, ['a'] = 26, ['b'] = 27, ['c'] = 28, ['d'] = 29, ['e'] = 30, ['f'] = 31, - ['g'] = 32, ['h'] = 33, ['i'] = 34, ['j'] = 35, ['k'] = 36, ['l'] = 37, ['m'] = 38, ['n'] = 39, - ['o'] = 40, ['p'] = 41, ['q'] = 42, ['r'] = 43, ['s'] = 44, ['t'] = 45, ['u'] = 46, ['v'] = 47, - ['w'] = 48, ['x'] = 49, ['y'] = 50, ['z'] = 51, ['0'] = 52, ['1'] = 53, ['2'] = 54, ['3'] = 55, - ['4'] = 56, ['5'] = 57, ['6'] = 58, ['7'] = 59, ['8'] = 60, ['9'] = 61, ['+'] = 62, ['/'] = 63, - [0 ... '+' - 1] = 255, - ['+' + 1 ... '/' - 1] = 255, - ['9' + 1 ... 'A' - 1] = 255, - ['Z' + 1 ... 'a' - 1] = 255, - ['z' + 1 ... 255] = 255 - }; - size_t count = 0; - unsigned int tmp = 0; - int i, bit; - - if (decoded_size < 1) - return 0; // Buffer size must be at least 1 for null termination - - for (i = 0, bit = 0; encoded[i]; i++) { - unsigned char value = base64_table[(unsigned char)encoded[i]]; - if (value > 63) - return -1; // Invalid character in input - - tmp = tmp << 6 | value; - if (++bit == 4) { - if (count + 3 >= decoded_size) break; // Stop decoding if buffer is full - decoded[count++] = (tmp >> 16) & 0xFF; - decoded[count++] = (tmp >> 8) & 0xFF; - decoded[count++] = tmp & 0xFF; - tmp = 0; - bit = 0; - } - } +#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110 +static inline EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void) +{ + EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx)); - if (bit > 0 && count + 1 < decoded_size) { - tmp <<= 6 * (4 - bit); - if (bit > 2 && count + 1 < decoded_size) decoded[count++] = (tmp >> 16) & 0xFF; - if (bit > 3 && count + 1 < decoded_size) decoded[count++] = (tmp >> 8) & 0xFF; + if (ctx != NULL) { + memset(ctx, 0, sizeof(*ctx)); } + return ctx; +} + +static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx) +{ + OPENSSL_free(ctx); +} +#endif - decoded[count] = '\0'; // Null terminate the output string - return count; +int netdata_base64_decode(unsigned char *out, const unsigned char *in, const int in_len) +{ + int outl; + unsigned char remaining_data[256]; + + EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new(); + EVP_DecodeInit(ctx); + EVP_DecodeUpdate(ctx, out, &outl, in, in_len); + int remainder = 0; + EVP_DecodeFinal(ctx, remaining_data, &remainder); + EVP_ENCODE_CTX_free(ctx); + if (remainder) + return -1; + + return outl; } + +int netdata_base64_encode(unsigned char *encoded, const unsigned char *input, size_t input_size) +{ + return EVP_EncodeBlock(encoded, input, input_size); +} + +// Keep internal implementation +// int netdata_base64_decode_internal(const char *encoded, char *decoded, size_t decoded_size) { +// static const unsigned char base64_table[256] = { +// ['A'] = 0, ['B'] = 1, ['C'] = 2, ['D'] = 3, ['E'] = 4, ['F'] = 5, ['G'] = 6, ['H'] = 7, +// ['I'] = 8, ['J'] = 9, ['K'] = 10, ['L'] = 11, ['M'] = 12, ['N'] = 13, ['O'] = 14, ['P'] = 15, +// ['Q'] = 16, ['R'] = 17, ['S'] = 18, ['T'] = 19, ['U'] = 20, ['V'] = 21, ['W'] = 22, ['X'] = 23, +// ['Y'] = 24, ['Z'] = 25, ['a'] = 26, ['b'] = 27, ['c'] = 28, ['d'] = 29, ['e'] = 30, ['f'] = 31, +// ['g'] = 32, ['h'] = 33, ['i'] = 34, ['j'] = 35, ['k'] = 36, ['l'] = 37, ['m'] = 38, ['n'] = 39, +// ['o'] = 40, ['p'] = 41, ['q'] = 42, ['r'] = 43, ['s'] = 44, ['t'] = 45, ['u'] = 46, ['v'] = 47, +// ['w'] = 48, ['x'] = 49, ['y'] = 50, ['z'] = 51, ['0'] = 52, ['1'] = 53, ['2'] = 54, ['3'] = 55, +// ['4'] = 56, ['5'] = 57, ['6'] = 58, ['7'] = 59, ['8'] = 60, ['9'] = 61, ['+'] = 62, ['/'] = 63, +// [0 ... '+' - 1] = 255, +// ['+' + 1 ... '/' - 1] = 255, +// ['9' + 1 ... 'A' - 1] = 255, +// ['Z' + 1 ... 'a' - 1] = 255, +// ['z' + 1 ... 255] = 255 +// }; +// +// size_t count = 0; +// unsigned int tmp = 0; +// int i, bit; +// +// if (decoded_size < 1) +// return 0; // Buffer size must be at least 1 for null termination +// +// for (i = 0, bit = 0; encoded[i]; i++) { +// unsigned char value = base64_table[(unsigned char)encoded[i]]; +// if (value > 63) +// return -1; // Invalid character in input +// +// tmp = tmp << 6 | value; +// if (++bit == 4) { +// if (count + 3 >= decoded_size) break; // Stop decoding if buffer is full +// decoded[count++] = (tmp >> 16) & 0xFF; +// decoded[count++] = (tmp >> 8) & 0xFF; +// decoded[count++] = tmp & 0xFF; +// tmp = 0; +// bit = 0; +// } +// } +// +// if (bit > 0 && count + 1 < decoded_size) { +// tmp <<= 6 * (4 - bit); +// if (bit > 2 && count + 1 < decoded_size) decoded[count++] = (tmp >> 16) & 0xFF; +// if (bit > 3 && count + 1 < decoded_size) decoded[count++] = (tmp >> 8) & 0xFF; +// } +// +// decoded[count] = '\0'; // Null terminate the output string +// return count; +// } diff --git a/src/libnetdata/libnetdata.h b/src/libnetdata/libnetdata.h index 0963d63df68dee..d27d33e5a88959 100644 --- a/src/libnetdata/libnetdata.h +++ b/src/libnetdata/libnetdata.h @@ -642,7 +642,8 @@ extern bool unittest_running; bool rrdr_relative_window_to_absolute(time_t *after, time_t *before, time_t now); bool rrdr_relative_window_to_absolute_query(time_t *after, time_t *before, time_t *now_ptr, bool unittest); -int netdata_base64_decode(const char *encoded, char *decoded, size_t decoded_size); +int netdata_base64_decode(unsigned char *out, const unsigned char *in, int in_len); +int netdata_base64_encode(unsigned char *encoded, const unsigned char *input, size_t input_size); static inline void freez_charp(char **p) { freez(*p); diff --git a/src/libnetdata/ringbuffer/ringbuffer.c b/src/libnetdata/ringbuffer/ringbuffer.c index 5a3523962e1d8e..b30b3c39a2a349 100644 --- a/src/libnetdata/ringbuffer/ringbuffer.c +++ b/src/libnetdata/ringbuffer/ringbuffer.c @@ -6,9 +6,6 @@ rbuf_t rbuf_create(size_t size) { rbuf_t buffer = mallocz(sizeof(struct rbuf) + size); - if (!buffer) - return NULL; - memset(buffer, 0, sizeof(struct rbuf)); buffer->data = ((char*)buffer) + sizeof(struct rbuf); diff --git a/src/libnetdata/socket/security.h b/src/libnetdata/socket/security.h index c5c4d79c56beef..4e0b113cf58b07 100644 --- a/src/libnetdata/socket/security.h +++ b/src/libnetdata/socket/security.h @@ -19,6 +19,7 @@ typedef enum __attribute__((packed)) { #define OPENSSL_VERSION_300 0x30000000L # include +# include # include # include # include