[HUDI-8790] Fix most blocker/high sonarqube reported reliability issues #12536

Open. Wants to merge 1 commit into base: master.
@@ -220,6 +220,8 @@ public List<Partition> getAllPartitions(String tableName) {
}

return partitions;
} catch (InterruptedException e) {
throw new RuntimeException("Glue sync interrupted", e);
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
} finally {
@@ -256,6 +258,8 @@ public List<Partition> getPartitionsFromList(String tableName, List<String> part
partitionList.size() - partitions.size());

return partitions;
} catch (InterruptedException e) {
throw new RuntimeException("Glue Sync interrupted", e);
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(this.databaseName, tableName), e);
} finally {
@@ -274,13 +278,12 @@ private List<Partition> getChangedPartitions(List<String> changedPartitions, Str
.partitionsToGet(partitionValueList)
.build();
BatchGetPartitionResponse callResult = awsGlue.batchGetPartition(request).get();
List<Partition> result = callResult

return callResult
.partitions()
.stream()
.map(p -> new Partition(p.values(), p.storageDescriptor().location()))
.collect(Collectors.toList());

return result;
}

@Override
@@ -310,6 +313,8 @@ private <T> void parallelizeChange(List<T> items, int parallelism, Consumer<List
for (Future<?> future : futures) {
future.get();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to parallelize operation", e);
} finally {
@@ -713,7 +718,7 @@ public void managePartitionIndexes(String tableName) throws ExecutionException,
boolean indexesChanges = false;
for (PartitionIndexDescriptor existingIdx: existingIdxsResp.partitionIndexDescriptorList()) {
List<String> idxColumns = existingIdx.keys().stream().map(key -> key.name()).collect(Collectors.toList());
Boolean toBeRemoved = true;
boolean toBeRemoved = true;
for (List<String> neededIdx : partitionsIndexNeeded) {
if (neededIdx.equals(idxColumns)) {
toBeRemoved = false;
@@ -733,7 +738,7 @@

// for each needed index create if not exist
for (List<String> neededIdx : partitionsIndexNeeded) {
Boolean toBeCreated = true;
boolean toBeCreated = true;
for (PartitionIndexDescriptor existingIdx: existingIdxsResp.partitionIndexDescriptorList()) {
List<String> collect = existingIdx.keys().stream().map(key -> key.name()).collect(Collectors.toList());
if (collect.equals(neededIdx)) {
@@ -920,6 +925,8 @@ public void updateLastCommitTimeSynced(String tableName) {
// people may want to add indexes, without re-creating the table
// therefore we call this at each commit as a workaround
managePartitionIndexes(tableName);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.warn("An indexation process is currently running.", e);
} catch (Exception e) {
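The hunks above split `InterruptedException` out of the generic catch. A minimal sketch of the fully Sonar-compliant shape (rule java:S2142), assuming an async-style Glue client that returns a `CompletableFuture`; names are illustrative, not the PR's exact code, and note the restored interrupt flag, which the rule also expects before wrapping:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class GlueCallSketch {
  <T> T awaitGlueCall(CompletableFuture<T> pending) {
    try {
      return pending.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the flag before wrapping
      throw new RuntimeException("Glue sync interrupted", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Glue call failed", e.getCause());
    }
  }
}
```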
@@ -38,6 +38,7 @@

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
@@ -58,7 +59,7 @@ public abstract class BaseZookeeperBasedLockProvider implements LockProvider<Int
private static final Logger LOG = LoggerFactory.getLogger(BaseZookeeperBasedLockProvider.class);

private final transient CuratorFramework curatorFrameworkClient;
private volatile InterProcessMutex lock = null;
private final AtomicReference<InterProcessMutex> lockRef = new AtomicReference<>();
protected final LockConfiguration lockConfiguration;
protected final String zkBasePath;
protected final String lockKey;
@@ -136,18 +137,18 @@ public boolean tryLock(long time, TimeUnit unit) {
} catch (Exception e) {
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()), e);
}
return lock != null && lock.isAcquiredInThisProcess();
return lockRef.get() != null && lockRef.get().isAcquiredInThisProcess();
}

@Override
public void unlock() {
try {
LOG.info(generateLogStatement(LockState.RELEASING, generateLogSuffixString()));
if (lock == null || !lock.isAcquiredInThisProcess()) {
if (lockRef.get() == null || !lockRef.get().isAcquiredInThisProcess()) {
return;
}
lock.release();
lock = null;
lockRef.get().release();
lockRef.set(null);
LOG.info(generateLogStatement(LockState.RELEASED, generateLogSuffixString()));
} catch (Exception e) {
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE, generateLogSuffixString()), e);
@@ -157,9 +158,9 @@ public void unlock() {
@Override
public void close() {
try {
if (lock != null) {
lock.release();
lock = null;
if (lockRef.get() != null) {
lockRef.get().release();
lockRef.set(null);
}
this.curatorFrameworkClient.close();
} catch (Exception e) {
@@ -169,19 +170,19 @@ public void close() {

@Override
public InterProcessMutex getLock() {
return this.lock;
return this.lockRef.get();
}

private void acquireLock(long time, TimeUnit unit) throws Exception {
ValidationUtils.checkArgument(this.lock == null, generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
ValidationUtils.checkArgument(this.lockRef.get() == null, generateLogStatement(LockState.ALREADY_ACQUIRED, generateLogSuffixString()));
InterProcessMutex newLock = new InterProcessMutex(
this.curatorFrameworkClient, getLockPath());
boolean acquired = newLock.acquire(time, unit);
if (!acquired) {
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()));
}
if (newLock.isAcquiredInThisProcess()) {
lock = newLock;
lockRef.set(newLock);
} else {
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()));
}
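For context, a reduced sketch of the `AtomicReference`-based handle this file moves to, assuming Curator's `InterProcessMutex`. The reference itself must be constructed eagerly (a `null` field would NPE on the first `get()`), and `compareAndSet`/`getAndSet` keep acquire and release race-free:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

class ZkLockSketch {
  // Only the payload starts out null, never the AtomicReference itself.
  private final AtomicReference<InterProcessMutex> lockRef = new AtomicReference<>();

  boolean tryLock(CuratorFramework client, String path, long time, TimeUnit unit) throws Exception {
    InterProcessMutex candidate = new InterProcessMutex(client, path);
    if (candidate.acquire(time, unit) && candidate.isAcquiredInThisProcess()) {
      // compareAndSet makes the "one acquisition at a time" invariant atomic
      if (!lockRef.compareAndSet(null, candidate)) {
        candidate.release(); // another thread already holds the slot
        return false;
      }
      return true;
    }
    return false;
  }

  void unlock() throws Exception {
    InterProcessMutex held = lockRef.getAndSet(null);
    if (held != null && held.isAcquiredInThisProcess()) {
      held.release();
    }
  }
}
```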
@@ -125,13 +125,11 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry)));
return partitionToRollbackStats.stream();
} else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
HoodieLogFormat.Writer writer = null;
final StoragePath filePath;
try {
String fileId = rollbackRequest.getFileId();
HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();
String fileId = rollbackRequest.getFileId();
HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();

writer = HoodieLogFormat.newWriterBuilder()
try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
.withFileId(fileId)
.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
@@ -140,8 +138,7 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
)
.withStorage(metaClient.getStorage())
.withTableVersion(tableVersion)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build()) {
// generate metadata
if (doDelete) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.requestedTime());
@@ -153,14 +150,6 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
}
} catch (IOException | InterruptedException io) {
throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
} finally {
try {
if (writer != null) {
writer.close();
}
} catch (IOException io) {
throw new HoodieIOException("Error appending rollback block", io);
}
}

// This step is intentionally done after writer is closed. Guarantees that
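The change above replaces the manual `finally { writer.close(); }` block with try-with-resources, which closes the writer on success and on failure alike. A stripped-down sketch of the same shape, with a generic `Closeable` writer standing in for `HoodieLogFormat.Writer`:

```java
import java.io.Closeable;
import java.io.IOException;

class RollbackWriteSketch {
  interface LogWriter extends Closeable {
    void appendBlock(byte[] block) throws IOException;
  }

  void writeRollbackBlock(LogWriter openedWriter, byte[] commandBlock, String instant) {
    // try-with-resources replaces the null-checked finally { writer.close(); } dance
    try (LogWriter writer = openedWriter) {
      writer.appendBlock(commandBlock);
    } catch (IOException e) {
      throw new RuntimeException("Failed to rollback for instant " + instant, e);
    }
  }
}
```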
@@ -51,7 +51,7 @@ public final class RocksDbDiskMap<T extends Serializable, R extends Serializable
private static final Logger LOG = LoggerFactory.getLogger(RocksDbDiskMap.class);
// Stores the key and corresponding value's latest metadata spilled to disk
private final Set<T> keySet;
private RocksDBDAO rocksDb;
private volatile RocksDBDAO rocksDb;

public RocksDbDiskMap(String rocksDbStoragePath) throws IOException {
super(rocksDbStoragePath, ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
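Marking `rocksDb` volatile matters because the handle is lazily initialized under double-checked locking; without it, a second thread can observe a partially constructed object. An illustrative sketch of the idiom, with a plain `Object` standing in for `RocksDBDAO`:

```java
class LazyHandleSketch {
  private volatile Object handle;

  Object getHandle() {
    Object local = handle;        // single volatile read on the fast path
    if (local == null) {
      synchronized (this) {
        local = handle;
        if (local == null) {
          local = new Object();   // stands in for new RocksDBDAO(...)
          handle = local;         // safe publication via the volatile write
        }
      }
    }
    return local;
  }
}
```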
@@ -35,17 +35,17 @@ public class PushGatewayMetricsReporter extends MetricsReporter {

private final PushGatewayReporter pushGatewayReporter;
private final int periodSeconds;
private final boolean deleteShutdown;
private final String configuredJobName;
private final Map<String, String> configuredLabels;
private final boolean randomSuffix;
private final Random random = new Random();

public PushGatewayMetricsReporter(HoodieMetricsConfig metricsConfig, MetricRegistry registry) {

String serverHost = metricsConfig.getPushGatewayHost();
int serverPort = metricsConfig.getPushGatewayPort();
periodSeconds = metricsConfig.getPushGatewayReportPeriodSeconds();
deleteShutdown = metricsConfig.getPushGatewayDeleteOnShutdown();
boolean deleteShutdown = metricsConfig.getPushGatewayDeleteOnShutdown();
configuredJobName = metricsConfig.getPushGatewayJobName();
configuredLabels = Collections.unmodifiableMap(parseLabels(metricsConfig.getPushGatewayLabels()));
randomSuffix = metricsConfig.getPushGatewayRandomJobNameSuffix();
@@ -83,7 +83,6 @@ public Map<String, String> getLabels() {

private String getJobName() {
if (randomSuffix) {
Random random = new Random();
return configuredJobName + random.nextLong();
}
return configuredJobName;
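Hoisting `new Random()` out of `getJobName()` avoids constructing and re-seeding a generator on every call. A sketch of the pattern; the `ThreadLocalRandom` variant is shown as a common alternative, not as what the PR uses:

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

class JobNameSketch {
  private final Random random = new Random(); // one instance, reused per call
  private final String configuredJobName = "hudi_metrics"; // illustrative value
  private final boolean randomSuffix = true;

  String jobName() {
    if (randomSuffix) {
      return configuredJobName + random.nextLong();
    }
    return configuredJobName;
  }

  // Alternative: no shared field, no contention between threads
  String jobNameThreadLocal() {
    return configuredJobName + ThreadLocalRandom.current().nextLong();
  }
}
```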
@@ -77,8 +77,8 @@ public class HBaseHFileBootstrapIndexReader extends BootstrapIndex.IndexReader {
private final String indexByFileIdPath;

// Index Readers
private transient HFile.Reader indexByPartitionReader;
private transient HFile.Reader indexByFileIdReader;
private transient volatile HFile.Reader indexByPartitionReader;
private transient volatile HFile.Reader indexByFileIdReader;

// Bootstrap Index Info
private transient HoodieBootstrapIndexInfo bootstrapIndexInfo;
@@ -139,7 +139,7 @@ private HFile.Reader partitionIndexReader() {
if (null == indexByPartitionReader) {
synchronized (this) {
if (null == indexByPartitionReader) {
LOG.info("Opening partition index :" + indexByPartitionPath);
LOG.info("Opening partition index :{}", indexByPartitionPath);
this.indexByPartitionReader = createReader(
indexByPartitionPath, metaClient.getStorageConf().unwrapAs(Configuration.class), (FileSystem) metaClient.getStorage().getFileSystem());
}
@@ -152,7 +152,7 @@ private HFile.Reader fileIdIndexReader() {
if (null == indexByFileIdReader) {
synchronized (this) {
if (null == indexByFileIdReader) {
LOG.info("Opening fileId index :" + indexByFileIdPath);
LOG.info("Opening fileId index :{}", indexByFileIdPath);
this.indexByFileIdReader = createReader(
indexByFileIdPath, metaClient.getStorageConf().unwrapAs(Configuration.class), (FileSystem) metaClient.getStorage().getFileSystem());
}
@@ -204,7 +204,7 @@ public List<BootstrapFileMapping> getSourceFileMappingForPartition(String partit
.map(e -> new BootstrapFileMapping(bootstrapBasePath, metadata.getBootstrapPartitionPath(),
partition, e.getValue(), e.getKey())).collect(Collectors.toList());
} else {
LOG.warn("No value found for partition key (" + partition + ")");
LOG.warn("No value found for partition key ({})", partition);
return new ArrayList<>();
}
} catch (IOException ioe) {
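Besides making the two readers `volatile` (the same double-checked-locking rationale as the RocksDB change above), this file switches to SLF4J parameterized logging, which defers string building until the level is actually enabled. A minimal sketch:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  void logOpen(String indexPath, String partition) {
    // Concatenation builds the message even when INFO is disabled:
    //   LOG.info("Opening partition index :" + indexPath);
    LOG.info("Opening partition index :{}", indexPath);    // formatted lazily
    LOG.warn("No value found for partition key ({})", partition);
  }
}
```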
@@ -45,6 +45,7 @@ public class RetryHelper<T, R extends Exception> implements Serializable {
private final long initialIntervalTime;
private String taskInfo = "N/A";
private List<? extends Class<? extends Exception>> retryExceptionsClasses;
private final Random random = new Random();

public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs,
List<Class<? extends Exception>> retryExceptions, String taskInfo) {
@@ -134,7 +135,6 @@ private boolean checkIfExceptionInRetryList(Exception e) {
}

private long getWaitTimeExp(int retryCount) {
Random random = new Random();
if (0 == retryCount) {
return initialIntervalTime;
}
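With the `Random` promoted to a field, every retry reuses one generator. A hedged sketch of the exponential backoff-with-jitter computation a helper like this typically performs; the constants and exact formula are illustrative, not copied from `RetryHelper`:

```java
import java.util.Random;

class BackoffSketch {
  private final Random random = new Random(); // shared across all retries
  private final long initialIntervalMs = 100;
  private final long maxIntervalMs = 10_000;

  long waitTimeMs(int retryCount) {
    if (retryCount == 0) {
      return initialIntervalMs;
    }
    // grow as 2^retryCount, add random jitter, clamp to the configured max
    long exp = (long) Math.pow(2, retryCount) * initialIntervalMs;
    long jitter = (long) (random.nextDouble() * initialIntervalMs);
    return Math.min(exp + jitter, maxIntervalMs);
  }
}
```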
77 changes: 33 additions & 44 deletions hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -224,7 +224,6 @@ public abstract List<StoragePathInfo> listDirectEntries(StoragePath path,
* Sets Modification Time for the storage Path
* @param path
* @param modificationTimeInMillisEpoch Millis since Epoch
* @throws IOException
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract void setModificationTime(StoragePath path, long modificationTimeInMillisEpoch) throws IOException;
@@ -247,7 +246,7 @@ public abstract List<StoragePathInfo> globEntries(StoragePath pathPattern,
*
* @param oldPath source path.
* @param newPath destination path.
* @return {@true} if rename is successful.
* @return true if rename is successful.
* @throws IOException IO error.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
@@ -329,58 +328,48 @@ public final void createImmutableFileInPath(StoragePath path,
public final void createImmutableFileInPath(StoragePath path,
Option<byte[]> content,
boolean needTempFile) throws HoodieIOException {
OutputStream fsout = null;
StoragePath tmpPath = null;

try {
if (!content.isPresent()) {
fsout = create(path, false);
}

if (content.isPresent() && needTempFile) {
StoragePath parent = path.getParent();
tmpPath = new StoragePath(parent, path.getName() + "." + UUID.randomUUID());
fsout = create(tmpPath, false);
fsout.write(content.get());
}
boolean usedTmpPath = false;
StoragePath createPath;
if (content.isPresent() && needTempFile) {
StoragePath parent = path.getParent();
createPath = new StoragePath(parent, path.getName() + "." + UUID.randomUUID());
usedTmpPath = true;
} else {
createPath = path;
}

if (content.isPresent() && !needTempFile) {
fsout = create(path, false);
fsout.write(content.get());
try (OutputStream outputStream = create(createPath, false)) {
if (content.isPresent()) {
outputStream.write(content.get());
}
} catch (IOException e) {
String errorMsg = "Failed to create file " + (tmpPath != null ? tmpPath : path);
String errorMsg = "Failed to create/write file " + createPath;
throw new HoodieIOException(errorMsg, e);
} finally {
try {
if (null != fsout) {
fsout.close();
}
} catch (IOException e) {
String errorMsg = "Failed to close file " + (needTempFile ? tmpPath : path);
throw new HoodieIOException(errorMsg, e);
}
}

// do some renaming and cleanup if we used a temp file
if (usedTmpPath) {
boolean renameSuccess = false;
IOException renameErr = null;
try {
if (null != tmpPath) {
renameSuccess = rename(tmpPath, path);
}
renameSuccess = rename(createPath, path);
} catch (IOException e) {
throw new HoodieIOException(
"Failed to rename " + tmpPath + " to the target " + path,
e);
} finally {
if (!renameSuccess && null != tmpPath) {
try {
deleteFile(tmpPath);
LOG.warn("Fail to rename " + tmpPath + " to " + path
+ ", target file exists: " + exists(path));
} catch (IOException e) {
throw new HoodieIOException("Failed to delete tmp file " + tmpPath, e);
}
renameErr = e;
}

if (!renameSuccess) {
try {
deleteFile(createPath);
LOG.warn("Fail to rename {} to {}, target file exists: {}", usedTmpPath, path, exists(path));
} catch (IOException e) {
throw new HoodieIOException("Failed to delete tmp file upon rename failure. " + usedTmpPath, e);
}
}

if (renameErr != null) {
throw new HoodieIOException("Failed to rename " + usedTmpPath + " to the target " + path,
renameErr);
}
}
}

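The rewrite funnels all three create/write paths through a single try-with-resources and keeps rename-failure handling out of the happy path. A self-contained sketch of the same write-to-temp-then-rename idiom, expressed with `java.nio.file` rather than `HoodieStorage`:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.UUID;

class ImmutableFileSketch {
  void createImmutableFile(Path target, Optional<byte[]> content, boolean needTempFile) throws IOException {
    boolean useTmp = content.isPresent() && needTempFile;
    Path createPath = useTmp
        ? target.resolveSibling(target.getFileName() + "." + UUID.randomUUID())
        : target;

    // one create/write/close path covers all three original branches
    try (OutputStream out = Files.newOutputStream(createPath)) {
      if (content.isPresent()) {
        out.write(content.get());
      }
    }

    if (useTmp) {
      try {
        Files.move(createPath, target, StandardCopyOption.ATOMIC_MOVE);
      } catch (IOException renameErr) {
        Files.deleteIfExists(createPath); // clean up the orphaned temp file
        throw new IOException("Failed to rename " + createPath + " to " + target, renameErr);
      }
    }
  }
}
```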
@@ -57,7 +57,7 @@ public class KafkaConnectControlAgent implements KafkaControlAgent {
private static final long KAFKA_POLL_TIMEOUT_MS = 100;
private static final int EXEC_SHUTDOWN_TIMEOUT_MS = 5000;

private static KafkaConnectControlAgent agent;
private static volatile KafkaConnectControlAgent agent;
private final String bootstrapServers;
private final String controlTopicName;
private final ExecutorService executorService;
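Making `agent` volatile lets a double-checked singleton publish safely. The initialization-on-demand holder below is a common lock-free alternative, shown for contrast rather than as what the PR does:

```java
class ControlAgentSketch {
  private ControlAgentSketch() { }

  private static final class Holder {
    // JVM class-initialization guarantees make this both lazy and thread-safe
    private static final ControlAgentSketch INSTANCE = new ControlAgentSketch();
  }

  static ControlAgentSketch getInstance() {
    return Holder.INSTANCE;
  }
}
```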