Skip to content

Commit

Permalink
Merge branch 'apache:master' into TEZ-4550
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Aug 9, 2024
2 parents b7aa783 + 563b494 commit 84bb02e
Show file tree
Hide file tree
Showing 36 changed files with 482 additions and 139 deletions.
18 changes: 5 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@

<!--dependency versions in alphabetical order-->
<asynchttpclient.version>2.12.3</asynchttpclient.version>
<bouncycastle.version>1.70</bouncycastle.version>
<bouncycastle.version>1.78</bouncycastle.version>
<build-helper-maven-plugin.version>1.8</build-helper-maven-plugin.version>
<buildnumber-maven-plugin.version>1.1</buildnumber-maven-plugin.version>
<checkstyle.version>8.35</checkstyle.version>
<codehaus.mojo.version>1.3.2</codehaus.mojo.version>
<commons-cli.version>1.6.0</commons-cli.version>
<commons-codec.version>1.15</commons-codec.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-io.version>2.8.0</commons-io.version>
<commons-io.version>2.16.0</commons-io.version>
<commons-lang.version>2.6</commons-lang.version>
<clover.license>${user.home}/clover.license</clover.license>
<dependency-check-maven.version>1.3.6</dependency-check-maven.version>
Expand All @@ -90,7 +90,7 @@
<mockito-core.version>4.3.1</mockito-core.version>
<netty.version>4.1.100.Final</netty.version>
<pig.version>0.13.0</pig.version>
<protobuf.version>3.21.1</protobuf.version>
<protobuf.version>3.24.4</protobuf.version>
<protoc-jar-maven-plugin.version>3.11.4</protoc-jar-maven-plugin.version>
<protoc.path>${env.PROTOC_PATH}</protoc.path>
<restrict-imports.enforcer.version>2.0.0</restrict-imports.enforcer.version>
Expand Down Expand Up @@ -377,10 +377,6 @@
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand All @@ -392,10 +388,6 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down Expand Up @@ -784,13 +776,13 @@
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<artifactId>bcprov-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<artifactId>bcpkix-jdk18on</artifactId>
<version>${bouncycastle.version}</version>
<scope>test</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion tez-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<artifactId>bcprov-jdk18on</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public abstract ApplicationId submitApplication(ApplicationSubmissionContext app

public abstract ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException;

/**
 * Returns the application master host as currently known to this framework client.
 * NOTE(review): the implementations visible in this change only assign the value inside
 * getApplicationReport, so it may be unset before the first report is fetched - confirm.
 *
 * @return the AM host
 */
public abstract String getAmHost();
/**
 * Returns the application master RPC port as currently known to this framework client.
 * NOTE(review): the implementations visible in this change only assign the value inside
 * getApplicationReport, so it may hold the field default before the first report - confirm.
 *
 * @return the AM port
 */
public abstract int getAmPort();

public abstract boolean isRunning() throws IOException;

public TezAppMasterStatus getAMStatus(Configuration conf, ApplicationId appId,
Expand Down
8 changes: 8 additions & 0 deletions tez-api/src/main/java/org/apache/tez/client/TezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1286,4 +1286,12 @@ public static ApplicationId appIdfromString(String appIdStr) {
+ appIdStr, n);
}
}

/**
 * Exposes the application master host tracked by the underlying framework client.
 *
 * @return the host returned by the framework client, or {@code null} when no
 *         framework client has been set on this TezClient
 */
public String getAmHost() {
  if (frameworkClient != null) {
    return frameworkClient.getAmHost();
  }
  // No framework client: there is nothing to ask for a host.
  return null;
}

/**
 * Exposes the application master port tracked by the underlying framework client.
 *
 * @return the port returned by the framework client, or {@code -1} when no
 *         framework client has been set on this TezClient
 */
public int getAmPort() {
  if (frameworkClient != null) {
    return frameworkClient.getAmPort();
  }
  // No framework client: -1 marks the port as unavailable.
  return -1;
}
}
15 changes: 15 additions & 0 deletions tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public class TezYarnClient extends FrameworkClient {

private volatile boolean isRunning;

private String amHost;
private int amPort;

protected TezYarnClient(YarnClient yarnClient) {
this.yarnClient = yarnClient;
}
Expand Down Expand Up @@ -100,11 +103,23 @@ public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnEx
throw new ApplicationNotFoundException("YARN reports no state for application "
+ appId);
}
this.amHost = report.getHost();
this.amPort = report.getRpcPort();
return report;
}

/**
 * Reports the running flag held by this client.
 *
 * @return the current value of the {@code isRunning} field
 * @throws IOException declared by the signature; this implementation only reads a field
 */
@Override
public boolean isRunning() throws IOException {
return isRunning;
}

/**
 * Returns the AM host captured from the most recent application report.
 * The field is assigned in getApplicationReport from {@code report.getHost()}, so it stays
 * {@code null} until a report has been fetched successfully.
 * NOTE(review): the backing field is not volatile (unlike {@code isRunning}) - confirm
 * whether it is read from a thread other than the one fetching reports.
 *
 * @return the last recorded AM host, or {@code null} if none has been recorded yet
 */
@Override
public String getAmHost() {
return amHost;
}

/**
 * Returns the AM RPC port captured from the most recent application report.
 * The field is assigned in getApplicationReport from {@code report.getRpcPort()}; before
 * that it holds the int field default.
 * NOTE(review): the backing field is not volatile (unlike {@code isRunning}) - confirm
 * whether it is read from a thread other than the one fetching reports.
 *
 * @return the last recorded AM RPC port
 */
@Override
public int getAmPort() {
return amPort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,18 @@ public enum DAGCounter {
* Number of container reuses during a DAG. This is incremented every time
* the containerReused callback is called in the TaskSchedulerContext.
*/
TOTAL_CONTAINER_REUSE_COUNT
TOTAL_CONTAINER_REUSE_COUNT,

/*
* Number of nodes to which task attempts were assigned in this DAG.
* Nodes are distinguished by the Yarn NodeId.getHost().
*/
NODE_USED_COUNT,

/*
* Total number of nodes visible to the task scheduler (regardless of
* task assignments). This is typically exposed by a resource manager
* client.
*/
NODE_TOTAL_COUNT
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,14 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (NoCurrentDAGException e) {
LOG.info("Got NoCurrentDAGException from AM, returning a failed DAG", e);
return dagLost();
if (conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
LOG.info("Got NoCurrentDAGException from AM, going on as recovery is enabled", e);
} else {
// if recovery is disabled, we're not expecting the DAG to be finished any time in the future
LOG.info("Got NoCurrentDAGException from AM, returning a failed DAG as recovery is disabled", e);
return dagLost();
}
} catch (TezException e) {
// can be either due to a n/w issue or due to AM completed.
LOG.info("Cannot retrieve DAG Status due to TezException: {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,6 @@ public interface OutputCommitterContext {
*/
public int getVertexIndex();

public int getDagIdentifier();

}
19 changes: 19 additions & 0 deletions tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,16 @@ protected DAGClientAMProtocolBlockingPB getProxy(Configuration conf, Application
}
return super.getProxy(conf, sessionAppId, ugi);
}

/**
 * Test stub: always reports a fixed AM host so tests can assert on a known value.
 *
 * @return the constant {@code "testhost"}
 */
@Override
public String getAmHost() {
return "testhost";
}

/**
 * Test stub: always reports a fixed AM port so tests can assert on a known value.
 *
 * @return the constant {@code 1234}
 */
@Override
public int getAmPort() {
return 1234;
}
}

TezClientForTest configureAndCreateTezClient() throws YarnException, IOException, ServiceException {
Expand Down Expand Up @@ -1005,4 +1015,13 @@ public void testYarnZkDeprecatedConf() {
//Test that Exception is not thrown by createFinalConfProtoForApp
TezClientUtils.createFinalConfProtoForApp(conf, null);
}

/**
 * Verifies that TezClient forwards the AM host and port of its FrameworkClient.
 */
@Test
public void testGetAmHostAndPort() throws Exception {
  final TezConfiguration tezConf = new TezConfiguration();
  final TezClientForTest tezClient = configureAndCreateTezClient(tezConf);

  // The framework client behind this TezClient is the test double (TezYarnClientForTest),
  // so the expected values are the constants it returns.
  final String reportedHost = tezClient.getAmHost();
  assertEquals("testhost", reportedHost);

  final int reportedPort = tezClient.getAmPort();
  assertEquals(1234, reportedPort);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception {
@Test
public void testDagClientReturnsFailedDAGOnNoCurrentDAGException() throws Exception {
TezConfiguration tezConf = new TezConfiguration();
tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);

try (DAGClientImplForTest dagClientImpl = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, null)) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -50,19 +53,19 @@
* // -m method fully qualified method name: 'ClassName.methodName'
* // -t profile different threads separately
* // -s simple class names instead of FQN
* // -o fmt[,fmt...] output format: summary|traces|flat|collapsed|svg|tree|jfr
* // -o fmt[,fmt...] output format: summary|traces|flat|collapsed|svg|tree|jfr|html
* // --width px SVG width pixels (integer)
* // --height px SVG frame height pixels (integer)
* // --minwidth px skip frames smaller than px (double)
* // --reverse generate stack-reversed FlameGraph / Call tree
* Example:
* - To collect 30 second CPU profile of current process (returns FlameGraph svg)
* - To collect 10 second CPU profile of current process (returns FlameGraph html)
* {@literal curl "http://localhost:10002/prof"}
* - To collect 1 minute CPU profile of current process and output in tree format (html)
* {@literal curl "http://localhost:10002/prof?output=tree&duration=60"}
* - To collect 30 second heap allocation profile of current process (returns FlameGraph svg)
* - To collect 10 second heap allocation profile of current process (returns FlameGraph html)
* {@literal curl "http://localhost:10002/prof?event=alloc"}
* - To collect lock contention profile of current process (returns FlameGraph svg)
* - To collect lock contention profile of current process (returns FlameGraph html)
* {@literal curl "http://localhost:10002/prof?event=lock"}
* Following event types are supported (default is 'cpu') (NOTE: not all OS'es support all events)
* // Perf events:
Expand Down Expand Up @@ -95,7 +98,6 @@ public class ProfileServlet extends HttpServlet {
private static final String CONTENT_TYPE_TEXT = "text/plain; charset=utf-8";
private static final String ASYNC_PROFILER_HOME_ENV = "ASYNC_PROFILER_HOME";
private static final String ASYNC_PROFILER_HOME_SYSTEM_PROPERTY = "async.profiler.home";
private static final String PROFILER_SCRIPT = "/profiler.sh";
private static final int DEFAULT_DURATION_SECONDS = 10;
private static final AtomicInteger ID_GEN = new AtomicInteger(0);
public static final String OUTPUT_DIR = System.getProperty("java.io.tmpdir") + "/prof-output";
Expand Down Expand Up @@ -140,7 +142,7 @@ public static Event fromInternalName(final String name) {
}

enum Output {
SUMMARY, TRACES, FLAT, COLLAPSED, SVG, TREE, JFR
SUMMARY, TRACES, FLAT, COLLAPSED, SVG, TREE, JFR, HTML
}

private final Lock profilerLock = new ReentrantLock();
Expand Down Expand Up @@ -216,7 +218,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
+ (method == null ? event.name().toLowerCase() : method) + "-" + ID_GEN.incrementAndGet() + "."
+ output.name().toLowerCase());
List<String> cmd = new ArrayList<>();
cmd.add(asyncProfilerHome + PROFILER_SCRIPT);
cmd.add(getProfilerScriptPath());
cmd.add("-e");
cmd.add(method == null ? event.getInternalName() : method);
cmd.add("-d");
Expand Down Expand Up @@ -298,6 +300,16 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
out.close();
}

/**
 * Resolves the async-profiler launcher to execute under the configured profiler home.
 * Before async-profiler 3.0 the launcher was named profiler.sh; from 3.0 on it is bin/asprof.
 * The newer location is preferred when it exists on disk.
 *
 * @return the path of bin/asprof under the profiler home if that file exists,
 *         otherwise the path of profiler.sh under the profiler home
 */
private String getProfilerScriptPath() {
  Path asprofLauncher = Paths.get(asyncProfilerHome + "/bin/asprof");
  if (Files.exists(asprofLauncher)) {
    return asprofLauncher.toString();
  }
  // Fall back to the pre-3.0 script name.
  return asyncProfilerHome + "/profiler.sh";
}

private Integer getInteger(final HttpServletRequest req, final String param, final Integer defaultValue) {
final String value = req.getParameter(param);
if (value != null) {
Expand Down Expand Up @@ -349,11 +361,11 @@ private Output getOutput(final HttpServletRequest req) {
try {
return Output.valueOf(outputArg.trim().toUpperCase());
} catch (IllegalArgumentException e) {
LOG.warn("Output format value is invalid, returning with default SVG");
return Output.SVG;
LOG.warn("Output format value is invalid, returning with default HTML");
return Output.HTML;
}
}
return Output.SVG;
return Output.HTML;
}

private void setResponseHeader(final HttpServletResponse response) {
Expand Down
15 changes: 15 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class LocalClient extends FrameworkClient {
private TezApiVersionInfo versionInfo = new TezApiVersionInfo();
private volatile Throwable amFailException = null;
private boolean isLocalWithoutNetwork;
private String amHost;
private int amPort;

private static final String localModeDAGSchedulerClassName =
"org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled";
Expand Down Expand Up @@ -204,6 +206,9 @@ public ApplicationReport getApplicationReport(ApplicationId appId) {
report.setProgress(dagAppMaster.getProgress());
report.setAMRMToken(null);

this.amHost = dagAppMaster.getAppNMHost();
this.amPort = dagAppMaster.getRpcPort();

return report;
}

Expand Down Expand Up @@ -475,4 +480,14 @@ public boolean shutdownSession(Configuration configuration, ApplicationId sessio
}
return super.shutdownSession(configuration, sessionAppId, ugi);
}

/**
 * Returns the AM host recorded by the last getApplicationReport call.
 * The field is assigned there from {@code dagAppMaster.getAppNMHost()}, so it stays
 * {@code null} until a report has been built.
 *
 * @return the last recorded AM host, or {@code null} if none has been recorded yet
 */
@Override
public String getAmHost() {
return amHost;
}

/**
 * Returns the AM RPC port recorded by the last getApplicationReport call.
 * The field is assigned there from {@code dagAppMaster.getRpcPort()}; before that it
 * holds the int field default.
 *
 * @return the last recorded AM RPC port
 */
@Override
public int getAmPort() {
return amPort;
}
}
4 changes: 2 additions & 2 deletions tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Set;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.counters.DAGCounter;
Expand Down Expand Up @@ -106,7 +106,7 @@ VertexStatusBuilder getVertexStatus(String vertexName,

void incrementDagCounter(DAGCounter counter, int incrValue);
void setDagCounter(DAGCounter counter, int setValue);
void addUsedContainer(ContainerId containerId);
void addUsedContainer(Container container);

/**
* Called by the DAGAppMaster when the DAG is started normally or in the event of recovery.
Expand Down
Loading

0 comments on commit 84bb02e

Please sign in to comment.