Skip to content

Commit

Permalink
TEZ-4514: Reduce Some FileSystem Calls. (#309) (Ayush Saxena reviewed by Laszlo Bodor)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushtkn authored Oct 3, 2023
1 parent 7855c1f commit 2ad10b6
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 51 deletions.
57 changes: 27 additions & 30 deletions tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.client;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
Expand All @@ -44,6 +45,8 @@
import com.google.common.base.Strings;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.tez.common.JavaOptsChecker;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
Expand Down Expand Up @@ -118,34 +121,28 @@
@Private
public final class TezClientUtils {

private static Logger LOG = LoggerFactory.getLogger(TezClientUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(TezClientUtils.class);
private static final int UTF8_CHUNK_SIZE = 16 * 1024;

private TezClientUtils() {}

private static FileStatus[] getLRFileStatus(String fileName, Configuration conf) throws
IOException {
URI uri;
private static RemoteIterator<LocatedFileStatus> getListFilesFileStatus(String configUri, Configuration conf)
throws IOException {
Path p = getPath(configUri);
FileSystem fs = p.getFileSystem(conf);
p = fs.resolvePath(p.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
FileSystem targetFS = p.getFileSystem(conf);
return targetFS.listFiles(p, false);
}

private static Path getPath(String configUri) {
try {
uri = new URI(fileName);
return new Path(new URI(configUri));
} catch (URISyntaxException e) {
String message = "Invalid URI defined in configuration for"
+ " location of TEZ jars. providedURI=" + fileName;
String message = "Invalid URI defined in configuration for" + " location of TEZ jars. providedURI=" + configUri;
LOG.error(message);
throw new TezUncheckedException(message, e);
}

Path p = new Path(uri);
FileSystem fs = p.getFileSystem(conf);
p = fs.resolvePath(p.makeQualified(fs.getUri(),
fs.getWorkingDirectory()));
FileSystem targetFS = p.getFileSystem(conf);
if (targetFS.isDirectory(p)) {
return targetFS.listStatus(p);
} else {
FileStatus fStatus = targetFS.getFileStatus(p);
return new FileStatus[]{fStatus};
}
}

/**
Expand Down Expand Up @@ -233,15 +230,11 @@ private static boolean addLocalResources(Configuration conf,
} else {
type = LocalResourceType.FILE;
}
RemoteIterator<LocatedFileStatus> fileStatuses = getListFilesFileStatus(configUri, conf);

FileStatus [] fileStatuses = getLRFileStatus(configUri, conf);

for (FileStatus fStatus : fileStatuses) {
while (fileStatuses.hasNext()) {
LocatedFileStatus fStatus = fileStatuses.next();
String linkName;
if (fStatus.isDirectory()) {
// Skip directories - no recursive search support.
continue;
}
// If the resource is an archive, we've already done this work
if(type != LocalResourceType.ARCHIVE) {
u = fStatus.getPath().toUri();
Expand All @@ -250,8 +243,7 @@ private static boolean addLocalResources(Configuration conf,
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
if(null != u.getFragment()) {
LOG.warn("Fragment set for link being interpreted as a file," +
"URI: " + u.toString());
LOG.warn("Fragment set for link being interpreted as a file, URI: {}", u);
}
}

Expand Down Expand Up @@ -336,8 +328,13 @@ public static FileSystem ensureStagingDirExists(Configuration conf,
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
if (fs.exists(stagingArea)) {
FileStatus fsStatus = fs.getFileStatus(stagingArea);
FileStatus fsStatus = null;
try {
fsStatus = fs.getFileStatus(stagingArea);
} catch (FileNotFoundException fnf) {
// Ignore
}
if (fsStatus != null) {
String owner = fsStatus.getOwner();
if (!(owner.equals(currentUser) || owner.equals(realUser))) {
throw new IOException("The ownership on the staging directory "
Expand Down
46 changes: 25 additions & 21 deletions tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.dag.app;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
Expand Down Expand Up @@ -383,18 +384,18 @@ public static List<HistoryEvent> readRecoveryEvents(TezConfiguration tezConf, Ap
new Path(currentAttemptRecoveryDataDir, appId.toString().replace(
"application", "dag")
+ "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
if (fs.exists(recoveryFilePath)) {
LOG.info("Read recovery file:" + recoveryFilePath);
FSDataInputStream in = null;
try {
in = fs.open(recoveryFilePath);
historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in));
} catch (IOException e) {
throw e;
} finally {
if (in != null) {
in.close();
}
LOG.info("Read recovery file:" + recoveryFilePath);
FSDataInputStream in = null;
try {
in = fs.open(recoveryFilePath);
historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in));
} catch (FileNotFoundException fnf) {
// Ignore, the file doesn't exist
} catch (IOException e) {
throw e;
} finally {
if (in != null) {
in.close();
}
}
}
Expand Down Expand Up @@ -429,12 +430,13 @@ private Path getSummaryPath(Path attemptRecoveryDataDir) {
return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir);
}

private FSDataInputStream getSummaryStream(Path summaryPath)
throws IOException {
if (!recoveryFS.exists(summaryPath)) {
private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException {
try {
return recoveryFS.open(summaryPath, recoveryBufferSize);
} catch (FileNotFoundException fnf) {
return null;

}
return recoveryFS.open(summaryPath, recoveryBufferSize);
}

private Path getDAGRecoveryFilePath(Path recoveryDataDir,
Expand Down Expand Up @@ -741,12 +743,14 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
+ lastRecoveryFile);
break;
}
FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile);
lastRecoveryFile = dagRecoveryFile;
LOG.info("Trying to recover dag from recovery file"
+ ", dagId=" + lastInProgressDAG.toString()
+ ", dagRecoveryFile=" + dagRecoveryFile
+ ", len=" + fileStatus.getLen());
LOG.info("Trying to recover dag from recovery file, dagId={}, dagRecoveryFile={}", lastInProgressDAG,
dagRecoveryFile);
if (LOG.isDebugEnabled()) {
FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile);
LOG.debug("Recovery file details: {}", fileStatus);
}

FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize);
CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream);
codedInputStream.setSizeLimit(Integer.MAX_VALUE);
Expand Down

0 comments on commit 2ad10b6

Please sign in to comment.