From 58467c686c213017a6aa001c6e959ce92b6c93fa Mon Sep 17 00:00:00 2001 From: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Date: Mon, 20 Jan 2025 20:22:04 +0200 Subject: [PATCH] Make sure ACLK synchronization event loop runs frequently (#19446) * Process MAX_BATCH_SIZE commands before running the event loop again * Increase command count Make sure to exit and run event loop on sync command execution --- src/database/sqlite/sqlite_aclk.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/database/sqlite/sqlite_aclk.c b/src/database/sqlite/sqlite_aclk.c index 1514c07eacb380..0c8cb5fbbd24b4 100644 --- a/src/database/sqlite/sqlite_aclk.c +++ b/src/database/sqlite/sqlite_aclk.c @@ -585,6 +585,8 @@ static void start_alert_push(uv_work_t *req __maybe_unused) #define MAX_ACLK_BATCH_JOBS_IN_QUEUE (20) +#define MAX_BATCH_SIZE (64) + static void aclk_synchronization(void *arg) { struct aclk_sync_config_s *config = arg; @@ -599,6 +601,8 @@ static void aclk_synchronization(void *arg) worker_register_job_name(ACLK_QUERY_EXECUTE_SYNC, "aclk query execute sync"); worker_register_job_name(ACLK_QUERY_BATCH_EXECUTE, "aclk batch execute"); worker_register_job_name(ACLK_QUERY_BATCH_ADD, "aclk batch add"); + worker_register_job_name(ACLK_MQTT_WSS_CLIENT, "config mqtt client"); + worker_register_job_name(ACLK_DATABASE_NODE_UNREGISTER, "unregister node"); uv_loop_t *loop = &config->loop; fatal_assert(0 == uv_loop_init(loop)); @@ -623,18 +627,24 @@ static void aclk_synchronization(void *arg) Pvoid_t *Pvalue; struct aclk_query_payload *payload; + unsigned cmd_batch_size; while (likely(service_running(SERVICE_ACLK))) { enum aclk_database_opcode opcode; worker_is_idle(); uv_run(loop, UV_RUN_DEFAULT); /* wait for commands */ + cmd_batch_size = 0; do { + if (unlikely(cmd_batch_size >= MAX_BATCH_SIZE)) + break; + struct aclk_database_cmd cmd = aclk_database_deq_cmd(); if (unlikely(!service_running(SERVICE_ACLK))) break; + ++cmd_batch_size; opcode = cmd.opcode; if(likely(opcode != ACLK_DATABASE_NOOP && opcode != ACLK_QUERY_EXECUTE)) @@ -723,6 +733,7 @@ static void aclk_synchronization(void *arg) aclk_run_query(config, query, false); freez(payload); config->aclk_queries_running--; + cmd_batch_size = MAX_BATCH_SIZE; } break;