Skip to content

Commit

Permalink
[PLAT-9862]Observability: Audit log support for k8s
Browse files Browse the repository at this point in the history
Summary:
This PR introduces support for managing audit log configurations in a Kubernetes-based universe. Enabling audit logs will perform the following actions:

  - YBA will validate the presence of the OpenTelemetry Operator before enabling audit logs.
  - The ysql_pg_conf_csv parameter will be updated based on the audit log settings specified in taskParams.
  - YBA will configure the OpenTelemetry Collector via Helm values
  - The universe.userIntent will be updated to reflect the new audit log configuration.

Exporter settings (entire configuration)

```
otelCollector:
  enabled: true
  exporters:
    datadog:
      api:
        key: <KEY>
        site: <SITE>
      retry_on_failure:
        enabled: true
        initial_interval: 1m
        max_elapsed_time: 1800m
        max_interval: 1800m
      sending_queue:
        enabled: true
        storage: file_storage/queue
  recievers:
    ysql:
      lineStartPattern: ([A-Z]\d{4})|((?P<timestamp_with_ms>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[.]\d{3}
        \w{3})[ ][[](?P<process_id>\d+)[]][ ])
      regex: '(?P<timestamp_with_ms>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[.]\d{3} \w{3})[
        ][[](?P<process_id>\d+)[]][ ](?P<log_level>\w+):  AUDIT: (?P<audit_type>\w+),(?P<statement_id>\d+),(?P<substatement_id>\d+),(?P<class>\w+),(?P<command>[^,]+),(?P<object_type>[^,]*),(?P<object_name>[^,]*),(?P<statement>(.|\n|\r|\s)*)'
      timestamp:
        layout: '%Y-%m-%d %H:%M:%S.%L %Z'
        parse_from: attributes.timestamp_with_ms
  secretEnv: []

```

Test Plan:
  - Create a k8s universe -> enable audit config -> Edit universe -> Disable audit config
  - Enable multiple exporters via API
  - Enable only audit logs (export disabled)
  - Enable audit logs on a universe with RR
  - Enable exporter on a cluster without opentelemetry operator installed -> Precheck fails
  - Skip precheck by setting skip_opentelemetry_operator_check to true -> retry task -> task fails at future step

Reviewers: anijhawan, vkumar, skurapati, amalyshev, vbansal

Reviewed By: vkumar, amalyshev

Subscribers: svc_phabricator, yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D41338
  • Loading branch information
Arpit Nabaria authored and Arpit-yb committed Feb 21, 2025
1 parent 8a23df9 commit 5d56ee3
Show file tree
Hide file tree
Showing 23 changed files with 654 additions and 160 deletions.
101 changes: 52 additions & 49 deletions bin/log_cleanup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,17 @@ logs_purge_threshold_kb=$(( logs_purge_threshold_kb / 2 ))
find_and_sort() {
dir=$1
regex=$2
find "${dir}" -type f -name "${regex}" -print0 | \
exclude="${3:-"$^"}" # exclude nothing by default
find "${dir}" -type f -name "${regex}" -not -name "${exclude}" -print0 | \
xargs -0 -r stat -c '%Y %n' | \
sort | cut -d' ' -f2-
}

delete_log_files() {
delete_or_gzip_log_files() {
local log_dir=$1
local find_regex=$2
local permitted_usage=$3
local permitted_usage_kb=$3
local command=$4
local logs_disk_usage_bytes
logs_disk_usage_bytes=$(find "${log_dir}" -type f -name "${find_regex}" -print0 | \
xargs -0 -r stat -c '%s' | \
Expand All @@ -115,49 +117,52 @@ delete_log_files() {
logs_disk_usage_bytes=0
fi
local logs_disk_usage_kb=$(( logs_disk_usage_bytes / 1000 ))
echo "Permitted disk usage for $find_regex files in kb: ${permitted_usage}"
echo "Disk usage by $find_regex files in kb: ${logs_disk_usage_kb}"

# get all the gz files.
local gz_files
local file_size
gz_files=$(find_and_sort "${log_dir}" "${find_regex}.gz")
for file in ${gz_files}; do
# If usage exceeds permitted, delete the old gz files.
if [[ "${logs_disk_usage_kb}" -gt "${permitted_usage}" ]]; then
file_size=$(du -k "${file}" | awk '{print $1}')
logs_disk_usage_kb=$(( logs_disk_usage_kb - file_size ))
echo "Delete file ${file}"
rm "${file}"
else
break
# Skip this part if command is gzip
# Remove all zipped files till we are under permitted usage
if [[ "$command" == "delete" ]]; then
echo "Permitted disk usage for $find_regex files in kb: ${permitted_usage_kb}"
echo "Disk usage by $find_regex files in kb: ${logs_disk_usage_kb}"
# get all the gz files.
local gz_files
local file_size
gz_files=$(find_and_sort "${log_dir}" "${find_regex}.gz")
for file in ${gz_files}; do
# If usage exceeds permitted, delete the old gz files.
if [[ "${logs_disk_usage_kb}" -gt "${permitted_usage_kb}" ]]; then
file_size=$(du -k "${file}" | awk '{print $1}')
logs_disk_usage_kb=$(( logs_disk_usage_kb - file_size ))
echo "Delete file ${file}"
rm "${file}"
else
break
fi
done
# Skip deletion of non-gz files if we are under permitted usage
if [[ "${logs_disk_usage_kb}" -le "${permitted_usage_kb}" ]]; then
return
fi
done

# Skip deletion of non-gz files if we are under permitted usage
if [[ "${logs_disk_usage_kb}" -le "${permitted_usage}" ]]; then
return
fi

# All the non-gz files
# Delete or gzip all the non-gz files till we are under permitted usage
local files
local current_file
files="$(find_and_sort "${log_dir}" "${find_regex}")"
# Remove the current log files from the list
for log_regex in ${log_regexes}; do
current_file="$(find_and_sort "${log_dir}" "${log_regex}" | tail -n1)"
# double quotes around files are import
# https://stackoverflow.com/a/4651495
files="$(echo "${files}" | grep -v -E "^${current_file}$")"
done
local file_size
# gzip/delete all files except the most recent one.
files=$(find_and_sort "${log_dir}" "${find_regex}" "*.gz" | head -n -1)
for file in ${files}; do
# If usage exceeds permitted, delete the old files.
if [[ "${logs_disk_usage_kb}" -gt "${permitted_usage}" ]]; then
if [[ "${logs_disk_usage_kb}" -gt "${permitted_usage_kb}" ]]; then
file_size=$(du -k "$file" | awk '{print $1}')
logs_disk_usage_kb=$(( logs_disk_usage_kb - file_size ))
echo "Delete file ${file}"
rm "${file}"
if [ "$command" == "delete" ]; then
echo "Delete file $file"
rm "$file"
else
echo "Compressing file $file"
gzip "$file" || echo "Compression failed. Continuing."
local new_file_size_kb
new_file_size_kb=$(du -k "${file}".gz | awk '{print $1}')
logs_disk_usage_kb=$((logs_disk_usage_kb + new_file_size_kb))
fi
else
break
fi
Expand Down Expand Up @@ -220,19 +225,17 @@ for daemon_type in ${daemon_types}; do
for log_regex in ${log_regexes}; do
# Using print0 since printf is not supported on all UNIX systems.
# xargs -0 -r stat -c '%Y %n' outputs: [unix time in millisecs] [name of file]
non_gz_files=$(find "${YB_LOG_DIR}" -type f -name "${log_regex}" ! -name "*.gz" -print0 | \
xargs -0 -r stat -c '%Y %n' | \
sort | cut -d' ' -f2-
)
non_gz_files=$(find_and_sort "${YB_LOG_DIR}" "${log_regex}" "*.gz")
# TODO: grep -c can be used here instead of wc -l.
non_gz_file_count=$(echo "${non_gz_files}" | wc -l)
# gzip all files but the current one.
if [[ "${non_gz_file_count}" -gt 1 ]]; then
files_to_gzip=$(echo "${non_gz_files}" | head -n-1)
for file in ${files_to_gzip}; do
echo "Compressing file ${file}"
gzip "${file}" || echo "Compression failed. Continuing."
done
permitted_postgres_plain_disk_usage_kb=0
if [[ "${log_regex}" == "postgres*log" && "${PRESERVE_AUDIT_LOGS:-}" == "true" ]]; then
permitted_postgres_plain_disk_usage_kb=$((postgres_max_log_size_kb / 2))
fi
delete_or_gzip_log_files "${YB_LOG_DIR}" "${log_regex}" \
"${permitted_postgres_plain_disk_usage_kb}" "gzip"
fi
done

Expand All @@ -245,8 +248,8 @@ for daemon_type in ${daemon_types}; do
percent_disk_usage_kb=$(( disk_size_kb * logs_disk_percent_max / 100 ))
permitted_disk_usage_kb=$([[ "${percent_disk_usage_kb}" -le "${logs_purge_threshold_kb}" ]] && \
echo "${percent_disk_usage_kb}" || echo "${logs_purge_threshold_kb}")
delete_log_files "${YB_LOG_DIR}" "${server_log}" "${permitted_disk_usage_kb}"
delete_log_files "${YB_LOG_DIR}" "${postgres_log}" "${postgres_max_log_size_kb}"
delete_or_gzip_log_files "${YB_LOG_DIR}" "${server_log}" "${permitted_disk_usage_kb}" "delete"
delete_or_gzip_log_files "${YB_LOG_DIR}" "${postgres_log}" \
"${postgres_max_log_size_kb}" "delete"
fi
done

1 change: 1 addition & 0 deletions managed/RUNTIME-FLAGS.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,4 @@
| "Use S3 IAM roles attached to DB node for Backup/Restore" | "yb.backup.s3.use_db_nodes_iam_role_for_backup" | "UNIVERSE" | "Use S3 IAM roles attached to DB node for Backup/Restore" | "Boolean" |
| "Queue Wait Time for Tasks" | "yb.task.queue_wait_time" | "UNIVERSE" | "Wait time for a queued task before the running task can be evicted forcefully." | "Duration" |
| "Common Name Required for Certificates" | "yb.tls.cert_manager.common_name_required" | "UNIVERSE" | "If true, YBA will add commonName to the CertificateRequest sent to cert manager." | "Boolean" |
| "Skip OpenTelemetry Operator Check" | "yb.universe.skip_otel_operator_check" | "UNIVERSE" | "If true, YBA will skip checking for Opentelemetry operator installation on the cluster." | "Boolean" |
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.yugabyte.yw.commissioner.tasks.subtasks.PreflightNodeCheck;
import com.yugabyte.yw.commissioner.tasks.subtasks.SetupYNP;
import com.yugabyte.yw.commissioner.tasks.subtasks.UniverseSetTlsParams;
import com.yugabyte.yw.commissioner.tasks.subtasks.UpdateAndPersistAuditLoggingConfig;
import com.yugabyte.yw.commissioner.tasks.subtasks.UpdateClusterAPIDetails;
import com.yugabyte.yw.commissioner.tasks.subtasks.UpdateNodeDetails;
import com.yugabyte.yw.commissioner.tasks.subtasks.UpdateUniverseCommunicationPorts;
Expand Down Expand Up @@ -3887,4 +3888,13 @@ protected void createSetYBMajorVersionUpgradeCompatibility(
})
.setSubTaskGroupType(SubTaskGroupType.UpdatingGFlags);
}

public void updateAndPersistAuditLoggingConfigTask() {
TaskExecutor.SubTaskGroup subTaskGroup =
createSubTaskGroup("UpdateAndPersistAuditLoggingConfig");
UpdateAndPersistAuditLoggingConfig task = createTask(UpdateAndPersistAuditLoggingConfig.class);
task.initialize(taskParams());
subTaskGroup.addSubTask(task);
getRunnableTask().addSubTaskGroup(subTaskGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.yugabyte.yw.common.PlatformServiceException;
import com.yugabyte.yw.common.ReleaseManager;
import com.yugabyte.yw.common.Util;
import com.yugabyte.yw.common.audit.otel.OtelCollectorConfigGenerator;
import com.yugabyte.yw.common.backuprestore.ybc.YbcManager;
import com.yugabyte.yw.common.certmgmt.CertConfigType;
import com.yugabyte.yw.common.certmgmt.CertificateDetails;
Expand All @@ -40,6 +41,7 @@
import com.yugabyte.yw.common.config.UniverseConfKeys;
import com.yugabyte.yw.common.gflags.GFlagsUtil;
import com.yugabyte.yw.common.helm.HelmUtils;
import com.yugabyte.yw.common.yaml.SkipNullRepresenter;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams.ExposingServiceState;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams.UserIntent;
Expand All @@ -57,6 +59,7 @@
import com.yugabyte.yw.models.helpers.PlacementInfo;
import com.yugabyte.yw.models.helpers.UpgradeDetails;
import com.yugabyte.yw.models.helpers.UpgradeDetails.YsqlMajorVersionUpgradeState;
import com.yugabyte.yw.models.helpers.audit.AuditLogConfig;
import com.yugabyte.yw.models.helpers.provider.region.WellKnownIssuerKind;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Service;
Expand Down Expand Up @@ -184,19 +187,22 @@ public String toString() {
private final ReleaseManager releaseManager;
private final FileHelperService fileHelperService;
private final YbcManager ybcManager;
private final OtelCollectorConfigGenerator otelCollectorConfigGenerator;

@Inject
protected KubernetesCommandExecutor(
BaseTaskDependencies baseTaskDependencies,
KubernetesManagerFactory kubernetesManagerFactory,
ReleaseManager releaseManager,
FileHelperService fileHelperService,
YbcManager ybcManager) {
YbcManager ybcManager,
OtelCollectorConfigGenerator otelCollectorConfigGenerator) {
super(baseTaskDependencies);
this.kubernetesManagerFactory = kubernetesManagerFactory;
this.releaseManager = releaseManager;
this.fileHelperService = fileHelperService;
this.ybcManager = ybcManager;
this.otelCollectorConfigGenerator = otelCollectorConfigGenerator;
}

static final Pattern nodeNamePattern = Pattern.compile(".*-n(\\d+)+");
Expand Down Expand Up @@ -255,6 +261,8 @@ public static class Params extends UniverseTaskParams {
public boolean usePreviousGflagsChecksum = false;
public boolean createNamespacedService = false;
public Set<String> deleteServiceNames;
// Opentelemetry collector related params
public AuditLogConfig auditLogConfig = null;
// Only set false for create universe case initially
public boolean masterJoinExistingCluster = true;

Expand Down Expand Up @@ -756,7 +764,7 @@ private void populatePreviousGflagsChecksum() {

private String generateHelmOverride() {
Map<String, Object> overrides = new HashMap<String, Object>();
Yaml yaml = new Yaml();
Yaml yaml = new Yaml(new SkipNullRepresenter());

// TODO: decide if the user wants to expose all the services or just master.
overrides = yaml.load(environment.resourceAsStream("k8s-expose-all.yml"));
Expand Down Expand Up @@ -1347,6 +1355,22 @@ private String generateHelmOverride() {
tserverGFlags, TIMESTAMP_HISTORY_RETENTION_GFLAG_MAP);
}

// Add overrides for OpenTelemetry
if (primaryClusterIntent.auditLogConfig != null) {
AuditLogConfig auditLogConfig = primaryClusterIntent.auditLogConfig;
tserverGFlags.put(
GFlagsUtil.YSQL_PG_CONF_CSV,
GFlagsUtil.mergeCSVs(
tserverGFlags.getOrDefault(GFlagsUtil.YSQL_PG_CONF_CSV, ""),
GFlagsUtil.getYsqlPgConfCsv(auditLogConfig, null),
true));
overrides.put(
"otelCollector",
otelCollectorConfigGenerator.getOtelHelmValues(
auditLogConfig,
GFlagsUtil.getLogLinePrefix(tserverGFlags.get(GFlagsUtil.YSQL_PG_CONF_CSV))));
}

if (!tserverGFlags.isEmpty()) {
gflagOverrides.put("tserver", tserverGFlags);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2025 YugaByte, Inc. and Contributors
*
* Licensed under the Polyform Free Trial License 1.0.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://github.com/YugaByte/yugabyte-db/blob/master/licenses/POLYFORM-FREE-TRIAL-LICENSE-1.0.0.txt
*/

package com.yugabyte.yw.commissioner.tasks.subtasks.check;

import com.google.inject.Inject;
import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.tasks.KubernetesTaskBase;
import com.yugabyte.yw.common.ShellKubernetesManager;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CheckOpentelemetryOperator extends KubernetesTaskBase {

private final ShellKubernetesManager shellKubernetesManager;

@Inject
protected CheckOpentelemetryOperator(
BaseTaskDependencies baseTaskDependencies, ShellKubernetesManager shellKubernetesManager) {
super(baseTaskDependencies);
this.shellKubernetesManager = shellKubernetesManager;
}

@Override
public UniverseDefinitionTaskParams taskParams() {
return (UniverseDefinitionTaskParams) taskParams;
}

@Override
public void run() {
try {
shellKubernetesManager.checkOpentelemetryOperatorRunning();
} catch (Exception e) {
log.error("Error executing task {} with error={}.", getName(), e.getMessage());
throw e;
}
log.info("Opentelemetry collector is installed.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2023 YugaByte, Inc. and Contributors
*
* Licensed under the Polyform Free Trial License 1.0.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://github.com/YugaByte/yugabyte-db/blob/master/licenses/POLYFORM-FREE-TRIAL-LICENSE-1.0.0.txt
*/
package com.yugabyte.yw.commissioner.tasks.upgrade;

import com.yugabyte.yw.commissioner.BaseTaskDependencies;
import com.yugabyte.yw.commissioner.KubernetesUpgradeTaskBase;
import com.yugabyte.yw.commissioner.TaskExecutor.SubTaskGroup;
import com.yugabyte.yw.commissioner.UserTaskDetails.SubTaskGroupType;
import com.yugabyte.yw.commissioner.tasks.subtasks.check.CheckOpentelemetryOperator;
import com.yugabyte.yw.common.config.UniverseConfKeys;
import com.yugabyte.yw.common.operator.OperatorStatusUpdaterFactory;
import com.yugabyte.yw.forms.AuditLogConfigParams;
import com.yugabyte.yw.forms.UniverseDefinitionTaskParams.Cluster;
import com.yugabyte.yw.models.Universe;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ModifyKubernetesAuditLoggingConfig extends KubernetesUpgradeTaskBase {

@Inject
protected ModifyKubernetesAuditLoggingConfig(
BaseTaskDependencies baseTaskDependencies,
OperatorStatusUpdaterFactory operatorStatusUpdaterFactory) {
super(baseTaskDependencies, operatorStatusUpdaterFactory);
}

@Override
protected AuditLogConfigParams taskParams() {
return (AuditLogConfigParams) taskParams;
}

@Override
public SubTaskGroupType getTaskSubGroupType() {
return SubTaskGroupType.Provisioning;
}

@Override
protected void createPrecheckTasks(Universe universe) {
super.createPrecheckTasks(universe);
addBasicPrecheckTasks();
if (!confGetter.getConfForScope(universe, UniverseConfKeys.skipOpentelemetryOperatorCheck)) {
checkOtelOperatorInstallation(universe);
}
}

@Override
public void run() {
runUpgrade(
() -> {
Universe universe = getUniverse();
Cluster cluster = universe.getUniverseDetails().getPrimaryCluster();
cluster.userIntent.auditLogConfig = taskParams().auditLogConfig;

// Create Kubernetes Upgrade Task.
createUpgradeTask(
universe,
cluster.userIntent.ybSoftwareVersion,
/* upgradeMasters */ true,
/* upgradeTservers */ true,
universe.isYbcEnabled(),
universe.getUniverseDetails().getYbcSoftwareVersion());
updateAndPersistAuditLoggingConfigTask();
});
}

private void checkOtelOperatorInstallation(Universe universe) {
if (confGetter.getConfForScope(universe, UniverseConfKeys.skipOpentelemetryOperatorCheck)) {
log.info("Skipping Opentelemetry Operator check.");
return;
}
SubTaskGroup subTaskGroup =
createSubTaskGroup("CheckOpentelemetryOperator", SubTaskGroupType.PreflightChecks);
CheckOpentelemetryOperator task = createTask(CheckOpentelemetryOperator.class);
task.initialize(universe.getUniverseDetails());
subTaskGroup.addSubTask(task);
getRunnableTask().addSubTaskGroup(subTaskGroup);
}
}
Loading

0 comments on commit 5d56ee3

Please sign in to comment.