Skip to content

Commit

Permalink
TEZ-4352: Add a web ui interface for TezChild
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Nov 11, 2022
1 parent 8ebc4b0 commit 538d2e1
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 2 deletions.
21 changes: 21 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,27 @@ public TezConfiguration(boolean loadDefaults) {
public static final String TEZ_MDC_CUSTOM_KEYS_CONF_PROPS = TEZ_MDC_CUSTOM_KEYS + ".conf.props";
public static final String TEZ_MDC_CUSTOM_KEYS_CONF_PROPS_DEFAULT = "";

/**
* String value
* Whether to start web ui service in task processes.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="boolean")
public static final String TEZ_TASK_WEBSERVICE_ENABLE = TEZ_TASK_PREFIX
+ "webservice.enable";
public static final boolean TEZ_TASK_WEBSERVICE_ENABLE_DEFAULT = false;

/**
* String value. Range of ports that the task container can use for the WebUIService. Leave blank
* to use all possible ports. Expert level setting. It's hadoop standard range configuration.
* For example 50051-55000
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type = "boolean")
public static final String TEZ_TASK_WEBSERVICE_PORT_RANGE = TEZ_AM_PREFIX + "webservice.port-range";

public static final String TEZ_TASK_WEBSERVICE_PORT_RANGE_DEFAULT = "50051-55000";

/**
* double value. Represents ratio of unique failed outputs / number of consumer
* tasks. When this condition or value mentioned in {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ public interface ExecutionContext {
* Get the hostname on which the JVM is running.
* @return the hostname
*/
public String getHostName();
String getHostName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class ExecutionContextImpl implements ExecutionContext {

private final String hostname;
private String containerId = null;

public ExecutionContextImpl(String hostname) {
this.hostname = hostname;
Expand All @@ -28,4 +29,13 @@ public ExecutionContextImpl(String hostname) {
public String getHostName() {
return hostname;
}

public ExecutionContext containerId(String containerId) {
this.containerId = containerId;
return this;
}

public String getContainerId() {
return containerId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.runtime.web.TezChildWebUIService;
import org.apache.tez.util.LoggingUtils;

import org.apache.tez.util.TezRuntimeShutdownHandler;
Expand Down Expand Up @@ -131,6 +132,8 @@ public class TezChild {
private final TezExecutors sharedExecutor;
private ThreadLocalMap mdcContext;

private TezChildWebUIService webUIService;

public TezChild(Configuration conf, String host, int port, String containerIdentifier,
String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs,
Map<String, String> serviceProviderEnvMap,
Expand Down Expand Up @@ -207,6 +210,9 @@ public TezTaskUmbilicalProtocol run() throws Exception {
ownUmbilical = false;
}
TezCommonUtils.logCredentials(LOG, credentials, "tezChildInit");
if (isWebUIServiceEnabled(conf)) {
this.webUIService = new TezChildWebUIService(conf, executionContext).start();
}
}

public ContainerExecutionResult run() throws IOException, InterruptedException, TezException {
Expand Down Expand Up @@ -424,11 +430,19 @@ public void shutdown() {
if (ownUmbilical) {
RPC.stopProxy(umbilical);
}
if (webUIService != null) {
webUIService.stop();
}
}
TezRuntimeShutdownHandler.shutdown();
LOG.info("TezChild shutdown finished");
}

private boolean isWebUIServiceEnabled(Configuration conf) {
return conf.getBoolean(TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE,
TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE_DEFAULT);
}

public static class ContainerExecutionResult {
public static enum ExitStatus {
SUCCESS(0),
Expand Down Expand Up @@ -545,7 +559,9 @@ public static void main(String[] args) throws IOException, InterruptedException,

TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
System.getenv(), pid,
new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name()))
.containerId(System.getenv(Environment.CONTAINER_ID.name())),
credentials, Runtime.getRuntime().maxMemory(), System
.getenv(ApplicationConstants.Environment.USER.toString()), null, true, hadoopShim);
ContainerExecutionResult result = tezChild.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.web;

import java.io.PrintWriter;

import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.hadoop.yarn.webapp.MimeType;
import org.apache.hadoop.yarn.webapp.View;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;

import com.google.inject.Inject;

public class TezChildWebController extends Controller {

@Inject
public TezChildWebController(RequestContext requestContext) {
super(requestContext);
}

@Override
public void index() {
ui();
}

public void ui() {
render(StaticTezChildView.class);
}

public static class StaticTezChildView extends View {
@Inject
ExecutionContextImpl executionContext;

@Override
public void render() {
response().setContentType(MimeType.HTML);
PrintWriter pw = writer();
pw.write("<html><head><meta charset=\\\"utf-8\\\"><title>TezChild UI</title>");
pw.write("</head><body>");
pw.write(String.format("<h1>TezChild UI</h1> <h2>%s, %s</h2> %s :: %s :: %s", executionContext.getHostName(),
executionContext.getContainerId(), getLink("jmx"), getLink("conf"), getLink("stacks")));
pw.write("</body></html>");
pw.flush();
}

private String getLink(String path) {
return "<a href=\"/" + path + "\">" + path + "</a>";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.web;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.apache.tez.common.web.ServletToControllerAdapters.ConfServletController;
import org.apache.tez.common.web.ServletToControllerAdapters.JMXJsonServletController;
import org.apache.tez.common.web.ServletToControllerAdapters.StackServletController;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TezChildWebUIService {
private static final Logger LOG = LoggerFactory.getLogger(TezChildWebUIService.class);

private Configuration conf;
private ExecutionContext executionContext;
private TezChildWebApp tezChildWebApp;
private WebApp webApp;
private String baseUrl = ""; //url without paths, like http://host:port

public TezChildWebUIService(Configuration conf, ExecutionContext executionContext) {
this.tezChildWebApp = new TezChildWebApp(executionContext);
this.conf = conf;
this.executionContext = executionContext;
}

public TezChildWebUIService start() {
try {
if (conf.get(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE) == null) {
conf.set(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE,
TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE_DEFAULT);
LOG.info(
"Using default port range for WebUIService: " + conf.get(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE));
}
this.webApp = WebApps.$for(this.tezChildWebApp).with(conf)
.withPortRange(conf, TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE).start(this.tezChildWebApp);
InetSocketAddress address = webApp.getListenerAddress();
if (address != null) {
String hostname = executionContext.getHostName();
InetSocketAddress bindAddress = NetUtils.createSocketAddrForHost(hostname, address.getPort());
final int port = address.getPort();
if (bindAddress.getAddress() != null && bindAddress.getAddress().getCanonicalHostName() != null) {
hostname = bindAddress.getAddress().getCanonicalHostName();
} else {
LOG.warn("Failed to resolve canonical hostname for " + hostname);
}
baseUrl = String.format("http://%s:%d", hostname, port);
LOG.info("Instantiated TezChild WebUIService at " + baseUrl + "/ui");
}
} catch (Exception e) {
LOG.error("TezChild WebUIService failed to start.", e);
throw new TezUncheckedException(e);
}
return this;
}

public void stop() {
if (this.webApp != null) {
LOG.debug("Stopping WebApp");
this.webApp.stop();
}
}

private static class TezChildWebApp extends WebApp {
private ExecutionContext executionContext;

public TezChildWebApp(ExecutionContext executionContext) {
this.executionContext = executionContext;
}

@Override
public void setup() {
bind(ExecutionContextImpl.class).toInstance((ExecutionContextImpl) executionContext);
route("/", TezChildWebController.class, "ui");
route("/ui", TezChildWebController.class, "ui");
route("/jmx", JMXJsonServletController.class);
route("/conf", ConfServletController.class);
route("/stacks", StackServletController.class);
}
}
}
42 changes: 42 additions & 0 deletions tez-tests/src/test/java/org/apache/tez/test/TestAM.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,48 @@ public void testAMWebUIService() throws TezException, IOException, InterruptedEx
tezSession.stop();
}

@Test(timeout = 60000)
public void testTaskWebUIService() throws TezException, IOException, InterruptedException {
SleepProcessorConfig spConf = new SleepProcessorConfig(1);

DAG dag = DAG.create("TezSleepProcessor");
Vertex vertex = Vertex.create("SleepVertex",
ProcessorDescriptor.create(SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
Resource.newInstance(1024, 1));
dag.addVertex(vertex);

TezConfiguration tezConf = new TezConfiguration(tezCluster.getConfig());
tezConf.setBoolean(TezConfiguration.TEZ_TASK_WEBSERVICE_ENABLE, true);
String tezTaskWebUIServicePort = "50051";
tezConf.set(TezConfiguration.TEZ_TASK_WEBSERVICE_PORT_RANGE, tezTaskWebUIServicePort);

TezClient tezSession = TezClient.create("TezSleepProcessor", tezConf, false);
tezSession.start();

DAGClient dagClient = tezSession.submitDAG(dag);

DAGStatus dagStatus = dagClient.getDAGStatus(null);
while (!dagStatus.isCompleted()) {
Thread.sleep(500L);
dagStatus = dagClient.getDAGStatus(null);
}

// host: this is a unit test, we can assume that task container runs on the same host as the am
// port: we expect it to be what we configured
String amWebUIAddress = dagClient.getWebUIAddress();
URL amWebUIAddressUrl = new URL(amWebUIAddress);
URL taskWebUIAddress = new URL(amWebUIAddressUrl.getProtocol(), amWebUIAddressUrl.getHost(),
Integer.parseInt(tezTaskWebUIServicePort), "");

LOG.info("TezTask webUI address: " + taskWebUIAddress);

checkAddress(taskWebUIAddress + "/jmx");
checkAddress(taskWebUIAddress + "/conf");
checkAddress(taskWebUIAddress + "/stacks");

tezSession.stop();
}

private void checkAddress(String url) {
checkAddress(url, 200);
}
Expand Down

0 comments on commit 538d2e1

Please sign in to comment.