TEZ-4514: Reduce Some FileSystem Calls. #309

Merged
merged 2 commits into from
Oct 3, 2023
57 changes: 27 additions & 30 deletions tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -19,6 +19,7 @@
package org.apache.tez.client;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
@@ -44,6 +45,8 @@
import com.google.common.base.Strings;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.tez.common.JavaOptsChecker;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
@@ -118,34 +121,28 @@
@Private
public final class TezClientUtils {

private static Logger LOG = LoggerFactory.getLogger(TezClientUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(TezClientUtils.class);
private static final int UTF8_CHUNK_SIZE = 16 * 1024;

private TezClientUtils() {}

private static FileStatus[] getLRFileStatus(String fileName, Configuration conf) throws
IOException {
URI uri;
private static RemoteIterator<LocatedFileStatus> getListFilesFileStatus(String configUri, Configuration conf)
throws IOException {
Path p = getPath(configUri);
FileSystem fs = p.getFileSystem(conf);
p = fs.resolvePath(p.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
FileSystem targetFS = p.getFileSystem(conf);
return targetFS.listFiles(p, false);
abstractdog (Contributor), Oct 2, 2023:

As far as I can tell, this single listFiles call replaces the separate "directory or file" check, which makes this method simpler. Looks good.

}

private static Path getPath(String configUri) {
try {
uri = new URI(fileName);
return new Path(new URI(configUri));
} catch (URISyntaxException e) {
String message = "Invalid URI defined in configuration for"
+ " location of TEZ jars. providedURI=" + fileName;
String message = "Invalid URI defined in configuration for" + " location of TEZ jars. providedURI=" + configUri;
LOG.error(message);
throw new TezUncheckedException(message, e);
}

Path p = new Path(uri);
FileSystem fs = p.getFileSystem(conf);
p = fs.resolvePath(p.makeQualified(fs.getUri(),
fs.getWorkingDirectory()));
FileSystem targetFS = p.getFileSystem(conf);
if (targetFS.isDirectory(p)) {
return targetFS.listStatus(p);
} else {
FileStatus fStatus = targetFS.getFileStatus(p);
return new FileStatus[]{fStatus};
}
}
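
To make the saving from the listFiles change concrete, here is a minimal sketch. It does not use Hadoop: `ListingSketch` is a hypothetical in-memory stand-in whose method names only echo `FileSystem`, and `rpcCalls` is an assumed counter that treats every lookup as one round trip. Under those assumptions the old shape (type probe, then fetch) costs two calls per path and the new shape costs one.

```java
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory file system; each lookup method counts as one RPC. */
final class ListingSketch {
  // path -> true if the entry is a directory
  private final Map<String, Boolean> isDirByPath = new TreeMap<>();
  int rpcCalls = 0;

  void add(String path, boolean isDir) {
    isDirByPath.put(path, isDir);
  }

  boolean isDirectory(String path) {
    rpcCalls++;
    return Boolean.TRUE.equals(isDirByPath.get(path));
  }

  /** Direct children of a directory, files and directories alike. */
  List<String> listStatus(String dir) {
    rpcCalls++;
    return children(dir, false);
  }

  String getFileStatus(String path) throws FileNotFoundException {
    rpcCalls++;
    if (!isDirByPath.containsKey(path)) {
      throw new FileNotFoundException(path);
    }
    return path;
  }

  /** Files directly under a directory, or the path itself when it is a file. */
  List<String> listFiles(String path) throws FileNotFoundException {
    rpcCalls++;
    Boolean isDir = isDirByPath.get(path);
    if (isDir == null) {
      throw new FileNotFoundException(path);
    }
    return isDir ? children(path, true) : Collections.singletonList(path);
  }

  private List<String> children(String dir, boolean filesOnly) {
    List<String> out = new ArrayList<>();
    String prefix = dir.endsWith("/") ? dir : dir + "/";
    for (Map.Entry<String, Boolean> e : isDirByPath.entrySet()) {
      String p = e.getKey();
      boolean direct = p.startsWith(prefix) && p.indexOf('/', prefix.length()) < 0;
      if (direct && !(filesOnly && e.getValue())) {
        out.add(p);
      }
    }
    return out;
  }

  /** Old shape: probe the type first, then fetch (two calls). */
  List<String> oldLookup(String path) throws FileNotFoundException {
    if (isDirectory(path)) {
      return listStatus(path);
    }
    return Collections.singletonList(getFileStatus(path));
  }

  /** New shape: a single listing call covers both cases. */
  List<String> newLookup(String path) throws FileNotFoundException {
    return listFiles(path);
  }
}
```

Because the stand-in's `listFiles` returns files only, as Hadoop's non-recursive `listFiles` does, a caller iterating over the result no longer needs its own check to skip directories.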

/**
@@ -233,15 +230,11 @@ private static boolean addLocalResources(Configuration conf,
} else {
type = LocalResourceType.FILE;
}
RemoteIterator<LocatedFileStatus> fileStatuses = getListFilesFileStatus(configUri, conf);
Contributor:

getListFilesFileStatus takes a "String fileName" parameter, but here we pass a "configUri". Can you unify the names and use whichever is closer to the truth? Also, getListFilesFileStatus eventually creates a URI, so we could pass that in here, right?

Member Author:

Done. The name says URI, but the value is a String rather than a URI object. It is extracted from a configuration property whose name contains URI, so I kept the old name, configUri.


FileStatus [] fileStatuses = getLRFileStatus(configUri, conf);

for (FileStatus fStatus : fileStatuses) {
while (fileStatuses.hasNext()) {
LocatedFileStatus fStatus = fileStatuses.next();
String linkName;
if (fStatus.isDirectory()) {
// Skip directories - no recursive search support.
continue;
}
// If the resource is an archive, we've already done this work
if(type != LocalResourceType.ARCHIVE) {
u = fStatus.getPath().toUri();
@@ -250,8 +243,7 @@ private static boolean addLocalResources(Configuration conf,
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory()));
if(null != u.getFragment()) {
LOG.warn("Fragment set for link being interpreted as a file," +
"URI: " + u.toString());
LOG.warn("Fragment set for link being interpreted as a file, URI: {}", u);
}
}

@@ -336,8 +328,13 @@ public static FileSystem ensureStagingDirExists(Configuration conf,
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
if (fs.exists(stagingArea)) {
FileStatus fsStatus = fs.getFileStatus(stagingArea);
FileStatus fsStatus = null;
try {
fsStatus = fs.getFileStatus(stagingArea);
} catch (FileNotFoundException fnf) {
// Ignore
Contributor:

What about returning early with

if (fsStatus == null) {
  return fs;
}

and having the rest of the method unindented

Member Author:

We can't return early: there is an else block below that handles the case where fsStatus is null:

else {
      TezCommonUtils.mkDirForAM(fs, stagingArea);
    }

}
if (fsStatus != null) {
String owner = fsStatus.getOwner();
if (!(owner.equals(currentUser) || owner.equals(realUser))) {
throw new IOException("The ownership on the staging directory "
Expand Down
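
The ensureStagingDirExists change swaps an exists() probe followed by getFileStatus() for a single status lookup that treats "not found" as an ordinary outcome. A rough local analogue is sketched below using `java.nio.file` instead of Hadoop's `FileSystem`, which is an assumption made only to keep the example self-contained. `NoSuchFileException` plays the role of `FileNotFoundException`, and the names `StagingDirSketch`, `statOrNull`, and `ensureDirExists` are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

final class StagingDirSketch {
  private StagingDirSketch() {}

  /** One lookup: returns the attributes, or null when the path does not exist. */
  static BasicFileAttributes statOrNull(Path path) throws IOException {
    try {
      return Files.readAttributes(path, BasicFileAttributes.class);
    } catch (NoSuchFileException missing) {
      // Absence is an expected outcome here, not an error.
      return null;
    }
  }

  /**
   * Validates the directory when it is present and creates it otherwise.
   * Returns true if the directory had to be created.
   */
  static boolean ensureDirExists(Path dir) throws IOException {
    BasicFileAttributes attrs = statOrNull(dir);
    if (attrs != null) {
      if (!attrs.isDirectory()) {
        throw new IOException("Not a directory: " + dir);
      }
      return false;
    }
    Files.createDirectories(dir);
    return true;
  }
}
```

Both branches remain reachable after the lookup, which is why an early return on a null status would not fit: the missing case still has work to do.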
46 changes: 25 additions & 21 deletions tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
@@ -383,18 +384,18 @@ public static List<HistoryEvent> readRecoveryEvents(TezConfiguration tezConf, Ap
new Path(currentAttemptRecoveryDataDir, appId.toString().replace(
"application", "dag")
+ "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
if (fs.exists(recoveryFilePath)) {
LOG.info("Read recovery file:" + recoveryFilePath);
FSDataInputStream in = null;
try {
in = fs.open(recoveryFilePath);
historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in));
} catch (IOException e) {
throw e;
} finally {
if (in != null) {
in.close();
}
LOG.info("Read recovery file:" + recoveryFilePath);
FSDataInputStream in = null;
try {
in = fs.open(recoveryFilePath);
historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in));
} catch (FileNotFoundException fnf) {
// Ignore, the file doesn't exist
} catch (IOException e) {
throw e;
} finally {
if (in != null) {
in.close();
}
}
}
@@ -429,12 +430,13 @@ private Path getSummaryPath(Path attemptRecoveryDataDir) {
return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir);
}

private FSDataInputStream getSummaryStream(Path summaryPath)
throws IOException {
if (!recoveryFS.exists(summaryPath)) {
private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException {
try {
return recoveryFS.open(summaryPath, recoveryBufferSize);
} catch (FileNotFoundException fnf) {
return null;

}
return recoveryFS.open(summaryPath, recoveryBufferSize);
}
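
The two RecoveryParser hunks above apply the same idea to open(): open the file directly and treat a missing file as "nothing to read". Besides saving a call, this closes the window in which the file could disappear between the check and the open. The sketch below shows that shape with `java.nio.file` as a local stand-in for Hadoop's `FileSystem` (an assumption for self-containment). `OpenOrSkipSketch` and `readIfPresent` are hypothetical names, and try-with-resources stands in for the explicit finally block.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

final class OpenOrSkipSketch {
  private OpenOrSkipSketch() {}

  /** Reads the whole file, or returns null when it does not exist. */
  static byte[] readIfPresent(Path file) throws IOException {
    // No exists() probe: the open itself reports absence.
    try (InputStream in = Files.newInputStream(file)) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return out.toByteArray();
    } catch (NoSuchFileException missing) {
      // Only "not found" is swallowed; other I/O errors still propagate.
      return null;
    }
  }
}
```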

private Path getDAGRecoveryFilePath(Path recoveryDataDir,
@@ -741,12 +743,14 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
+ lastRecoveryFile);
break;
}
FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile);
lastRecoveryFile = dagRecoveryFile;
LOG.info("Trying to recover dag from recovery file"
+ ", dagId=" + lastInProgressDAG.toString()
+ ", dagRecoveryFile=" + dagRecoveryFile
+ ", len=" + fileStatus.getLen());
LOG.info("Trying to recover dag from recovery file, dagId={}, dagRecoveryFile={}", lastInProgressDAG,
Contributor:

This removes fileStatus.getLen() from the log message. Is that intentional?

Member Author:

Yes. It was issuing an RPC just to log the file length, so I removed it.

abstractdog (Contributor), Oct 3, 2023:

What about keeping the useful info at DEBUG level? In that case we can log the full FileStatus, like

LOG.info("Trying to recover dag from recovery file, dagId={}, dagRecoveryFile={}", lastInProgressDAG, 
    dagRecoveryFile);
if (LOG.isDebugEnabled()) {
    // extra RPC call
    FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile);
    LOG.debug("Recovery file details: {}", fileStatus);
}

dagRecoveryFile);
if (LOG.isDebugEnabled()) {
FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile);
LOG.debug("Recovery file details: {}", fileStatus);
}

FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize);
CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream);
codedInputStream.setSizeLimit(Integer.MAX_VALUE);
Expand Down
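
The debug guard adopted in parseRecoveryData can be checked in isolation. The sketch below uses `java.util.logging` rather than SLF4J (an assumption to avoid external dependencies, with `Level.FINE` standing in for DEBUG). `expensiveStat` is a hypothetical stand-in for `recoveryFS.getFileStatus(...)`, and `statCalls` is an assumed counter that makes the avoided call observable.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class DebugGuardSketch {
  static final Logger LOG = Logger.getLogger(DebugGuardSketch.class.getName());
  static int statCalls = 0;

  private DebugGuardSketch() {}

  /** Hypothetical stand-in for a status lookup that costs one RPC. */
  static String expensiveStat(String path) {
    statCalls++;
    return "FileStatus{path=" + path + "}";
  }

  static void logRecovery(String dagId, String recoveryFile) {
    // Always logged; needs no remote call.
    LOG.log(Level.INFO, "Trying to recover dag from recovery file, dagId={0}, dagRecoveryFile={1}",
        new Object[] {dagId, recoveryFile});
    // The lookup runs only when the detailed message would actually be emitted.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "Recovery file details: {0}", expensiveStat(recoveryFile));
    }
  }
}
```

Parameterized logging alone would not be enough here: the argument expression is evaluated before the logger decides whether to emit, so the explicit level check is what avoids the call.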