Skip to content

Commit

Permalink
Make sure ACLK synchronization event loop runs frequently (netdata#19446
Browse files Browse the repository at this point in the history
)

* Process MAX_BATCH_SIZE commands before running the event loop again

* Increase command count
Make sure to exit and run event loop on sync command execution
  • Loading branch information
stelfrag authored Jan 20, 2025
1 parent e0d6ed7 commit 58467c6
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/database/sqlite/sqlite_aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,8 @@ static void start_alert_push(uv_work_t *req __maybe_unused)

#define MAX_ACLK_BATCH_JOBS_IN_QUEUE (20)

#define MAX_BATCH_SIZE (64)

static void aclk_synchronization(void *arg)
{
struct aclk_sync_config_s *config = arg;
Expand All @@ -599,6 +601,8 @@ static void aclk_synchronization(void *arg)
worker_register_job_name(ACLK_QUERY_EXECUTE_SYNC, "aclk query execute sync");
worker_register_job_name(ACLK_QUERY_BATCH_EXECUTE, "aclk batch execute");
worker_register_job_name(ACLK_QUERY_BATCH_ADD, "aclk batch add");
worker_register_job_name(ACLK_MQTT_WSS_CLIENT, "config mqtt client");
worker_register_job_name(ACLK_DATABASE_NODE_UNREGISTER, "unregister node");

uv_loop_t *loop = &config->loop;
fatal_assert(0 == uv_loop_init(loop));
Expand All @@ -623,18 +627,24 @@ static void aclk_synchronization(void *arg)
Pvoid_t *Pvalue;
struct aclk_query_payload *payload;

unsigned cmd_batch_size;
while (likely(service_running(SERVICE_ACLK))) {
enum aclk_database_opcode opcode;
worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);

/* wait for commands */
cmd_batch_size = 0;
do {
if (unlikely(cmd_batch_size >= MAX_BATCH_SIZE))
break;

struct aclk_database_cmd cmd = aclk_database_deq_cmd();

if (unlikely(!service_running(SERVICE_ACLK)))
break;

++cmd_batch_size;
opcode = cmd.opcode;

if(likely(opcode != ACLK_DATABASE_NOOP && opcode != ACLK_QUERY_EXECUTE))
Expand Down Expand Up @@ -723,6 +733,7 @@ static void aclk_synchronization(void *arg)
aclk_run_query(config, query, false);
freez(payload);
config->aclk_queries_running--;
cmd_batch_size = MAX_BATCH_SIZE;
}
break;

Expand Down

0 comments on commit 58467c6

Please sign in to comment.