[controller] Emit push job status metrics from controller #1176

Closed

Changes from all commits
1 change: 1 addition & 0 deletions .gitignore
@@ -17,6 +17,7 @@ build/
 internal/venice-test-common/tmp/
 internal/venice-test-common/dump.complete.hprof
 internal/venice-test-common/src/jmh/generated
+internal/venice-test-common/file:/
 .jekyll-cache/
 _site/
 Gemfile.lock
6 changes: 4 additions & 2 deletions build.gradle
@@ -125,7 +125,7 @@ ext.libraries = [
     snappy: 'org.iq80.snappy:snappy:0.4',
     spark: 'com.sparkjava:spark-core:2.9.4', // Spark-Java Rest framework
     spotbugs: 'com.github.spotbugs:spotbugs:4.5.2',
-    tehuti: 'io.tehuti:tehuti:0.11.4',
+    tehuti: 'io.tehuti:tehuti:0.12.2',
     testcontainers: 'org.testcontainers:testcontainers:1.18.0',
     testng: 'org.testng:testng:6.14.3',
     tomcatAnnotations: 'org.apache.tomcat:annotations-api:6.0.53',
@@ -805,7 +805,9 @@ task verifyJdkVersion {
   def currentJdkVersion = JavaVersion.current()
   def isSupported = supportedJdkVersions.any { version -> currentJdkVersion.equals(version) }
   if (!isSupported) {
-    throw new GradleException("Invalid JDK version: ${currentJdkVersion}. Supported versions: ${supportedJdkVersions.join(', ')}.")
+    throw new GradleException("Invalid JDK version: ${currentJdkVersion}.\n" +
+        "Supported versions: ${supportedJdkVersions.join(', ')}.\n" +
+        "Please set the JAVA_HOME environment variable to a supported version, either locally or globally.")
   }
   println "JDK version ${currentJdkVersion} is valid."
 }
VeniceServerConfig.java
@@ -50,6 +50,8 @@
 import static com.linkedin.venice.ConfigKeys.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE;
 import static com.linkedin.venice.ConfigKeys.ROUTER_PRINCIPAL_NAME;
 import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND;
+import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED;
+import static com.linkedin.venice.ConfigKeys.SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE;
 import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_BLOCKING_QUEUE_TYPE;
 import static com.linkedin.venice.ConfigKeys.SERVER_CHANNEL_OPTION_WRITE_BUFFER_WATERMARK_HIGH_BYTES;
@@ -538,6 +540,8 @@ public class VeniceServerConfig extends VeniceClusterConfig {
   private final int nonCurrentVersionAAWCLeaderQuotaRecordsPerSecond;
   private final int nonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond;
   private final int channelOptionWriteBufferHighBytes;
+  private final boolean aaWCWorkloadParallelProcessingEnabled;
+  private final int aaWCWorkloadParallelProcessingThreadPoolSize;

   public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
     this(serverProperties, Collections.emptyMap());
@@ -899,6 +903,10 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
     if (channelOptionWriteBufferHighBytes <= 0) {
       throw new VeniceException("Invalid channel option write buffer high bytes: " + channelOptionWriteBufferHighBytes);
     }
+    aaWCWorkloadParallelProcessingEnabled =
+        serverProperties.getBoolean(SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED, false);
+    aaWCWorkloadParallelProcessingThreadPoolSize =
+        serverProperties.getInt(SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE, 8);
   }

   long extractIngestionMemoryLimit(
@@ -1608,4 +1616,12 @@ public int getQuotaEnforcementIntervalInMs() {
   public int getQuotaEnforcementCapacityMultiple() {
     return quotaEnforcementCapacityMultiple;
   }
+
+  public boolean isAAWCWorkloadParallelProcessingEnabled() {
+    return aaWCWorkloadParallelProcessingEnabled;
+  }
+
+  public int getAAWCWorkloadParallelProcessingThreadPoolSize() {
+    return aaWCWorkloadParallelProcessingThreadPoolSize;
+  }
 }
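As a quick usage illustration, here is a minimal sketch of wiring these two new knobs, assuming VeniceProperties can be constructed from a plain java.util.Properties (the setup and values are illustrative, not from the PR; the config constants and the defaults of false / 8 come from the diff above):

  Properties props = new Properties();
  // Constant names as statically imported from com.linkedin.venice.ConfigKeys above.
  props.setProperty(SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_ENABLED, "true");
  props.setProperty(SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE, "16");
  VeniceServerConfig config = new VeniceServerConfig(new VeniceProperties(props));
  config.isAAWCWorkloadParallelProcessingEnabled();         // true
  config.getAAWCWorkloadParallelProcessingThreadPoolSize(); // 16 (8 when unset)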

Large diffs are not rendered by default.

IngestionBatchProcessor.java (new file)
@@ -0,0 +1,165 @@
package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.utils.ByteArrayKey;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;


/**
 * This class processes AA/WC messages in parallel to address long-tail partition lagging issues.
 * For AA/WC message handling, consumption is not the bottleneck; the processing overhead is. With
 * {@link IngestionBatchProcessor}, even a single consumer can utilize the full node's resources
 * to speed up leader ingestion.
 */
public class IngestionBatchProcessor {
interface ProcessingFunction {
PubSubMessageProcessedResult apply(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
PartitionConsumptionState partitionConsumptionState,
int partition,
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs,
long beforeProcessingBatchRecordsTimestampMs);
}

private final String storeVersionName;
private final ExecutorService batchProcessingThreadPool;
private final KeyLevelLocksManager lockManager;
private final boolean isWriteComputationEnabled;
private final boolean isActiveActiveReplicationEnabled;
private final ProcessingFunction processingFunction;

public IngestionBatchProcessor(
String storeVersionName,
ExecutorService batchProcessingThreadPool,
KeyLevelLocksManager lockManager,
ProcessingFunction processingFunction,
boolean isWriteComputationEnabled,
boolean isActiveActiveReplicationEnabled) {
this.storeVersionName = storeVersionName;
this.batchProcessingThreadPool = batchProcessingThreadPool;
this.lockManager = lockManager;
this.processingFunction = processingFunction;
this.isWriteComputationEnabled = isWriteComputationEnabled;
this.isActiveActiveReplicationEnabled = isActiveActiveReplicationEnabled;
}

/**
 * When {@link #lockManager} is not null, this function locks every key in the passed records
 * (control messages excluded).
 */
public List<ReentrantLock> lockKeys(List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records) {
if (lockManager != null) {
List<ReentrantLock> locks = new ArrayList<>(records.size());
records.forEach(r -> {
if (!r.getKey().isControlMessage()) {
ReentrantLock lock = lockManager.acquireLockByKey(ByteArrayKey.wrap(r.getKey().getKey()));
locks.add(lock);
lock.lock();
}
});
return locks;
}
return Collections.emptyList();
}

public void unlockKeys(
    List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records,
    List<ReentrantLock> locks) {
if (lockManager != null) {
locks.forEach(lock -> lock.unlock());
records.forEach(r -> {
if (!r.getKey().isControlMessage()) {
lockManager.releaseLock(ByteArrayKey.wrap(r.getKey().getKey()));
}
});
}
}

public static boolean isAllMessagesFromRTTopic(
Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records) {
for (PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record: records) {
if (!record.getTopicPartition().getPubSubTopic().isRealTime()) {
return false;
}
}
return true;
}

public List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> process(
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records,
PartitionConsumptionState partitionConsumptionState,
int partition,
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs,
long beforeProcessingBatchRecordsTimestampMs) {
if (records.isEmpty()) {
return Collections.emptyList();
}
AtomicBoolean isAllMessagesFromRTTopic = new AtomicBoolean(true);
List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> resultList =
new ArrayList<>(records.size());
records.forEach(r -> {
resultList.add(new PubSubMessageProcessedResultWrapper<>(r));
if (!r.getTopicPartition().getPubSubTopic().isRealTime()) {
isAllMessagesFromRTTopic.set(false);
}
});
if (!isWriteComputationEnabled && !isActiveActiveReplicationEnabled) {
return resultList;
}
// Only handle records from the real-time topic
if (!isAllMessagesFromRTTopic.get()) {
return resultList;
}
/**
* We would like to process the messages belonging to the same key sequentially to avoid race conditions.
*/
Map<ByteArrayKey, List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>>> keyGroupMap =
new HashMap<>(records.size());
resultList.forEach(r -> {
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> message = r.getMessage();
if (!message.getKey().isControlMessage()) {
ByteArrayKey byteArrayKey = ByteArrayKey.wrap(message.getKey().getKey());
keyGroupMap.computeIfAbsent(byteArrayKey, (ignored) -> new ArrayList<>()).add(r);
}
});
List<CompletableFuture<Void>> futureList = new ArrayList<>(keyGroupMap.size());
keyGroupMap.forEach((ignored, recordsWithTheSameKey) -> {
futureList.add(CompletableFuture.runAsync(() -> {
recordsWithTheSameKey.forEach(recordWithTheSameKey -> {
recordWithTheSameKey.setProcessedResult(
processingFunction.apply(
recordWithTheSameKey.getMessage(),
partitionConsumptionState,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs,
beforeProcessingBatchRecordsTimestampMs));
});
}, batchProcessingThreadPool));
});
try {
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).get();
} catch (Exception e) {
throw new VeniceException(
"Failed to execute the batch processing for " + storeVersionName + " partition: "
+ partitionConsumptionState.getPartition(),
e);
}

return resultList;
}
}
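To make the intended call pattern concrete, here is a hedged caller-side sketch (records, pcs, pool, lockManager, and the processRecord helper are hypothetical; the real caller is presumably the leader ingestion path in the large ingestion-task diff that is not rendered above). The key contract is that lockKeys must always be paired with unlockKeys, so the release belongs in a finally block:

  IngestionBatchProcessor processor = new IngestionBatchProcessor(
      "test_store_v1", // store version name
      pool,            // ExecutorService, e.g. the AA/WC parallel processing pool
      lockManager,     // KeyLevelLocksManager; may be null to skip key-level locking
      (record, state, part, url, clusterId, tsNs, tsMs) -> processRecord(record), // hypothetical ProcessingFunction
      true,            // write computation enabled
      true);           // active-active replication enabled
  List<ReentrantLock> locks = processor.lockKeys(records);
  try {
    processor.process(
        records, pcs, partition, kafkaUrl, kafkaClusterId, System.nanoTime(), System.currentTimeMillis());
  } finally {
    processor.unlockKeys(records, locks); // release key-level locks even if processing throws
  }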
KafkaStoreIngestionService.java
@@ -188,6 +188,7 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements
   private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
   private KafkaValueSerializer kafkaValueSerializer;
   private final IngestionThrottler ingestionThrottler;
+  private final ExecutorService aaWCWorkLoadProcessingThreadPool;

   public KafkaStoreIngestionService(
       StorageEngineRepository storageEngineRepository,
@@ -438,6 +439,14 @@ public void handleStoreDeleted(Store store) {

     VeniceViewWriterFactory viewWriterFactory = new VeniceViewWriterFactory(veniceConfigLoader);

+    if (serverConfig.isAAWCWorkloadParallelProcessingEnabled()) {
+      this.aaWCWorkLoadProcessingThreadPool = Executors.newFixedThreadPool(
+          serverConfig.getAAWCWorkloadParallelProcessingThreadPoolSize(),
+          new DaemonThreadFactory("AA_WC_PARALLEL_PROCESSING"));
+    } else {
+      this.aaWCWorkLoadProcessingThreadPool = null;
+    }
+
     ingestionTaskFactory = StoreIngestionTaskFactory.builder()
         .setVeniceWriterFactory(veniceWriterFactory)
         .setStorageEngineRepository(storageEngineRepository)
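DaemonThreadFactory here is Venice's own utility; assuming it simply produces named daemon threads, a minimal equivalent sketch is:

  ThreadFactory daemonFactory = runnable -> {
    Thread t = new Thread(runnable, "AA_WC_PARALLEL_PROCESSING");
    t.setDaemon(true); // daemon threads will not block JVM shutdown
    return t;
  };
  ExecutorService pool = Executors.newFixedThreadPool(8, daemonFactory);

Using daemon threads keeps a misbehaving pool from hanging process exit, while stopInner() below still shuts it down explicitly.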
@@ -463,6 +472,7 @@ public void handleStoreDeleted(Store store) {
         .setRunnableForKillIngestionTasksForNonCurrentVersions(
             serverConfig.getIngestionMemoryLimit() > 0 ? () -> killConsumptionTaskForNonCurrentVersions() : null)
         .setHeartbeatMonitoringService(heartbeatMonitoringService)
+        .setAAWCWorkLoadProcessingThreadPool(aaWCWorkLoadProcessingThreadPool)
         .build();
   }

@@ -569,6 +579,8 @@ public void stopInner() {
     leaderFollowerNotifiers.forEach(VeniceNotifier::close);
     Utils.closeQuietlyWithErrorLogged(metaStoreWriter);

+    shutdownExecutorService(aaWCWorkLoadProcessingThreadPool, "aaWCWorkLoadProcessingThreadPool", true);
+
     kafkaMessageEnvelopeSchemaReader.ifPresent(Utils::closeQuietlyWithErrorLogged);

     // Close at the very end to make sure all ingestion tasks have released the shared producers.