Skip to content

Commit

Permalink
Improve context load time during startup (netdata#19321)
Browse files Browse the repository at this point in the history
* Improve context load time during startup

* Remove cache for instance acquired

---------

Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
  • Loading branch information
stelfrag and ktsaou authored Jan 10, 2025
1 parent be3a1af commit 21d3e3a
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 80 deletions.
23 changes: 13 additions & 10 deletions src/database/sqlite/sqlite_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "sqlite_functions.h"
#include "sqlite_context.h"
#include "sqlite_db_migration.h"
#include "database/contexts/internal.h"

#define DB_CONTEXT_METADATA_VERSION 1

Expand Down Expand Up @@ -72,6 +73,8 @@ int sql_init_context_database(int memory)
return 0;
}

extern __thread sqlite3 *db_meta_thread;
extern __thread sqlite3 *db_context_thread;
//
// Fetching data
//
Expand All @@ -80,14 +83,14 @@ int sql_init_context_database(int memory)

void ctx_get_chart_list(nd_uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *, void *), void *data)
{
static __thread sqlite3_stmt *res = NULL;
sqlite3_stmt *res = NULL;

if (unlikely(!host_uuid)) {
internal_error(true, "Requesting context chart list without host_id");
return;
}

if (!PREPARE_COMPILED_STATEMENT(db_meta, CTX_GET_CHART_LIST, &res))
if (!PREPARE_STATEMENT(db_meta_thread ? db_meta_thread : db_meta, CTX_GET_CHART_LIST, &res))
return;

int param = 0;
Expand All @@ -111,17 +114,17 @@ void ctx_get_chart_list(nd_uuid_t *host_uuid, void (*dict_cb)(SQL_CHART_DATA *,

done:
REPORT_BIND_FAIL(res, param);
SQLITE_RESET(res);
SQLITE_FINALIZE(res);
}

// Dimension list
#define CTX_GET_DIMENSION_LIST "SELECT d.dim_id, d.id, d.name, CASE WHEN INSTR(d.options,\"hidden\") > 0 THEN 1 ELSE 0 END, c.type||'.'||c.id, c.context " \
"FROM dimension d, chart c WHERE c.host_id = @host_id AND d.chart_id = c.chart_id AND d.dim_id IS NOT NULL ORDER BY d.rowid ASC"
void ctx_get_dimension_list(nd_uuid_t *host_uuid, void (*dict_cb)(SQL_DIMENSION_DATA *, void *), void *data)
{
static __thread sqlite3_stmt *res = NULL;
sqlite3_stmt *res = NULL;

if (!PREPARE_COMPILED_STATEMENT(db_meta, CTX_GET_DIMENSION_LIST, &res))
if (!PREPARE_STATEMENT(db_meta_thread ? db_meta_thread : db_meta, CTX_GET_DIMENSION_LIST, &res))
return;

int param = 0;
Expand All @@ -142,7 +145,7 @@ void ctx_get_dimension_list(nd_uuid_t *host_uuid, void (*dict_cb)(SQL_DIMENSION_

done:
REPORT_BIND_FAIL(res, param);
SQLITE_RESET(res);
SQLITE_FINALIZE(res);
}

// LABEL LIST
Expand Down Expand Up @@ -183,9 +186,9 @@ void ctx_get_context_list(nd_uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEX
if (unlikely(!host_uuid))
return;

static __thread sqlite3_stmt *res = NULL;
sqlite3_stmt *res = NULL;

if (!PREPARE_COMPILED_STATEMENT(db_context_meta, CTX_GET_CONTEXT_LIST, &res))
if (!PREPARE_STATEMENT(db_context_thread ? db_context_thread : db_context_meta, CTX_GET_CONTEXT_LIST, &res))
return;

VERSIONED_CONTEXT_DATA context_data = {0};
Expand All @@ -210,7 +213,7 @@ void ctx_get_context_list(nd_uuid_t *host_uuid, void (*dict_cb)(VERSIONED_CONTEX

done:
REPORT_BIND_FAIL(res, param);
SQLITE_RESET(res);
SQLITE_FINALIZE(res);
}


Expand All @@ -230,7 +233,7 @@ int ctx_store_context(nd_uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data
if (unlikely(!host_uuid || !context_data || !context_data->id))
return 0;

if (!PREPARE_STATEMENT(db_context_meta, CTX_STORE_CONTEXT, &res))
if (!PREPARE_STATEMENT(db_context_meta ? db_context_meta : db_meta, CTX_STORE_CONTEXT, &res))
return 1;

int param = 0;
Expand Down
202 changes: 132 additions & 70 deletions src/database/sqlite/sqlite_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1571,29 +1571,66 @@ struct scan_metadata_payload {
// Per-slot bookkeeping for one context-loading worker thread.
// restore_host_context() receives a pointer to one of these as its argument.
struct host_context_load_thread {
// Thread handle, passed to uv_thread_create() / uv_thread_join().
uv_thread_t thread;
// Host whose contexts restore_host_context() loads.
RRDHOST *host;
// SQLite connections stored in the slot by restore_host_context() after it
// opens them, so a later run that receives the same slot can reuse them.
sqlite3 *db_meta_thread;
sqlite3 *db_context_thread;
// Slot-in-use flag; accessed with __atomic builtins.
bool busy;
// Set with a release store by restore_host_context() when it completes.
bool finished;
};

// Thread-local SQLite handles. restore_host_context() points them at the
// connections it opens (or finds cached in its slot); the ctx_get_* readers in
// sqlite_context.c use them in preference to the shared handles when non-NULL.
__thread sqlite3 *db_meta_thread = NULL;
__thread sqlite3 *db_context_thread = NULL;
// Set to true by start_all_host_load_context() on its own thread. When true,
// restore_host_context() leaves the two handles above set on return.
__thread bool main_context_thread = false;

// Load the rrdcontext data of one host.
//
// arg is a struct host_context_load_thread *; its host field selects the host.
// The function makes sure the thread-local db_meta_thread / db_context_thread
// handles are set (reusing the ones cached in the slot, or opening new
// connections), loads the host's contexts, clears the pending-load flag,
// queues node info, and finally marks the slot as finished.
static void restore_host_context(void *arg)
{
struct host_context_load_thread *hclt = arg;
RRDHOST *host = hclt->host;

// NOTE(review): this early return does not set hclt->finished.
if (!host)
return;

// Only set up connections if this thread does not already have one.
if (!db_meta_thread) {
if (hclt->db_meta_thread) {
// Reuse the connections cached in this slot.
db_meta_thread = hclt->db_meta_thread;
db_context_thread = hclt->db_context_thread;
} else {
// Open both databases read-only with SQLITE_OPEN_NOMUTEX.
char sqlite_database[FILENAME_MAX + 1];
snprintfz(sqlite_database, sizeof(sqlite_database) - 1, "%s/netdata-meta.db", netdata_configured_cache_dir);
int rc = sqlite3_open_v2(sqlite_database, &db_meta_thread, SQLITE_OPEN_READONLY | SQLITE_OPEN_NOMUTEX, NULL);
if (rc != SQLITE_OK) {
// On failure, release the handle and leave the thread-local NULL.
sqlite3_close(db_meta_thread);
db_meta_thread = NULL;
}

snprintfz(sqlite_database, sizeof(sqlite_database) - 1, "%s/context-meta.db", netdata_configured_cache_dir);
rc = sqlite3_open_v2(sqlite_database, &db_context_thread, SQLITE_OPEN_READONLY | SQLITE_OPEN_NOMUTEX, NULL);
if (rc != SQLITE_OK) {
sqlite3_close(db_context_thread);
db_context_thread = NULL;
}

// Cache whatever was opened (possibly NULL) in the slot.
// NOTE(review): if only the meta open failed, a later run that finds
// hclt->db_meta_thread NULL takes this branch again and overwrites
// hclt->db_context_thread without closing it; confirm this is acceptable.
hclt->db_meta_thread = db_meta_thread;
hclt->db_context_thread = db_context_thread;
}
}

// Time the load; the (void) casts silence unused-variable warnings.
usec_t started_ut = now_monotonic_usec(); (void)started_ut;
rrdhost_load_rrdcontext_data(host);
usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;

char load_duration[64];
duration_snprintf(load_duration, sizeof(load_duration), (int64_t)(ended_ut - started_ut), "us", true);
nd_log_daemon(NDLP_DEBUG, "Contexts for host %s loaded in %s", rrdhost_hostname(host), load_duration);

rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD);

aclk_queue_node_info(host, false);

// NOTE(review): the load duration was already logged above via
// nd_log_daemon(); this second debug message looks redundant. Confirm
// whether it should remain.
nd_log(
NDLS_DAEMON,
NDLP_DEBUG,
"Contexts for host %s loaded in %0.2f ms",
rrdhost_hostname(host),
(double)(ended_ut - started_ut) / USEC_PER_MS);
// Check and clear the thread local variables
// (the connections themselves are not closed here; the pointers stay in hclt)
if (!main_context_thread) {
db_meta_thread = NULL;
db_context_thread = NULL;
}

// Release store: signals completion to whoever polls hclt->finished.
__atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE);
}
Expand All @@ -1605,40 +1642,41 @@ static void after_start_host_load_context(uv_work_t *req, int status __maybe_unu
freez(data);
}

#define MAX_FIND_THREAD_RETRIES (10)

static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait)
static bool cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait, size_t *free_slot)
{
if (!hclt)
return;
return false;

for (size_t index = 0; index < max_thread_slots; index++) {
if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED)
|| (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) {
int rc = uv_thread_join(&(hclt[index].thread));
if (rc)
nd_log(NDLS_DAEMON, NDLP_WARNING, "Failed to join thread, rc = %d", rc);
__atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE);
__atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE);
}
}
}
bool found_slot = false;

static size_t find_available_thread_slot(struct host_context_load_thread *hclt, size_t max_thread_slots, size_t *found_index)
{
size_t retries = MAX_FIND_THREAD_RETRIES;
while (retries--) {
size_t index = 0;
while (index < max_thread_slots) {
if (false == __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE)) {
*found_index = index;
return true;
}
index++;
}
sleep_usec(10 * USEC_PER_MS);
size_t loop_count = 20;
while (loop_count--) {
for (size_t index = 0; index < max_thread_slots; index++) {
if (free_slot && false == __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE)) {
found_slot = true;
*free_slot = index;
break;
}
if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED) ||
(wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) {

int rc = uv_thread_join(&(hclt[index].thread));
if (rc)
nd_log_daemon(NDLP_WARNING, "Failed to join thread, rc = %d", rc);
__atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE);
__atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE);
found_slot = true;
if (free_slot) {
*free_slot = index;
break;
}
}
}
if (found_slot || wait)
break;
sleep_usec(10 * USEC_PER_MS);
}
return false;
return found_slot || wait;
}

static void start_all_host_load_context(uv_work_t *req __maybe_unused)
Expand All @@ -1653,53 +1691,77 @@ static void start_all_host_load_context(uv_work_t *req __maybe_unused)

RRDHOST *host;

size_t max_threads = netdata_conf_cpus() / 2;
size_t max_threads = netdata_conf_cpus();
if (max_threads < 1)
max_threads = 1;

nd_log(NDLS_DAEMON, NDLP_DEBUG, "Using %zu threads for context loading", max_threads);
struct host_context_load_thread *hclt = max_threads > 1 ? callocz(max_threads, sizeof(*hclt)) : NULL;

size_t thread_index = 0;
main_context_thread = true;
size_t host_count = 0;
size_t sync_exec = 0;
size_t async_exec = 0;
dfe_start_reentrant(rrdhost_root_index, host) {
if (!rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
continue;

nd_log(NDLS_DAEMON, NDLP_DEBUG, "Loading context for host %s", rrdhost_hostname(host));

int rc = 0;
if (hclt) {
bool found_slot = false;
do {
if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
break;

cleanup_finished_threads(hclt, max_threads, false);
found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
} while (!found_slot);

if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
break;

__atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
hclt[thread_index].host = host;
rc = uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]);
}
// if single thread or thread creation failed
if (rc || !hclt) {
struct host_context_load_thread hclt_sync = {.host = host};
restore_host_context(&hclt_sync);

if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
break;
}
if (!rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
continue;

if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))
break;

nd_log_daemon(NDLP_DEBUG, "Loading context for host %s", rrdhost_hostname(host));

int rc = 0;
bool thread_found = cleanup_finished_threads(hclt, max_threads, false, &thread_index);
if (thread_found) {
__atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
hclt[thread_index].host = host;
rc = uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]);
async_exec += (rc == 0);
}
// if single thread, thread creation failure or failure to find slot
if (rc || !thread_found) {
sync_exec++;
struct host_context_load_thread hclt_sync = {.host = host};
restore_host_context(&hclt_sync);
}
host_count++;
}
dfe_done(host);

cleanup_finished_threads(hclt, max_threads, true);
freez(hclt);
bool should_clean_threads = cleanup_finished_threads(hclt, max_threads, true, NULL);

if (should_clean_threads) {
for (size_t index = 0; index < max_threads; index++) {
if (hclt[index].db_meta_thread)
sqlite3_close_v2(hclt[index].db_meta_thread);

if (hclt[index].db_context_thread)
sqlite3_close_v2(hclt[index].db_context_thread);
}
freez(hclt);
}

usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
nd_log(NDLS_DAEMON, NDLP_DEBUG, "Host contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
char load_duration[64];
duration_snprintf(load_duration, sizeof(load_duration), (int64_t)(ended_ut - started_ut), "us", true);

nd_log_daemon(
NDLP_INFO,
"Contexts for %zu hosts loaded: %zu delegated to %zu threads, %zu handled directly, in %s.",
host_count,
async_exec,
max_threads,
sync_exec,
load_duration);

if (db_meta_thread) {
sqlite3_close_v2(db_meta_thread);
sqlite3_close_v2(db_context_thread);
db_meta_thread = NULL;
db_context_thread = NULL;
}

worker_is_idle();
}
Expand Down

0 comments on commit 21d3e3a

Please sign in to comment.