Skip to content

Commit

Permalink
Upgrade Helix to 1.4.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Dec 13, 2024
1 parent 90b437f commit b063ca1
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
Expand Down Expand Up @@ -154,6 +152,7 @@
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
Expand Down Expand Up @@ -195,10 +194,8 @@ public class PinotHelixResourceManager {
private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f);
private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 5;
public static final String APPEND = "APPEND";
private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500;
private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500;
private static final String API_REQUEST_ID_PREFIX = "api-";
private static final int INFINITE_TIMEOUT = -1;

private enum LineageUpdateType {
START, END, REVERT
Expand All @@ -207,8 +204,6 @@ private enum LineageUpdateType {
// TODO: make this configurable
public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 60_000L; // 10 minutes
public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second
public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20 minutes
public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1 second

private static final DateTimeFormatter SIMPLE_DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);
Expand Down Expand Up @@ -2570,17 +2565,9 @@ private void deleteTableOnServers(String tableNameWithType) {
}

LOGGER.info("Sending delete table messages for table: {}", tableNameWithType);
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();

// Infinite timeout on the recipient
int timeoutMs = -1;
int numMessagesSent = messagingService.send(recipientCriteria, tableDeletionMessage, null, timeoutMs);
TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType);
int numMessagesSent = MessagingServiceUtils.send(messagingService, tableDeletionMessage, tableNameWithType);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} delete table messages for table: {}", numMessagesSent, tableNameWithType);
} else {
Expand Down Expand Up @@ -2625,20 +2612,15 @@ public Map<String, Pair<Integer, String>> reloadSegments(String tableNameWithTyp
Preconditions.checkArgument(tt == TableType.OFFLINE,
"Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType);
}
// Infinite timeout on the recipient
int timeoutMs = -1;

ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
for (Map.Entry<String, List<String>> entry : instanceToSegmentsMap.entrySet()) {
String targetInstance = entry.getKey();
List<String> segments = entry.getValue();
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName(targetInstance);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, segments, forceDownload);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs);
SegmentReloadMessage segmentReloadMessage =
new SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload);
int numMessagesSent =
MessagingServiceUtils.send(messagingService, segmentReloadMessage, tableNameWithType, null, targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages to instance: {} for table: {}", numMessagesSent, targetInstance,
tableNameWithType);
Expand All @@ -2662,17 +2644,10 @@ public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean
"Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType);
}

Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName(targetInstance == null ? "%" : targetInstance);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, forceDownload);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();

// Infinite timeout on the recipient
int timeoutMs = -1;
int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs);
SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, forceDownload);
int numMessagesSent =
MessagingServiceUtils.send(messagingService, segmentReloadMessage, tableNameWithType, null, targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages for table: {}", numMessagesSent, tableNameWithType);
} else {
Expand All @@ -2695,19 +2670,12 @@ public Pair<Integer, String> reloadSegment(String tableNameWithType, String segm
segmentName);
}

Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName(targetInstance == null ? "%" : targetInstance);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setPartition(segmentName);
recipientCriteria.setSessionSpecific(true);
SegmentReloadMessage segmentReloadMessage =
new SegmentReloadMessage(tableNameWithType, Collections.singletonList(segmentName), forceDownload);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();

// Infinite timeout on the recipient
int timeoutMs = -1;
int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs);
SegmentReloadMessage segmentReloadMessage =
new SegmentReloadMessage(tableNameWithType, List.of(segmentName), forceDownload);
int numMessagesSent =
MessagingServiceUtils.send(messagingService, segmentReloadMessage, tableNameWithType, segmentName,
targetInstance);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} reload messages for segment: {} in table: {}", numMessagesSent, segmentName,
tableNameWithType);
Expand Down Expand Up @@ -2890,21 +2858,13 @@ private void resetPartitionAllState(String instanceName, String resourceName, Se
*/
public void sendSegmentRefreshMessage(String tableNameWithType, String segmentName, boolean refreshServerSegment,
boolean refreshBrokerRouting) {
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
SegmentRefreshMessage segmentRefreshMessage = new SegmentRefreshMessage(tableNameWithType, segmentName);

// Send segment refresh message to servers
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setSessionSpecific(true);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();

if (refreshServerSegment) {
// Send segment refresh message to servers
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setPartition(segmentName);
// Send message with no callback and infinite timeout on the recipient
int numMessagesSent = messagingService.send(recipientCriteria, segmentRefreshMessage, null, -1);
int numMessagesSent =
MessagingServiceUtils.send(messagingService, segmentRefreshMessage, tableNameWithType, segmentName, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which messages were sent
LOGGER.info("Sent {} segment refresh messages to servers for segment: {} of table: {}", numMessagesSent,
Expand All @@ -2915,11 +2875,11 @@ public void sendSegmentRefreshMessage(String tableNameWithType, String segmentNa
}
}

// Send segment refresh message to brokers
if (refreshBrokerRouting) {
// Send segment refresh message to brokers
recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
recipientCriteria.setPartition(tableNameWithType);
int numMessagesSent = messagingService.send(recipientCriteria, segmentRefreshMessage, null, -1);
int numMessagesSent =
MessagingServiceUtils.send(messagingService, segmentRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE,
tableNameWithType, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which messages were sent
LOGGER.info("Sent {} segment refresh messages to brokers for segment: {} of table: {}", numMessagesSent,
Expand All @@ -2932,18 +2892,11 @@ public void sendSegmentRefreshMessage(String tableNameWithType, String segmentNa
}

private void sendTableConfigRefreshMessage(String tableNameWithType) {
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
TableConfigRefreshMessage tableConfigRefreshMessage = new TableConfigRefreshMessage(tableNameWithType);

// Send table config refresh message to brokers
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
recipientCriteria.setSessionSpecific(true);
recipientCriteria.setPartition(tableNameWithType);
// Send message with no callback and infinite timeout on the recipient
int numMessagesSent =
_helixZkManager.getMessagingService().send(recipientCriteria, tableConfigRefreshMessage, null, -1);
MessagingServiceUtils.send(messagingService, tableConfigRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE,
tableNameWithType, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which messages were sent
LOGGER.info("Sent {} table config refresh messages to brokers for table: {}", numMessagesSent, tableNameWithType);
Expand All @@ -2953,36 +2906,23 @@ private void sendTableConfigRefreshMessage(String tableNameWithType) {
}

private void sendApplicationQpsQuotaRefreshMessage(String appName) {
ApplicationQpsQuotaRefreshMessage message = new ApplicationQpsQuotaRefreshMessage(appName);

// Send database config refresh message to brokers
Criteria criteria = new Criteria();
criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
criteria.setInstanceName("%");
criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
criteria.setSessionSpecific(true);

int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, message, null, INFINITE_TIMEOUT);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
ApplicationQpsQuotaRefreshMessage quotaRefreshMessage = new ApplicationQpsQuotaRefreshMessage(appName);
int numMessagesSent =
MessagingServiceUtils.send(messagingService, quotaRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} applcation qps quota refresh messages to brokers for application: {}", numMessagesSent,
LOGGER.info("Sent {} application qps quota refresh messages to brokers for application: {}", numMessagesSent,
appName);
} else {
LOGGER.warn("No application qps quota refresh message sent to brokers for application: {}", appName);
}
}

private void sendDatabaseConfigRefreshMessage(String databaseName) {
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new DatabaseConfigRefreshMessage(databaseName);

// Send database config refresh message to brokers
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
recipientCriteria.setSessionSpecific(true);
// Send message with no callback and infinite timeout on the recipient
int numMessagesSent =
_helixZkManager.getMessagingService().send(recipientCriteria, databaseConfigRefreshMessage, null, -1);
MessagingServiceUtils.send(messagingService, databaseConfigRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} database config refresh messages to brokers for database: {}", numMessagesSent,
databaseName);
Expand All @@ -2992,18 +2932,11 @@ private void sendDatabaseConfigRefreshMessage(String databaseName) {
}

private void sendRoutingTableRebuildMessage(String tableNameWithType) {
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
RoutingTableRebuildMessage routingTableRebuildMessage = new RoutingTableRebuildMessage(tableNameWithType);

// Send table config refresh message to brokers
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
recipientCriteria.setSessionSpecific(true);
recipientCriteria.setPartition(tableNameWithType);
// Send message with no callback and infinite timeout on the recipient
int numMessagesSent =
_helixZkManager.getMessagingService().send(recipientCriteria, routingTableRebuildMessage, null, -1);
MessagingServiceUtils.send(messagingService, routingTableRebuildMessage, Helix.BROKER_RESOURCE_INSTANCE,
tableNameWithType, null);
if (numMessagesSent > 0) {
// TODO: Would be nice if we can get the name of the instances to which messages were sent
LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent,
Expand Down Expand Up @@ -4506,27 +4439,17 @@ public int getNumReplicas(TableConfig tableConfig) {
public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String tableName, String periodicTaskName,
Map<String, String> taskProperties) {
String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8);

LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all controllers for running task {} against {},"
+ " with properties {}.\"", periodicTaskRequestId, periodicTaskName,
tableName != null ? " table '" + tableName + "'" : "all tables", taskProperties);

// Create and send message to send to all controllers (including this one)
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setSessionSpecific(true);
recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
recipientCriteria.setSelfExcluded(false);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
RunPeriodicTaskMessage runPeriodicTaskMessage =
new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, tableName, taskProperties);

ClusterMessagingService clusterMessagingService = getHelixZkManager().getMessagingService();
int messageCount = clusterMessagingService.send(recipientCriteria, runPeriodicTaskMessage, null, -1);

int numMessagesSent = MessagingServiceUtils.sendIncludingSelf(messagingService, runPeriodicTaskMessage,
Helix.LEAD_CONTROLLER_RESOURCE_NAME);
LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId,
messageCount);
return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0);
numMessagesSent);
return new PeriodicTaskInvocationResponse(periodicTaskRequestId, numMessagesSent > 0);
}

/**
Expand Down
Loading

0 comments on commit b063ca1

Please sign in to comment.