diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 0ed0b69821..bae0174b3b 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -19,6 +19,7 @@ package org.apache.tez.client; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; @@ -44,6 +45,8 @@ import com.google.common.base.Strings; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.tez.common.JavaOptsChecker; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; @@ -118,34 +121,28 @@ @Private public final class TezClientUtils { - private static Logger LOG = LoggerFactory.getLogger(TezClientUtils.class); + private static final Logger LOG = LoggerFactory.getLogger(TezClientUtils.class); private static final int UTF8_CHUNK_SIZE = 16 * 1024; private TezClientUtils() {} - private static FileStatus[] getLRFileStatus(String fileName, Configuration conf) throws - IOException { - URI uri; + private static RemoteIterator getListFilesFileStatus(String configUri, Configuration conf) + throws IOException { + Path p = getPath(configUri); + FileSystem fs = p.getFileSystem(conf); + p = fs.resolvePath(p.makeQualified(fs.getUri(), fs.getWorkingDirectory())); + FileSystem targetFS = p.getFileSystem(conf); + return targetFS.listFiles(p, false); + } + + private static Path getPath(String configUri) { try { - uri = new URI(fileName); + return new Path(new URI(configUri)); } catch (URISyntaxException e) { - String message = "Invalid URI defined in configuration for" - + " location of TEZ jars. providedURI=" + fileName; + String message = "Invalid URI defined in configuration for" + " location of TEZ jars. providedURI=" + configUri; LOG.error(message); throw new TezUncheckedException(message, e); } - - Path p = new Path(uri); - FileSystem fs = p.getFileSystem(conf); - p = fs.resolvePath(p.makeQualified(fs.getUri(), - fs.getWorkingDirectory())); - FileSystem targetFS = p.getFileSystem(conf); - if (targetFS.isDirectory(p)) { - return targetFS.listStatus(p); - } else { - FileStatus fStatus = targetFS.getFileStatus(p); - return new FileStatus[]{fStatus}; - } } /** @@ -233,15 +230,11 @@ private static boolean addLocalResources(Configuration conf, } else { type = LocalResourceType.FILE; } + RemoteIterator fileStatuses = getListFilesFileStatus(configUri, conf); - FileStatus [] fileStatuses = getLRFileStatus(configUri, conf); - - for (FileStatus fStatus : fileStatuses) { + while (fileStatuses.hasNext()) { + LocatedFileStatus fStatus = fileStatuses.next(); String linkName; - if (fStatus.isDirectory()) { - // Skip directories - no recursive search support. - continue; - } // If the resource is an archive, we've already done this work if(type != LocalResourceType.ARCHIVE) { u = fStatus.getPath().toUri(); @@ -250,8 +243,7 @@ private static boolean addLocalResources(Configuration conf, p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory())); if(null != u.getFragment()) { - LOG.warn("Fragment set for link being interpreted as a file," + - "URI: " + u.toString()); + LOG.warn("Fragment set for link being interpreted as a file, URI: {}", u); } } @@ -336,8 +328,13 @@ public static FileSystem ensureStagingDirExists(Configuration conf, UserGroupInformation ugi = UserGroupInformation.getLoginUser(); realUser = ugi.getShortUserName(); currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); - if (fs.exists(stagingArea)) { - FileStatus fsStatus = fs.getFileStatus(stagingArea); + FileStatus fsStatus = null; + try { + fsStatus = fs.getFileStatus(stagingArea); + } catch (FileNotFoundException fnf) { + // Ignore + } + if (fsStatus != null) { String owner = fsStatus.getOwner(); if (!(owner.equals(currentUser) || owner.equals(realUser))) { throw new IOException("The ownership on the staging directory " diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java index b6e942cdae..656f38fb10 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app; import java.io.EOFException; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; import java.util.ArrayList; @@ -383,18 +384,18 @@ public static List readRecoveryEvents(TezConfiguration tezConf, Ap new Path(currentAttemptRecoveryDataDir, appId.toString().replace( "application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); - if (fs.exists(recoveryFilePath)) { - LOG.info("Read recovery file:" + recoveryFilePath); - FSDataInputStream in = null; - try { - in = fs.open(recoveryFilePath); - historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in)); - } catch (IOException e) { - throw e; - } finally { - if (in != null) { - in.close(); - } + LOG.info("Read recovery file:" + recoveryFilePath); + FSDataInputStream in = null; + try { + in = fs.open(recoveryFilePath); + historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(in)); + } catch (FileNotFoundException fnf) { + // Ignore, the file doesn't exist + } catch (IOException e) { + throw e; + } finally { + if (in != null) { + in.close(); } } } @@ -429,12 +430,13 @@ private Path getSummaryPath(Path attemptRecoveryDataDir) { return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir); } - private FSDataInputStream getSummaryStream(Path summaryPath) - throws IOException { - if (!recoveryFS.exists(summaryPath)) { + private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException { + try { + return recoveryFS.open(summaryPath, recoveryBufferSize); + } catch (FileNotFoundException fnf) { return null; + } - return recoveryFS.open(summaryPath, recoveryBufferSize); } private Path getDAGRecoveryFilePath(Path recoveryDataDir, @@ -741,12 +743,14 @@ public DAGRecoveryData parseRecoveryData() throws IOException { + lastRecoveryFile); break; } - FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile); lastRecoveryFile = dagRecoveryFile; - LOG.info("Trying to recover dag from recovery file" - + ", dagId=" + lastInProgressDAG.toString() - + ", dagRecoveryFile=" + dagRecoveryFile - + ", len=" + fileStatus.getLen()); + LOG.info("Trying to recover dag from recovery file, dagId={}, dagRecoveryFile={}", lastInProgressDAG, + dagRecoveryFile); + if (LOG.isDebugEnabled()) { + FileStatus fileStatus = recoveryFS.getFileStatus(dagRecoveryFile); + LOG.debug("Recovery file details: {}", fileStatus); + } + FSDataInputStream dagRecoveryStream = recoveryFS.open(dagRecoveryFile, recoveryBufferSize); CodedInputStream codedInputStream = CodedInputStream.newInstance(dagRecoveryStream); codedInputStream.setSizeLimit(Integer.MAX_VALUE);