Skip to content

Commit

Permalink
[#24472] YSQL, QueryDiagnostics: Update pg_stat_statements.csv file b…
Browse files Browse the repository at this point in the history
…ased on pg15 changes.

Summary:
In PostgreSQL 15, several new columns were introduced to the `pg_stat_statements` view, including separation of timing information into `plan_time` and `exec_time`. The changes in this diff integrate these new columns into the `query_diagnostics` bundle.
Jira: DB-13382

Test Plan: ./yb_build.sh --java-test 'org.yb.pgsql.TestYbQueryDiagnostics#checkPgssData'

Reviewers: asaha

Reviewed By: asaha

Subscribers: svc_phabricator, yql

Differential Revision: https://phorge.dev.yugabyte.com/D40319
  • Loading branch information
IshanChhangani committed Feb 24, 2025
1 parent c7ae857 commit f3c40f8
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -655,18 +655,34 @@ private void validatePgssData(Path pgssPath, String queryId, int noOfCalls,
String[] tokens = generateTokensForPgssData(pgssPath);
validatePgssData(pgssPath, queryId, noOfCalls, tokens);

Float actualTotalPlanTime = Float.parseFloat(tokens[3]);
Float actualTotalExecTime = Float.parseFloat(tokens[4]);
Float actualTotalTime = actualTotalPlanTime + actualTotalExecTime;

Float actualMinPlanTime = Float.parseFloat(tokens[5]);
Float actualMinExecTime = Float.parseFloat(tokens[6]);
Float actualMinTime = actualMinPlanTime + actualMinExecTime;

Float actualMaxPlanTime = Float.parseFloat(tokens[7]);
Float actualMaxExecTime = Float.parseFloat(tokens[8]);
Float actualMaxTime = actualMaxPlanTime + actualMaxExecTime;

Float actualMeanPlanTime = Float.parseFloat(tokens[9]);
Float actualMeanExecTime = Float.parseFloat(tokens[10]);
Float actualMeanTime = actualMeanPlanTime + actualMeanExecTime;

if (unnecessaryString != null)
assertTrue("pg_stat_statements contains unnecessary data",
!tokens[1].contains(unnecessaryString));
/* pg_stat_statements outputs data in ms */
assertLessThan("total_time is incorrect",
Math.abs(Float.parseFloat(tokens[3]) - expectedTotalTimeMs), epsilonMs);
Math.abs(actualTotalTime - expectedTotalTimeMs), epsilonMs);
assertLessThan("min_time is incorrect",
Math.abs(Float.parseFloat(tokens[4]) - expectedMinTimeMs), epsilonMs);
Math.abs(actualMinTime - expectedMinTimeMs), epsilonMs);
assertLessThan("max_time is incorrect",
Math.abs(Float.parseFloat(tokens[5]) - expectedMaxTimeMs), epsilonMs);
Math.abs(actualMaxTime - expectedMaxTimeMs), epsilonMs);
assertLessThan("mean_time is incorrect",
Math.abs(Float.parseFloat(tokens[6]) - expectedMeanTimeMs), epsilonMs);
Math.abs(actualMeanTime - expectedMeanTimeMs), epsilonMs);
}

private void validateConstantsOrBindVarData(Path bindVarPath, int noOfConstantsPerLine,
Expand Down
4 changes: 4 additions & 0 deletions src/postgres/contrib/pg_stat_statements/pg_stat_statements.c
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,10 @@ pgss_store(const char *query, uint64 queryId,
YbGetRedactedQueryString(query, query_len, &redacted_query, &redacted_query_len);
#endif

if (yb_enable_query_diagnostics && !jstate)
YbQueryDiagnosticsAccumulatePgss(queryId, (YbQdPgssStoreKind) kind,
total_time, rows, bufusage, walusage, jitusage);

/* Set up key for hashtable search */

/* memset() is required when pgssHashKey is without padding only */
Expand Down
218 changes: 151 additions & 67 deletions src/postgres/src/backend/utils/misc/yb_query_diagnostics.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ static void AccumulateBindVariablesOrConstants(YbQueryDiagnosticsEntry *entry, S
/* Functions used in gathering pg_stat_statements */
static void PgssToString(int64 query_id, char *pgss_str, YbQueryDiagnosticsPgss pgss,
const char *queryString);
static void AccumulatePgss(QueryDesc *queryDesc, YbQueryDiagnosticsEntry *result);

/* Functions used in gathering schema details */
static void FetchSchemaOids(List *rtable, Oid *schema_oids);
static bool ExecuteQuery(StringInfo schema_details, const char *query,
const char *title, char *description);
Expand Down Expand Up @@ -781,9 +778,12 @@ YbQueryDiagnostics_ExecutorEnd(QueryDesc *queryDesc)

if (entry)
{
double totaltime_ms;

totaltime_ms = INSTR_TIME_GET_MILLISEC(queryDesc->totaltime->counter);
/*
* Make sure stats accumulation is done. (Note: it's okay if several
* levels of hook all do this.)
*/
InstrEndLoop(queryDesc->totaltime);
double totaltime_ms = queryDesc->totaltime->total * 1000.0;

if (entry->metadata.params.bind_var_query_min_duration_ms <= totaltime_ms &&
(queryDesc->params || query_constants.count > 0))
Expand Down Expand Up @@ -819,8 +819,6 @@ YbQueryDiagnostics_ExecutorEnd(QueryDesc *queryDesc)
pfree(buf.data);
}

AccumulatePgss(queryDesc, entry);

if (current_query_sampled)
AccumulateExplain(queryDesc, entry,
entry->metadata.params.explain_analyze,
Expand Down Expand Up @@ -861,58 +859,102 @@ AccumulateBindVariablesOrConstants(YbQueryDiagnosticsEntry *entry, StringInfo pa
SpinLockRelease(&entry->mutex);
}

static void
AccumulatePgss(QueryDesc *queryDesc, YbQueryDiagnosticsEntry *entry)
void
YbQueryDiagnosticsAccumulatePgss(int64 query_id, YbQdPgssStoreKind kind,
double total_time, uint64 rows,
const BufferUsage *bufusage,
const WalUsage *walusage,
const struct JitInstrumentation *jitusage)
{
double totaltime_ms = INSTR_TIME_GET_DOUBLE(queryDesc->totaltime->counter) * 1000;
int64 rows = queryDesc->estate->es_processed;
BufferUsage *bufusage = &queryDesc->totaltime->bufusage;
YbQueryDiagnosticsEntry *entry;

SpinLockAcquire(&entry->mutex);
entry->pgss.counters.calls++;
entry->pgss.counters.total_time += totaltime_ms;
entry->pgss.counters.rows += queryDesc->estate->es_processed;
LWLockAcquire(bundles_in_progress_lock, LW_SHARED);
/*
* This can slow down the query execution, even if the query is not being bundled.
* Worst case : O(QUERY_DIAGNOSTICS_HASH_MAX_SIZE)
*/
entry = (YbQueryDiagnosticsEntry *) hash_search(bundles_in_progress,
&query_id, HASH_FIND,
NULL);

if (entry->pgss.counters.calls == 1)
{
entry->pgss.counters.min_time = totaltime_ms;
entry->pgss.counters.max_time = totaltime_ms;
entry->pgss.counters.mean_time = totaltime_ms;
}
else
if (entry)
{
double old_mean = entry->pgss.counters.mean_time;
SpinLockAcquire(&entry->mutex);
entry->pgss.counters.calls[kind] += 1;
entry->pgss.counters.total_time[kind] += total_time;
entry->pgss.counters.rows += rows;

/*
* 'calls' cannot be 0 here because
* it is initialized to 0 and incremented by calls++ above
*/
entry->pgss.counters.mean_time += (totaltime_ms - old_mean) / entry->pgss.counters.calls;
entry->pgss.counters.sum_var_time +=
((totaltime_ms - old_mean) *
(totaltime_ms - entry->pgss.counters.mean_time));
if (entry->pgss.counters.min_time > totaltime_ms)
entry->pgss.counters.min_time = totaltime_ms;
if (entry->pgss.counters.max_time < totaltime_ms)
entry->pgss.counters.max_time = totaltime_ms;
if (entry->pgss.counters.calls[kind] == 1)
{
entry->pgss.counters.min_time[kind] = total_time;
entry->pgss.counters.max_time[kind] = total_time;
entry->pgss.counters.mean_time[kind] = total_time;
}
else
{
double old_mean = entry->pgss.counters.mean_time[kind];
/*
* 'calls' cannot be 0 here because
* it is initialized to 0 and incremented by calls++ above
*/
entry->pgss.counters.mean_time[kind] +=
(total_time - old_mean) / entry->pgss.counters.calls[kind];
entry->pgss.counters.sum_var_time[kind] +=
(total_time - old_mean) * (total_time - entry->pgss.counters.mean_time[kind]);
if (entry->pgss.counters.min_time[kind] > total_time)
entry->pgss.counters.min_time[kind] = total_time;
if (entry->pgss.counters.max_time[kind] < total_time)
entry->pgss.counters.max_time[kind] = total_time;
}

entry->pgss.counters.rows += rows;
entry->pgss.counters.shared_blks_hit += bufusage->shared_blks_hit;
entry->pgss.counters.shared_blks_read += bufusage->shared_blks_read;
entry->pgss.counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
entry->pgss.counters.shared_blks_written += bufusage->shared_blks_written;
entry->pgss.counters.local_blks_hit += bufusage->local_blks_hit;
entry->pgss.counters.local_blks_read += bufusage->local_blks_read;
entry->pgss.counters.local_blks_dirtied += bufusage->local_blks_dirtied;
entry->pgss.counters.local_blks_written += bufusage->local_blks_written;
entry->pgss.counters.temp_blks_read += bufusage->temp_blks_read;
entry->pgss.counters.temp_blks_written += bufusage->temp_blks_written;
entry->pgss.counters.blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_read_time);
entry->pgss.counters.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time);
entry->pgss.counters.wal_records += walusage->wal_records;
entry->pgss.counters.wal_fpi += walusage->wal_fpi;
entry->pgss.counters.wal_bytes += walusage->wal_bytes;

if (jitusage)
{
entry->pgss.counters.jit_functions += jitusage->created_functions;
entry->pgss.counters.jit_generation_time += INSTR_TIME_GET_MILLISEC(jitusage->generation_counter);

if (INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter))
entry->pgss.counters.jit_inlining_count++;
entry->pgss.counters.jit_inlining_time += INSTR_TIME_GET_MILLISEC(jitusage->inlining_counter);

if (INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter))
entry->pgss.counters.jit_optimization_count++;
entry->pgss.counters.jit_optimization_time += INSTR_TIME_GET_MILLISEC(jitusage->optimization_counter);

if (INSTR_TIME_GET_MILLISEC(jitusage->emission_counter))
entry->pgss.counters.jit_emission_count++;
entry->pgss.counters.jit_emission_time += INSTR_TIME_GET_MILLISEC(jitusage->emission_counter);
}

SpinLockRelease(&entry->mutex);
}

entry->pgss.counters.rows += rows;
entry->pgss.counters.shared_blks_hit += bufusage->shared_blks_hit;
entry->pgss.counters.shared_blks_read += bufusage->shared_blks_read;
entry->pgss.counters.shared_blks_dirtied += bufusage->shared_blks_dirtied;
entry->pgss.counters.shared_blks_written += bufusage->shared_blks_written;
entry->pgss.counters.local_blks_hit += bufusage->local_blks_hit;
entry->pgss.counters.local_blks_read += bufusage->local_blks_read;
entry->pgss.counters.local_blks_dirtied += bufusage->local_blks_dirtied;
entry->pgss.counters.local_blks_written += bufusage->local_blks_written;
entry->pgss.counters.temp_blks_read += bufusage->temp_blks_read;
entry->pgss.counters.temp_blks_written += bufusage->temp_blks_written;
entry->pgss.counters.blk_read_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_read_time);
entry->pgss.counters.blk_write_time += INSTR_TIME_GET_MILLISEC(bufusage->blk_write_time);
SpinLockRelease(&entry->mutex);
LWLockRelease(bundles_in_progress_lock);
}

static double
CalculateStandardDeviation(int64 calls, double sum_var_time)
{
return (calls > 1) ? (sqrt(sum_var_time / calls)) : 0.0;
}


/*
* PgssToString
* Converts the pg_stat_statements data to a CSV string, and stores it in pgss_str.
Expand All @@ -924,21 +966,63 @@ PgssToString(int64 query_id, char *pgss_str, YbQueryDiagnosticsPgss pgss, const
query_str = "";

snprintf(pgss_str, YB_QD_MAX_PGSS_LEN,
"queryid,query,calls,total_time,min_time,max_time,mean_time,stddev_time,rows,"
"shared_blks_hit,shared_blks_read,shared_blks_dirtied,shared_blks_written,"
"local_blks_hit,local_blks_read,local_blks_dirtied,local_blks_written,"
"temp_blks_read,temp_blks_written,blk_read_time,blk_write_time\n"
"%ld,\"%s\",%ld,%lf,%lf,%lf,%lf,%lf,%ld,%ld,%ld,%ld,"
"%ld,%ld,%ld,%ld,%ld,%ld,%ld,%lf,%lf\n",
query_id, query_str, pgss.counters.calls,
pgss.counters.total_time, pgss.counters.min_time, pgss.counters.max_time,
pgss.counters.mean_time, sqrt(pgss.counters.sum_var_time / pgss.counters.calls),
pgss.counters.rows, pgss.counters.shared_blks_hit, pgss.counters.shared_blks_read,
pgss.counters.shared_blks_dirtied, pgss.counters.shared_blks_written,
pgss.counters.local_blks_hit, pgss.counters.local_blks_read,
pgss.counters.local_blks_dirtied, pgss.counters.local_blks_written,
pgss.counters.temp_blks_read, pgss.counters.temp_blks_written,
pgss.counters.blk_read_time, pgss.counters.blk_write_time);
"query_id,query,calls,total_plan_time,total_exec_time,"
"min_plan_time,min_exec_time,max_plan_time,max_exec_time,"
"mean_plan_time,mean_exec_time,stddev_plan_time,stddev_exec_time,"
"rows,shared_blks_hit,shared_blks_read,shared_blks_dirtied,shared_blks_written,"
"local_blks_hit,local_blks_read,local_blks_dirtied,local_blks_written,"
"temp_blks_read,temp_blks_written,blk_read_time,blk_write_time,"
"temp_blk_read_time,temp_blk_write_time,wal_records,wal_fpi,wal_bytes,"
"jit_functions,jit_generation_time,jit_inlining_count,jit_inlining_time,"
"jit_optimization_count,jit_optimization_time,jit_emission_count,jit_emission_time\n"
"%ld,\"%s\",%ld,%lf,%lf,%lf,%lf,"
"%lf,%lf,%lf,%lf,%lf,%lf,"
"%ld,%ld,%ld,%ld,%ld,"
"%ld,%ld,%ld,%ld,"
"%ld,%ld,%lf,%lf,"
"%lf,%lf,%ld,%ld,%ld,"
"%ld,%lf,%ld,%lf,"
"%ld,%lf,%ld,%lf\n",
query_id, query_str,
pgss.counters.calls[YB_QD_PGSS_EXEC],
pgss.counters.total_time[YB_QD_PGSS_PLAN],
pgss.counters.total_time[YB_QD_PGSS_EXEC],
pgss.counters.min_time[YB_QD_PGSS_PLAN],
pgss.counters.min_time[YB_QD_PGSS_EXEC],
pgss.counters.max_time[YB_QD_PGSS_PLAN],
pgss.counters.max_time[YB_QD_PGSS_EXEC],
pgss.counters.mean_time[YB_QD_PGSS_PLAN],
pgss.counters.mean_time[YB_QD_PGSS_EXEC],
CalculateStandardDeviation(pgss.counters.calls[YB_QD_PGSS_PLAN],
pgss.counters.sum_var_time[YB_QD_PGSS_PLAN]),
CalculateStandardDeviation(pgss.counters.calls[YB_QD_PGSS_EXEC],
pgss.counters.sum_var_time[YB_QD_PGSS_EXEC]),
pgss.counters.rows,
pgss.counters.shared_blks_hit,
pgss.counters.shared_blks_read,
pgss.counters.shared_blks_dirtied,
pgss.counters.shared_blks_written,
pgss.counters.local_blks_hit,
pgss.counters.local_blks_read,
pgss.counters.local_blks_dirtied,
pgss.counters.local_blks_written,
pgss.counters.temp_blks_read,
pgss.counters.temp_blks_written,
pgss.counters.blk_read_time,
pgss.counters.blk_write_time,
pgss.counters.temp_blk_read_time,
pgss.counters.temp_blk_write_time,
pgss.counters.wal_records,
pgss.counters.wal_fpi,
pgss.counters.wal_bytes,
pgss.counters.jit_functions,
pgss.counters.jit_generation_time,
pgss.counters.jit_inlining_count,
pgss.counters.jit_inlining_time,
pgss.counters.jit_optimization_count,
pgss.counters.jit_optimization_time,
pgss.counters.jit_emission_count,
pgss.counters.jit_emission_time);
}

static void
Expand Down Expand Up @@ -1015,7 +1099,7 @@ InsertNewBundleInfo(YbQueryDiagnosticsMetadata *metadata)
{
.counters =
{
0
{ 0 }
},
.query_offset = 0,
.query_len = 0,
Expand Down
Loading

0 comments on commit f3c40f8

Please sign in to comment.