
Commit a4235ff

Fix style check

lnbest0707-uber committed Dec 19, 2024
1 parent f23ae1a commit a4235ff
Showing 12 changed files with 318 additions and 315 deletions.
@@ -40,54 +40,48 @@ public class PredownloadScheduler {
   private static final String TMP_DIR_NAME = "tmp";
   // Segment download dir in format of "tmp-" + segmentName + "-" + UUID.randomUUID()
   private static final String TMP_DIR_FORMAT = "tmp-%s-%s";
-  private static final long DOWNLOAD_SEGMENT_TIMEOUT_SEC = 60;
   private static final long DOWNLOAD_SEGMENTS_TIMEOUT_MIN = 60;
   private static final long LOAD_SEGMENTS_TIMEOUT_MIN = 5;
+  private final PropertiesConfiguration _properties;
+  private final PinotConfiguration _pinotConfig;
+  private final InstanceDataManagerConfig _instanceDataManagerConfig;
+  private final String _clusterName;
+  private final String _instanceId;
+  private final String _zkAddress;
   @VisibleForTesting
-  Executor executor;
+  Executor _executor;
   @VisibleForTesting
-  Set<String> failedSegments;
-  private final PropertiesConfiguration properties;
-  private final PinotConfiguration pinotConfig;
-  private final InstanceDataManagerConfig instanceDataManagerConfig;
-
+  Set<String> _failedSegments;
   @SuppressWarnings("NullAway.Init")
-  private PredownloadMetrics predownloadMetrics;
-
-  private final String clusterName;
-  private final String instanceId;
-  private final String zkAddress;
-  private int numOfSkippedSegments;
-  private int numOfUnableToDownloadSegments;
-  private int numOfDownloadSegments;
-  private long totalDownloadedSizeBytes;
-
+  private PredownloadMetrics _predownloadMetrics;
+  private int _numOfSkippedSegments;
+  private int _numOfUnableToDownloadSegments;
+  private int _numOfDownloadSegments;
+  private long _totalDownloadedSizeBytes;
   @SuppressWarnings("NullAway.Init")
-  private ZKClient zkClient;
-
+  private ZKClient _zkClient;
   @SuppressWarnings("NullAway.Init")
-  private List<SegmentInfo> segmentInfoList;
-
+  private List<SegmentInfo> _segmentInfoList;
   @SuppressWarnings("NullAway.Init")
-  private Map<String, TableInfo> tableInfoMap;
+  private Map<String, TableInfo> _tableInfoMap;

   public PredownloadScheduler(PropertiesConfiguration properties)
       throws Exception {
-    this.properties = properties;
-    this.clusterName = properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
-    this.zkAddress = properties.getString(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
-    this.instanceId = properties.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID);
-    this.pinotConfig = new PinotConfiguration(properties);
-    this.instanceDataManagerConfig =
-        new HelixInstanceDataManagerConfig(new ServerConf(pinotConfig).getInstanceDataManagerConfig());
+    _properties = properties;
+    _clusterName = properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
+    _zkAddress = properties.getString(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
+    _instanceId = properties.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID);
+    _pinotConfig = new PinotConfiguration(properties);
+    _instanceDataManagerConfig =
+        new HelixInstanceDataManagerConfig(new ServerConf(_pinotConfig).getInstanceDataManagerConfig());
     // Get the number of available processors (vCPUs)
     int numProcessors = Runtime.getRuntime().availableProcessors();
-    failedSegments = ConcurrentHashMap.newKeySet();
+    _failedSegments = ConcurrentHashMap.newKeySet();
     // TODO: tune the value
-    executor = Executors.newFixedThreadPool(numProcessors * 3);
+    _executor = Executors.newFixedThreadPool(numProcessors * 3);
     LOGGER.info("Created thread pool with num of threads: {}", numProcessors * 3);
-    this.numOfSkippedSegments = 0;
-    this.numOfDownloadSegments = 0;
+    _numOfSkippedSegments = 0;
+    _numOfDownloadSegments = 0;
   }

   public void start() {
@@ -97,7 +91,7 @@ public void start() {
       public void run() {
         try {
           LOGGER.info("Trying to stop predownload process!");
-          this.stop();
+          stop();
         } catch (Exception e) {
           e.printStackTrace();
           LOGGER.error("error shutting down predownload process : ", e);
@@ -114,47 +108,49 @@ public void run() {
     PredownloadCompleteReason reason = downloadSegments();
     long timeTaken = System.currentTimeMillis() - startTime;
     LOGGER.info(
-        "Predownload process took {} sec, tried to download {} segments, skipped {} segments and unable to download {} segments. Download size: {} MB. Download speed: {} MB/s",
-        timeTaken / 1000, numOfDownloadSegments, numOfSkippedSegments, numOfUnableToDownloadSegments,
-        totalDownloadedSizeBytes / (1024 * 1024), (totalDownloadedSizeBytes / (1024 * 1024)) / (timeTaken / 1000 + 1));
+        "Predownload process took {} sec, tried to download {} segments, skipped {} segments "
+            + "and unable to download {} segments. Download size: {} MB. Download speed: {} MB/s",
+        timeTaken / 1000, _numOfDownloadSegments, _numOfSkippedSegments, _numOfUnableToDownloadSegments,
+        _totalDownloadedSizeBytes / (1024 * 1024),
+        (_totalDownloadedSizeBytes / (1024 * 1024)) / (timeTaken / 1000 + 1));
     if (reason.isSucceed()) {
-      predownloadMetrics.preDownloadSucceed(totalDownloadedSizeBytes, timeTaken);
+      _predownloadMetrics.preDownloadSucceed(_totalDownloadedSizeBytes, timeTaken);
     }
-    StatusRecorder.predownloadComplete(reason, clusterName, instanceId, String.join(",", failedSegments));
+    StatusRecorder.predownloadComplete(reason, _clusterName, _instanceId, String.join(",", _failedSegments));
   }

   public void stop() {
-    if (zkClient != null) {
-      zkClient.close();
+    if (_zkClient != null) {
+      _zkClient.close();
     }
-    if (executor != null) {
-      ((ThreadPoolExecutor) executor).shutdownNow();
+    if (_executor != null) {
+      ((ThreadPoolExecutor) _executor).shutdownNow();
     }
   }

   void initializeZK() {
-    LOGGER.info("Initializing ZK client with address: {} and instanceId: {}", zkAddress, instanceId);
-    zkClient = new ZKClient(zkAddress, clusterName, instanceId);
-    zkClient.start();
+    LOGGER.info("Initializing ZK client with address: {} and instanceId: {}", _zkAddress, _instanceId);
+    _zkClient = new ZKClient(_zkAddress, _clusterName, _instanceId);
+    _zkClient.start();
   }

   void initializeMetricsReporter() {
     LOGGER.info("Initializing metrics reporter");

-    predownloadMetrics = new PredownloadMetrics();
-    StatusRecorder.registerMetrics(predownloadMetrics);
+    _predownloadMetrics = new PredownloadMetrics();
+    StatusRecorder.registerMetrics(_predownloadMetrics);
   }

   @VisibleForTesting
   void getSegmentsInfo() {
     LOGGER.info("Getting segments info from ZK");
-    segmentInfoList = zkClient.getSegmentsOfInstance(zkClient.getDataAccessor());
-    if (segmentInfoList.isEmpty()) {
+    _segmentInfoList = _zkClient.getSegmentsOfInstance(_zkClient.getDataAccessor());
+    if (_segmentInfoList.isEmpty()) {
       PredownloadCompleteReason reason = PredownloadCompleteReason.NO_SEGMENT_TO_PREDOWNLOAD;
-      StatusRecorder.predownloadComplete(reason, clusterName, instanceId, "");
+      StatusRecorder.predownloadComplete(reason, _clusterName, _instanceId, "");
     }
-    tableInfoMap = new HashMap<>();
-    zkClient.updateSegmentMetadata(segmentInfoList, tableInfoMap, instanceDataManagerConfig);
+    _tableInfoMap = new HashMap<>();
+    _zkClient.updateSegmentMetadata(_segmentInfoList, _tableInfoMap, _instanceDataManagerConfig);
   }

   @VisibleForTesting
@@ -164,22 +160,22 @@ void loadSegmentsFromLocal() {
     List<CompletableFuture<Void>> futures = new ArrayList<>();

     // Submit tasks to the executor
-    for (SegmentInfo segmentInfo : segmentInfoList) {
+    for (SegmentInfo segmentInfo : _segmentInfoList) {
       CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
         boolean loadSegmentSuccess = false;
         try {
-          TableInfo tableInfo = tableInfoMap.get(segmentInfo.getTableNameWithType());
+          TableInfo tableInfo = _tableInfoMap.get(segmentInfo.getTableNameWithType());
           if (tableInfo != null) {
-            loadSegmentSuccess = tableInfo.loadSegmentFromLocal(segmentInfo, instanceDataManagerConfig);
+            loadSegmentSuccess = tableInfo.loadSegmentFromLocal(segmentInfo, _instanceDataManagerConfig);
           }
         } catch (Exception e) {
           LOGGER.error("Failed to load from local for segment: {} of table: {} with issue ",
               segmentInfo.getSegmentName(), segmentInfo.getTableNameWithType(), e);
         }
         if (!loadSegmentSuccess && segmentInfo.canBeDownloaded()) {
-          failedSegments.add(segmentInfo.getSegmentName());
+          _failedSegments.add(segmentInfo.getSegmentName());
         }
-      }, executor);
+      }, _executor);

       futures.add(future);
     }
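
The join on these futures sits in the collapsed portion of this diff. A minimal, hypothetical sketch of that await pattern, assuming the collapsed code bounds the wait with the LOAD_SEGMENTS_TIMEOUT_MIN constant declared at the top of the class:

    // Hypothetical sketch -- the actual join logic is collapsed in this diff view.
    // Wait for all local-load tasks, bounded by LOAD_SEGMENTS_TIMEOUT_MIN minutes.
    try {
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
          .get(LOAD_SEGMENTS_TIMEOUT_MIN, TimeUnit.MINUTES);
    } catch (Exception e) {
      // InterruptedException, ExecutionException, or TimeoutException ends the wait
      LOGGER.warn("Failed to load all segments from local within timeout", e);
    }
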
@@ -207,18 +203,18 @@ void initializeSegmentFetcher() {
     LOGGER.info("Initializing segment fetchers");
     // Initialize the components to download segments from deep store
     PinotConfiguration segmentFetcherFactoryConfig =
-        pinotConfig.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
-    PinotConfiguration pinotFSConfig = pinotConfig.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
+        _pinotConfig.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
+    PinotConfiguration pinotFSConfig = _pinotConfig.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
     PinotConfiguration pinotCrypterConfig =
-        pinotConfig.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
+        _pinotConfig.subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
     try {
       SegmentFetcherFactory.init(segmentFetcherFactoryConfig);
       PinotFSFactory.init(pinotFSConfig);
       PinotCrypterFactory.init(pinotCrypterConfig);
     } catch (Exception e) {
       LOGGER.error("Failed to initialize segment fetcher factory: {}", e);
-      StatusRecorder.predownloadComplete(PredownloadCompleteReason.CANNOT_CONNECT_TO_DEEPSTORE, clusterName, instanceId,
-          "");
+      StatusRecorder.predownloadComplete(PredownloadCompleteReason.CANNOT_CONNECT_TO_DEEPSTORE, _clusterName,
+          _instanceId, "");
     }
   }

@@ -228,15 +224,15 @@ public PredownloadCompleteReason downloadSegments() {
     List<CompletableFuture<Void>> futures = new ArrayList<>();

     // Submit tasks to the executor
-    for (SegmentInfo segmentInfo : segmentInfoList) {
+    for (SegmentInfo segmentInfo : _segmentInfoList) {
       if (segmentInfo.isDownloaded()) {
-        numOfSkippedSegments++;
+        _numOfSkippedSegments++;
         continue;
       } else if (!segmentInfo.canBeDownloaded()) {
-        numOfUnableToDownloadSegments++;
+        _numOfUnableToDownloadSegments++;
         continue;
       } else {
-        numOfDownloadSegments++;
+        _numOfDownloadSegments++;
       }
       CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
         try {
@@ -245,7 +241,7 @@ public PredownloadCompleteReason downloadSegments() {
           LOGGER.error("Failed to download segment: {} of table: {} with issue ", segmentInfo.getSegmentName(),
               segmentInfo.getTableNameWithType(), e);
         }
-      }, executor);
+      }, _executor);

       // TODO: add future.orTimeout() to handle per segment downloading timeout
       // Right now not able to use due to monorepo incapability with JAVA9+ syntax
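
For reference, once Java 9+ syntax is available, the per-segment timeout this TODO describes could be a short chain on each future. A minimal sketch, assuming a 60-second bound (in the spirit of the DOWNLOAD_SEGMENT_TIMEOUT_SEC = 60 constant this commit removes; the exact bound is not specified by the commit):

    // Hypothetical Java 9+ variant of the TODO: fail an individual download
    // future after a fixed bound instead of only bounding the overall wait.
    future = future.orTimeout(60, TimeUnit.SECONDS)
        .exceptionally(t -> {
          // Record the segment as failed so the final status reflects the timeout
          _failedSegments.add(segmentInfo.getSegmentName());
          return null;
        });
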
@@ -268,7 +264,7 @@ public PredownloadCompleteReason downloadSegments() {
     }
     long timeTaken = System.currentTimeMillis() - startTime;
     LOGGER.info("Download segments from deep store took {} sec", timeTaken / 1000);
-    return failedSegments.isEmpty() ? PredownloadCompleteReason.ALL_SEGMENTS_DOWNLOADED
+    return _failedSegments.isEmpty() ? PredownloadCompleteReason.ALL_SEGMENTS_DOWNLOADED
         : PredownloadCompleteReason.SOME_SEGMENTS_DOWNLOAD_FAILED;
   }

@@ -277,11 +273,11 @@ void downloadSegment(SegmentInfo segmentInfo)
     try {
       long startTime = System.currentTimeMillis();
       File tempRootDir = getTmpSegmentDataDir(segmentInfo);
-      if (instanceDataManagerConfig.isStreamSegmentDownloadUntar() && segmentInfo.getCrypterName() == null) {
+      if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar() && segmentInfo.getCrypterName() == null) {
         try {
           // TODO: increase rate limit here
           File untaredSegDir = downloadAndStreamUntarWithRateLimit(segmentInfo, tempRootDir,
-              instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
+              _instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
           moveSegment(segmentInfo, untaredSegDir);
         } finally {
           FileUtils.deleteQuietly(tempRootDir);
@@ -294,24 +290,24 @@ void downloadSegment(SegmentInfo segmentInfo)
           FileUtils.deleteQuietly(tempRootDir);
         }
       }
-      failedSegments.remove(segmentInfo.getSegmentName());
-      TableInfo tableInfo = tableInfoMap.get(segmentInfo.getTableNameWithType());
+      _failedSegments.remove(segmentInfo.getSegmentName());
+      TableInfo tableInfo = _tableInfoMap.get(segmentInfo.getTableNameWithType());
       if (tableInfo != null) {
-        tableInfo.loadSegmentFromLocal(segmentInfo, instanceDataManagerConfig);
+        tableInfo.loadSegmentFromLocal(segmentInfo, _instanceDataManagerConfig);
       }
-      totalDownloadedSizeBytes += segmentInfo.getLocalSizeBytes();
-      predownloadMetrics.segmentDownloaded(true, segmentInfo.getSegmentName(), segmentInfo.getLocalSizeBytes(),
+      _totalDownloadedSizeBytes += segmentInfo.getLocalSizeBytes();
+      _predownloadMetrics.segmentDownloaded(true, segmentInfo.getSegmentName(), segmentInfo.getLocalSizeBytes(),
           System.currentTimeMillis() - startTime);
     } catch (Exception e) {
-      failedSegments.add(segmentInfo.getSegmentName());
-      predownloadMetrics.segmentDownloaded(false, segmentInfo.getSegmentName(), 0, 0);
+      _failedSegments.add(segmentInfo.getSegmentName());
+      _predownloadMetrics.segmentDownloaded(false, segmentInfo.getSegmentName(), 0, 0);
       throw e;
     }
   }

   private File getTmpSegmentDataDir(SegmentInfo segmentInfo)
       throws Exception {
-    TableInfo tableInfo = tableInfoMap.get(segmentInfo.getTableNameWithType());
+    TableInfo tableInfo = _tableInfoMap.get(segmentInfo.getTableNameWithType());
     if (tableInfo == null) {
       throw new PredownloadException("Table info not found for segment: " + segmentInfo.getSegmentName());
     }
@@ -374,7 +370,7 @@ File downloadAndDecrypt(SegmentInfo segmentInfo, File tempRootDir)
   private File moveSegment(SegmentInfo segmentInfo, File untaredSegDir)
       throws IOException {
     try {
-      File indexDir = segmentInfo.getSegmentDataDir(tableInfoMap.get(segmentInfo.getTableNameWithType()));
+      File indexDir = segmentInfo.getSegmentDataDir(_tableInfoMap.get(segmentInfo.getTableNameWithType()));
       FileUtils.deleteDirectory(indexDir);
       FileUtils.moveDirectory(untaredSegDir, indexDir);
       return indexDir;
@@ -396,7 +392,7 @@ File untarAndMoveSegment(SegmentInfo segmentInfo, File tarFile, File tempRootDir)
     File untaredSegDir = TarCompressionUtils.untar(tarFile, untarDir).get(0);
     LOGGER.info("Uncompressed tar file: {} into target dir: {}", tarFile, untarDir);
     // Replace the existing index directory.
-    File indexDir = segmentInfo.getSegmentDataDir(tableInfoMap.get(segmentInfo.getTableNameWithType()));
+    File indexDir = segmentInfo.getSegmentDataDir(_tableInfoMap.get(segmentInfo.getTableNameWithType()));
     FileUtils.deleteDirectory(indexDir);
     FileUtils.moveDirectory(untaredSegDir, indexDir);
     LOGGER.info("Successfully downloaded segment: {} of table: {} to index dir: {}", segmentName, tableNameWithType,