From 1084699ddb1baf6e4a6df97d988ac696d33f41e8 Mon Sep 17 00:00:00 2001 From: Shohei Okumiya Date: Sun, 22 Dec 2024 20:10:24 +0900 Subject: [PATCH] TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts (#324) (Shohei Okumiya reviewed by Laszlo Bodor) --- .../apache/tez/dag/api/TezConfiguration.java | 21 ++++++++- .../apache/tez/runtime/hook/TezDAGHook.java | 44 ++++++++++++++++++ .../tez/runtime/hook/TezTaskAttemptHook.java | 44 ++++++++++++++++++ .../apache/tez/runtime/hook/package-info.java | 22 +++++++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 23 +++++++--- .../apache/tez/dag/app/ThreadDumpDAGHook.java | 41 +++++++++++++++++ .../tez/runtime/TezThreadDumpHelper.java | 45 ++++++------------- .../org/apache/tez/runtime/task/TezChild.java | 16 +++++-- .../task/ThreadDumpTaskAttemptHook.java | 41 +++++++++++++++++ .../java/org/apache/tez/test/TestTezJobs.java | 6 +++ 10 files changed, 260 insertions(+), 43 deletions(-) create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java create mode 100644 tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java create mode 100644 tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java create mode 100644 tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 7e86853631..8862f4b7d6 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -2297,12 +2297,14 @@ static Set getPropertySet() { public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties"; /** - * Frequency at which thread dump should be captured. Supports TimeUnits. + * Frequency at which thread dump should be captured. Supports TimeUnits. This is effective only + * when org.apache.tez.dag.app.ThreadDumpDAGHook is configured to tez.am.hooks or + * org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured to tez.task.attempt.hooks. */ @ConfigurationScope(Scope.DAG) @ConfigurationProperty public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval"; - public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms"; + public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms"; /** * Limits the amount of data that can be written to LocalFileSystem by a Task. @@ -2312,4 +2314,19 @@ static Set getPropertySet() { public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes"; public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1; + /** + * Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezDAGHook. + * e.g. org.apache.tez.dag.app.ThreadDumpDAGHook + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks"; + + /** + * Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezTaskAttemptHook. + * e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook + */ + @ConfigurationScope(Scope.DAG) + @ConfigurationProperty + public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks"; } diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java new file mode 100644 index 0000000000..7fb015bdb1 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezDAGHook.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezDAGID; + +/** + * A hook which is instantiated and triggered before and after a DAG is executed. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface TezDAGHook { + /** + * Invoked before the DAG starts. + * + * @param id the DAG id + * @param conf the conf + */ + void start(TezDAGID id, Configuration conf); + + /** + * Invoked after the DAG finishes. + */ + void stop(); +} diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java new file mode 100644 index 0000000000..54931b64d5 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/TezTaskAttemptHook.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezTaskAttemptID; + +/** + * A hook which is instantiated and triggered before and after a task attempt is executed. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface TezTaskAttemptHook { + /** + * Invoked before the task attempt starts. + * + * @param id the task attempt id + * @param conf the conf + */ + void start(TezTaskAttemptID id, Configuration conf); + + /** + * Invoked after the task attempt finishes. + */ + void stop(); +} diff --git a/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java new file mode 100644 index 0000000000..d977897d86 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/runtime/hook/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@Private +package org.apache.tez.runtime.hook; + +import org.apache.hadoop.classification.InterfaceAudience.Private; \ No newline at end of file diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 9c7cc18b60..4172a5a368 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -71,6 +71,7 @@ import org.apache.tez.Utils; import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.SessionNotRunning; @@ -187,7 +188,7 @@ import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; -import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezDAGHook; import org.apache.tez.util.LoggingUtils; import org.apache.tez.util.TezMxBeanResourceCalculator; import org.codehaus.jettison.json.JSONException; @@ -343,7 +344,7 @@ public class DAGAppMaster extends AbstractService { Map services = new LinkedHashMap(); private ThreadLocalMap mdcContext; - private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; + private TezDAGHook[] hooks = {}; public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, @@ -770,7 +771,9 @@ protected synchronized void handle(DAGAppMasterEvent event) { "DAGAppMaster Internal Error occurred"); break; case DAG_FINISHED: - tezThreadDumpHelper.stop(); + for (TezDAGHook hook : hooks) { + hook.stop(); + } DAGAppMasterEventDAGFinished finishEvt = (DAGAppMasterEventDAGFinished) event; String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); @@ -2226,8 +2229,10 @@ public Void run() throws Exception { execService.shutdownNow(); } - // Check if the thread dump service is up in any case, if yes attempt a shutdown - tezThreadDumpHelper.stop(); + // Try to shut down any hooks that are still active + for (TezDAGHook hook : hooks) { + hook.stop(); + } super.serviceStop(); } @@ -2599,7 +2604,13 @@ private void countHeldContainers(DAG newDAG) { private void startDAGExecution(DAG dag, final Map additionalAmResources) throws TezException { currentDAG = dag; - tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString()); + final Configuration conf = dag.getConf(); + final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]); + hooks = new TezDAGHook[hookClasses.length]; + for (int i = 0; i < hooks.length; i++) { + hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]); + hooks[i].start(dag.getID(), conf); + } // Try localizing the actual resources. List additionalUrlsForClasspath; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java new file mode 100644 index 0000000000..ff657e47f1 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ThreadDumpDAGHook.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezDAGHook; + +/** + * A DAG hook which dumps thread information periodically. + */ +public class ThreadDumpDAGHook implements TezDAGHook { + private TezThreadDumpHelper helper; + + @Override + public void start(TezDAGID id, Configuration conf) { + helper = TezThreadDumpHelper.getInstance(conf).start(id.toString()); + } + + @Override + public void stop() { + helper.stop(); + } +} diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java index 6f3e9fec1e..022186a4b8 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/TezThreadDumpHelper.java @@ -24,8 +24,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Appender; +import org.apache.tez.common.Preconditions; import org.apache.tez.common.TezContainerLogAppender; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezUncheckedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +47,9 @@ public class TezThreadDumpHelper { - public static final NoopTezThreadDumpHelper NOOP_TEZ_THREAD_DUMP_HELPER = new NoopTezThreadDumpHelper(); - private long duration = 0L; - private Path basePath = null; - private FileSystem fs = null; + private final long duration; + private final Path basePath; + private final FileSystem fs; private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean(); private static final Logger LOG = LoggerFactory.getLogger(TezThreadDumpHelper.class); @@ -70,21 +71,17 @@ private TezThreadDumpHelper(long duration, Configuration conf) throws IOExceptio "path: {}", duration, basePath); } - public TezThreadDumpHelper() { - } - public static TezThreadDumpHelper getInstance(Configuration conf) { - long periodicThreadDumpFrequency = - conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); - - if (periodicThreadDumpFrequency > 0) { - try { - return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf); - } catch (IOException e) { - LOG.warn("Can not initialize periodic thread dump service", e); - } + long periodicThreadDumpFrequency = conf.getTimeDuration(TEZ_THREAD_DUMP_INTERVAL, + TEZ_THREAD_DUMP_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + Preconditions.checkArgument(periodicThreadDumpFrequency > 0, "%s must be positive duration", + TEZ_THREAD_DUMP_INTERVAL); + + try { + return new TezThreadDumpHelper(periodicThreadDumpFrequency, conf); + } catch (IOException e) { + throw new TezUncheckedException("Can not initialize periodic thread dump service", e); } - return NOOP_TEZ_THREAD_DUMP_HELPER; } public TezThreadDumpHelper start(String name) { @@ -178,18 +175,4 @@ private String getTaskName(long id, String taskName) { return id + " (" + taskName + ")"; } } - - private static class NoopTezThreadDumpHelper extends TezThreadDumpHelper { - - @Override - public TezThreadDumpHelper start(String name) { - // Do Nothing - return this; - } - - @Override - public void stop() { - // Do Nothing - } - } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 98b07100a8..ed14bd880c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -52,6 +52,7 @@ import org.apache.log4j.helpers.ThreadLocalMap; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezExecutors; import org.apache.tez.common.TezLocalResource; @@ -69,10 +70,10 @@ import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; -import org.apache.tez.runtime.TezThreadDumpHelper; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; +import org.apache.tez.runtime.hook.TezTaskAttemptHook; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.util.LoggingUtils; @@ -120,7 +121,6 @@ public class TezChild { private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final String user; private final boolean updateSysCounters; - private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER; private Multimap startedInputsMap = HashMultimap.create(); private final boolean ownUmbilical; @@ -295,7 +295,13 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, hadoopShim, sharedExecutor); boolean shouldDie; - tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString()); + final String[] hookClasses = taskConf + .getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]); + final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length]; + for (int i = 0; i < hooks.length; i++) { + hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]); + hooks[i].start(attemptId, taskConf); + } try { TaskRunner2Result result = taskRunner.run(); LOG.info("TaskRunner2Result: {}", result); @@ -314,7 +320,9 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, e, "TaskExecutionFailure: " + e.getMessage()); } } finally { - tezThreadDumpHelper.stop(); + for (TezTaskAttemptHook hook : hooks) { + hook.stop(); + } FileSystem.closeAllForUGI(childUGI); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java new file mode 100644 index 0000000000..dd41cee9d2 --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ThreadDumpTaskAttemptHook.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.task; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.TezThreadDumpHelper; +import org.apache.tez.runtime.hook.TezTaskAttemptHook; + +/** + * A task attempt hook which dumps thread information periodically. + */ +public class ThreadDumpTaskAttemptHook implements TezTaskAttemptHook { + private TezThreadDumpHelper helper; + + @Override + public void start(TezTaskAttemptID id, Configuration conf) { + helper = TezThreadDumpHelper.getInstance(conf).start(id.toString()); + } + + @Override + public void stop() { + helper.stop(); + } +} diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 892629f29e..ee717f33c0 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -18,7 +18,9 @@ package org.apache.tez.test; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_AM_HOOKS; import static org.apache.tez.dag.api.TezConfiguration.TEZ_THREAD_DUMP_INTERVAL; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS; import static org.apache.tez.dag.api.TezConstants.TEZ_CONTAINER_LOGGER_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -59,11 +61,13 @@ import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.dag.app.ThreadDumpDAGHook; import org.apache.tez.mapreduce.examples.CartesianProduct; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -555,6 +559,8 @@ public void testSortMergeJoinExampleDisableSplitGrouping(boolean withThreadDump) org.apache.log4j.Logger.getRootLogger().addAppender(appender); appender.setName(TEZ_CONTAINER_LOGGER_NAME); appender.setContainerLogDir(logPath.toString()); + newConf.set(TEZ_AM_HOOKS, ThreadDumpDAGHook.class.getName()); + newConf.set(TEZ_TASK_ATTEMPT_HOOKS, ThreadDumpTaskAttemptHook.class.getName()); newConf.set(TEZ_THREAD_DUMP_INTERVAL, "1ms"); } sortMergeJoinExample.setConf(newConf);