Skip to content

Commit

Permalink
Misc mqtt related code cleanup (netdata#18622)
Browse files Browse the repository at this point in the history
* Remove pthread locks / use spinlocks

* Remove redundant checks as mallocz, callocz can't return NULL

* Change logging
More code cleanup

* Change random number generation
Set Origin to empty

Use BCryptGenRandom

* More cleanup
Deduplicate base64_encode/decode

* Address review comments
  • Loading branch information
stelfrag authored Sep 29, 2024
1 parent 6d187c7 commit fbeee6b
Show file tree
Hide file tree
Showing 20 changed files with 546 additions and 1,016 deletions.
4 changes: 1 addition & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1555,8 +1555,6 @@ endif()
set(MQTT_WEBSOCKETS_FILES
src/aclk/mqtt_websockets/mqtt_wss_client.c
src/aclk/mqtt_websockets/mqtt_wss_client.h
src/aclk/mqtt_websockets/mqtt_wss_log.c
src/aclk/mqtt_websockets/mqtt_wss_log.h
src/aclk/mqtt_websockets/ws_client.c
src/aclk/mqtt_websockets/ws_client.h
src/aclk/mqtt_websockets/mqtt_ng.c
Expand Down Expand Up @@ -1721,7 +1719,7 @@ target_include_directories(libnetdata BEFORE PUBLIC ${CONFIG_H_DIR} ${CMAKE_SOUR
target_link_libraries(libnetdata PUBLIC
"$<$<NOT:$<BOOL:${HAVE_BUILTIN_ATOMICS}>>:atomic>"
"$<$<OR:$<BOOL:${OS_LINUX}>,$<BOOL:${OS_FREEBSD}>>:pthread;rt>"
"$<$<BOOL:${OS_WINDOWS}>:kernel32;advapi32;winmm;rpcrt4>"
"$<$<BOOL:${OS_WINDOWS}>:kernel32;advapi32;winmm;rpcrt4;bcrypt>"
"$<$<BOOL:${LINK_LIBM}>:m>"
"${SYSTEMD_LDFLAGS}")

Expand Down
40 changes: 8 additions & 32 deletions src/aclk/aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,30 +226,6 @@ static int wait_till_agent_claim_ready()
return 1;
}

void aclk_mqtt_wss_log_cb(mqtt_wss_log_type_t log_type, const char* str)
{
switch(log_type) {
case MQTT_WSS_LOG_ERROR:
case MQTT_WSS_LOG_FATAL:
nd_log(NDLS_DAEMON, NDLP_ERR, "%s", str);
return;

case MQTT_WSS_LOG_WARN:
nd_log(NDLS_DAEMON, NDLP_WARNING, "%s", str);
return;

case MQTT_WSS_LOG_INFO:
nd_log(NDLS_DAEMON, NDLP_INFO, "%s", str);
return;

case MQTT_WSS_LOG_DEBUG:
return;

default:
nd_log(NDLS_DAEMON, NDLP_ERR, "Unknown log type from mqtt_wss");
}
}

static void msg_callback(const char *topic, const void *msg, size_t msglen, int qos)
{
UNUSED(qos);
Expand Down Expand Up @@ -362,7 +338,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_rcvd_cloud_msgs = 0;
aclk_connection_counter++;

aclk_topic_cache_iter_t iter = ACLK_TOPIC_CACHE_ITER_T_INITIALIZER;
size_t iter = 0;
while ((topic = (char*)aclk_topic_cache_iterate(&iter)) != NULL)
mqtt_wss_set_topic_alias(client, topic);

Expand Down Expand Up @@ -768,7 +744,7 @@ static int aclk_attempt_to_connect(mqtt_wss_client client)
*/
void *aclk_main(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct netdata_static_thread *static_thread = ptr;

ACLK_PROXY_TYPE proxy_type;
aclk_get_proxy(&proxy_type);
Expand All @@ -783,7 +759,7 @@ void *aclk_main(void *ptr)
if (wait_till_agent_claim_ready())
goto exit;

if (!(mqttwss_client = mqtt_wss_new("mqtt_wss", aclk_mqtt_wss_log_cb, msg_callback, puback_callback))) {
if (!((mqttwss_client = mqtt_wss_new(msg_callback, puback_callback)))) {
netdata_log_error("Couldn't initialize MQTT_WSS network library");
goto exit;
}
Expand Down Expand Up @@ -1025,22 +1001,22 @@ char *aclk_state(void)
}

buffer_sprintf(wb, "Online: %s\nReconnect count: %d\nBanned By Cloud: %s\n", aclk_online() ? "Yes" : "No", aclk_connection_counter > 0 ? (aclk_connection_counter - 1) : 0, aclk_disable_runtime ? "Yes" : "No");
if (last_conn_time_mqtt && (tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf)) ) {
if (last_conn_time_mqtt && ((tmptr = localtime_r(&last_conn_time_mqtt, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
buffer_sprintf(wb, "Last Connection Time: %s\n", timebuf);
}
if (last_conn_time_appl && (tmptr = localtime_r(&last_conn_time_appl, &tmbuf)) ) {
if (last_conn_time_appl && ((tmptr = localtime_r(&last_conn_time_appl, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
buffer_sprintf(wb, "Last Connection Time + %d PUBACKs received: %s\n", ACLK_PUBACKS_CONN_STABLE, timebuf);
}
if (last_disconnect_time && (tmptr = localtime_r(&last_disconnect_time, &tmbuf)) ) {
if (last_disconnect_time && ((tmptr = localtime_r(&last_disconnect_time, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
buffer_sprintf(wb, "Last Disconnect Time: %s\n", timebuf);
}
if (!aclk_connected && next_connection_attempt && (tmptr = localtime_r(&next_connection_attempt, &tmbuf)) ) {
if (!aclk_connected && next_connection_attempt && ((tmptr = localtime_r(&next_connection_attempt, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
buffer_sprintf(wb, "Next Connection Attempt At: %s\nLast Backoff: %.3f", timebuf, last_backoff_value);
Expand Down Expand Up @@ -1107,7 +1083,7 @@ static void fill_alert_status_for_host_json(json_object *obj, RRDHOST *host)
static json_object *timestamp_to_json(const time_t *t)
{
struct tm *tmptr, tmbuf;
if (*t && (tmptr = gmtime_r(t, &tmbuf)) ) {
if (*t && ((tmptr = gmtime_r(t, &tmbuf))) ) {
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tmptr);
return json_object_new_string(timebuf);
Expand Down
42 changes: 5 additions & 37 deletions src/aclk/aclk_otp.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,40 +267,8 @@ static int aclk_parse_otp_error(const char *json_str) {
}
#endif

#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
{
EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));

if (ctx != NULL) {
memset(ctx, 0, sizeof(*ctx));
}
return ctx;
}
static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
{
OPENSSL_free(ctx);
return;
}
#endif

#define CHALLENGE_LEN 256
#define CHALLENGE_LEN_BASE64 344
inline static int base64_decode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
{
unsigned char remaining_data[CHALLENGE_LEN];
EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
EVP_DecodeInit(ctx);
EVP_DecodeUpdate(ctx, out, outl, in, in_len);
int remainder = 0;
EVP_DecodeFinal(ctx, remaining_data, &remainder);
EVP_ENCODE_CTX_free(ctx);
if (remainder) {
netdata_log_error("Unexpected data at EVP_DecodeFinal");
return 1;
}
return 0;
}

#define OTP_URL_PREFIX "/api/v1/auth/node/"
int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **challenge, int *challenge_bytes, bool *fallback_ipv4)
Expand Down Expand Up @@ -347,7 +315,7 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **
goto cleanup_json;
}
const char *challenge_base64;
if (!(challenge_base64 = json_object_get_string(challenge_json))) {
if (!((challenge_base64 = json_object_get_string(challenge_json)))) {
netdata_log_error("Failed to extract challenge from JSON object");
goto cleanup_json;
}
Expand All @@ -356,8 +324,9 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **
goto cleanup_json;
}

*challenge = mallocz((CHALLENGE_LEN_BASE64 / 4) * 3);
base64_decode_helper(*challenge, challenge_bytes, (const unsigned char*)challenge_base64, strlen(challenge_base64));
*challenge = mallocz(CHALLENGE_LEN);
*challenge_bytes = netdata_base64_decode(*challenge, (const unsigned char *) challenge_base64, CHALLENGE_LEN_BASE64);

if (*challenge_bytes != CHALLENGE_LEN) {
netdata_log_error("Unexpected challenge length of %d instead of %d", *challenge_bytes, CHALLENGE_LEN);
freez(*challenge);
Expand All @@ -375,7 +344,6 @@ int aclk_get_otp_challenge(url_t *target, const char *agent_id, unsigned char **

int aclk_send_otp_response(const char *agent_id, const unsigned char *response, int response_bytes, url_t *target, struct auth_data *mqtt_auth, bool *fallback_ipv4)
{
int len;
int rc = 1;
https_req_t req = HTTPS_REQ_T_INITIALIZER;
https_req_response_t resp = HTTPS_REQ_RESPONSE_T_INITIALIZER;
Expand All @@ -387,7 +355,7 @@ int aclk_send_otp_response(const char *agent_id, const unsigned char *response,
unsigned char base64[CHALLENGE_LEN_BASE64 + 1];
memset(base64, 0, CHALLENGE_LEN_BASE64 + 1);

base64_encode_helper(base64, &len, response, response_bytes);
(void) netdata_base64_encode(base64, response, response_bytes);

BUFFER *url = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20, &netdata_buffers_statistics.buffers_aclk);
BUFFER *resp_json = buffer_create(strlen(OTP_URL_PREFIX) + UUID_STR_LEN + 20, &netdata_buffers_statistics.buffers_aclk);
Expand Down
2 changes: 1 addition & 1 deletion src/aclk/aclk_tx_msgs.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static short aclk_send_message_with_bin_payload(mqtt_wss_client client, json_obj

int rc = mqtt_wss_publish5(client, (char*)topic, NULL, full_msg, &freez_aclk_publish5b, full_msg_len, MQTT_WSS_PUB_QOS1, &packet_id);

if (rc == MQTT_WSS_ERR_TOO_BIG_FOR_SERVER)
if (rc == MQTT_WSS_ERR_MSG_TOO_BIG)
return HTTP_RESP_CONTENT_TOO_LONG;

return 0;
Expand Down
43 changes: 1 addition & 42 deletions src/aclk/aclk_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ const char *aclk_get_topic(enum aclk_topics topic)
* having to resort to callbacks.
*/

const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter)
const char *aclk_topic_cache_iterate(size_t *iter)
{
if (!aclk_topic_cache) {
netdata_log_error("Topic cache not initialized when %s was called.", __FUNCTION__);
Expand Down Expand Up @@ -434,44 +434,3 @@ void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt

freez(proxy);
}

#if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
{
EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));

if (ctx != NULL) {
memset(ctx, 0, sizeof(*ctx));
}
return ctx;
}
static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
{
OPENSSL_free(ctx);
return;
}
#endif

int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
{
int len;
unsigned char *str = out;
EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
EVP_EncodeInit(ctx);
EVP_EncodeUpdate(ctx, str, outl, in, in_len);
str += *outl;
EVP_EncodeFinal(ctx, str, &len);
*outl += len;

str = out;
while(*str) {
if (*str != 0x0D && *str != 0x0A)
*out++ = *str++;
else
str++;
}
*out = 0;

EVP_ENCODE_CTX_free(ctx);
return 0;
}
11 changes: 2 additions & 9 deletions src/aclk/aclk_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,10 @@ enum aclk_topics {
ACLK_TOPICID_CTXS_UPDATED = 20
};

typedef size_t aclk_topic_cache_iter_t;
#define ACLK_TOPIC_CACHE_ITER_T_INITIALIZER (0)

const char *aclk_get_topic(enum aclk_topics topic);
int aclk_generate_topic_cache(struct json_object *json);
int aclk_generate_topic_cache(json_object *json);
void free_topic_cache(void);
const char *aclk_topic_cache_iterate(aclk_topic_cache_iter_t *iter);
// TODO
// aclk_topics_reload //when claim id changes
const char *aclk_topic_cache_iterate(size_t *iter);

#ifdef ACLK_LOG_CONVERSATION_DIR
extern volatile int aclk_conversation_log_counter;
Expand All @@ -113,6 +108,4 @@ unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, un

void aclk_set_proxy(char **ohost, int *port, char **uname, char **pwd, enum mqtt_wss_proxy_type *type);

int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len);

#endif /* ACLK_UTIL_H */
3 changes: 1 addition & 2 deletions src/aclk/https_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ static int handle_http_request(https_req_ctx_t *ctx) {
// we remove those but during encoding we need that space in the buffer
creds_base64_len += (1+(creds_base64_len/64)) * strlen("\n");
char *creds_base64 = callocz(1, creds_base64_len + 1);
base64_encode_helper((unsigned char*)creds_base64, &creds_base64_len, (unsigned char*)creds_plain, creds_plain_len);
(void) netdata_base64_encode((unsigned char *)creds_base64, (unsigned char *)creds_plain, creds_plain_len);
buffer_sprintf(hdr, "Proxy-Authorization: Basic %s\x0D\x0A", creds_base64);
freez(creds_plain);
}
Expand Down Expand Up @@ -584,7 +584,6 @@ static int handle_http_request(https_req_ctx_t *ctx) {
if (ctx->parse_ctx.chunked_response)
freez(ctx->parse_ctx.chunked_response);
rc = 4;
goto err_exit;
}

err_exit:
Expand Down
Loading

0 comments on commit fbeee6b

Please sign in to comment.