Skip to content

Commit

Permalink
Portal updates (#399)
Browse files Browse the repository at this point in the history
* *Updates the .jhist File Path for Killed Applications*

When an application is killed by the user, there is no opportunity for a graceful shutdown and those files are never renamed from .jhist.inprogress to the final application status such as  -FAILED.jhist or -SUCCEEDED.jhist causing those applications to be interpreted by TonY as currently RUNNING on TonY portal even though they have been terminated.  This fix renames those files to -KILLED.jhist, correctly identifying the final status of those killed applications.

* *Updates TonY Portal page view*

This fix adds a parameter to the data table to maintain the default ordering of the TonY jobs and increases the default number of records from 10 to 25.

* *Added PR fixes*

* *Refactored code for tests*

Tests not passing due to issues with yarn environment.
Refactored code to eliminate creating separate yarn client
object in test class.

* *Fixed code in HistoryFileMoverTest.java*

* *Adds Yarn Env to HistoryFileMoverTest.java*

* **Adds Mocks to HistoryFileMoverTest**

This update adds a yarnclient mock to the HistoryFileMoverTest
in order to bypass the yarn requirements of the main
HistoryFileMover class. This update also adjusts the yarn client
initializetion of the main class to once when the mover starts
and not each time the killedAppIds function is called.
  • Loading branch information
i-ony authored and oliverhu committed Oct 24, 2019
1 parent 8e2e90b commit bf51a85
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 12 deletions.
8 changes: 8 additions & 0 deletions tony-portal/app/hadoop/Requirements.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.client.api.YarnClient;
import play.Logger;
import utils.ConfigUtils;

Expand Down Expand Up @@ -111,4 +112,11 @@ public Path getFinishedDir() {
public Path getIntermediateDir() {
return interm;
}

public YarnClient getYarnClient() {
return YarnClient.createYarnClient();
}

}


87 changes: 78 additions & 9 deletions tony-portal/app/history/HistoryFileMover.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@
import com.linkedin.tony.util.Utils;
import com.typesafe.config.Config;
import hadoop.Requirements;
import java.io.File;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -18,6 +22,11 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import play.Logger;
import utils.ConfigUtils;

Expand All @@ -30,13 +39,16 @@ public class HistoryFileMover {
private final Path intermediateDir;
private final Path finishedDir;
private final CacheWrapper cacheWrapper;
private YarnClient yarnClient;

@Inject
public HistoryFileMover(Config appConf, Requirements requirements, CacheWrapper cacheWrapper) {
public HistoryFileMover(Config appConf, Requirements requirements, CacheWrapper cacheWrapper)
throws IOException, YarnException{
fs = requirements.getFileSystem();
intermediateDir = requirements.getIntermediateDir();
finishedDir = requirements.getFinishedDir();
this.cacheWrapper = cacheWrapper;
yarnClient = requirements.getYarnClient();

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
long moverIntervalMs = ConfigUtils.fetchIntConfigIfExists(appConf,
Expand All @@ -46,35 +58,45 @@ public HistoryFileMover(Config appConf, Requirements requirements, CacheWrapper
TonyConfigurationKeys.TONY_HISTORY_FINISHED_DIR_TIMEZONE,
TonyConfigurationKeys.DEFAULT_TONY_HISTORY_FINISHED_DIR_TIMEZONE);

YarnConfiguration yarnConf = new YarnConfiguration();
if (System.getenv(Constants.HADOOP_CONF_DIR) != null) {
yarnConf.addResource(new Path(System.getenv(Constants.HADOOP_CONF_DIR) +
File.separatorChar + Constants.CORE_SITE_CONF));
yarnConf.addResource(new Path(System.getenv(Constants.HADOOP_CONF_DIR) +
File.separatorChar + Constants.YARN_SITE_CONF));
}
yarnClient.init(yarnConf);
yarnClient.start();

// Throws DateTimeException or ZoneRulesException given wrong TimeZone format.
ZoneId zoneId = ZoneId.of(finishedDirTimeZone);

LOG.info("Starting background history file mover thread, will run every " + moverIntervalMs + " milliseconds.");
scheduledThreadPool.scheduleAtFixedRate(() -> {
FileStatus[] jobDirs = null;
FileStatus[] intermedDir = null;
try {
jobDirs = fs.listStatus(intermediateDir);
intermedDir = fs.listStatus(intermediateDir);
LOG.info("Intermediate Directory Size: " + intermedDir.length);
} catch (IOException e) {
LOG.error("Failed to list files in " + intermediateDir, e);
}
if (jobDirs != null) {
if (intermedDir != null) {
try {
moveIntermediateToFinished(fs, jobDirs, zoneId);
renameKilledApps(intermedDir);
moveIntermediateToFinished(intermedDir, zoneId);
} catch (Exception e) {
LOG.error("Encountered exception while moving history directories", e);
}
}
}, 0, moverIntervalMs, TimeUnit.MILLISECONDS);
}

private void moveIntermediateToFinished(FileSystem fs, FileStatus[] jobDirs, ZoneId zoneId) {
private void moveIntermediateToFinished(FileStatus[] jobDirs, ZoneId zoneId) {
for (FileStatus jobDir : jobDirs) {
cacheWrapper.updateCaches(jobDir.getPath());
String jhistFilePath = ParserUtils.getJhistFilePath(fs, jobDir.getPath());
if (jhistFilePath == null || jobInProgress(jhistFilePath)) {
continue;
}

Path source = new Path(jhistFilePath).getParent();
StringBuilder destString = new StringBuilder(finishedDir.toString());
Date endDate = new Date(ParserUtils.getCompletedTimeFromJhistFileName(jhistFilePath));
Expand All @@ -85,7 +107,6 @@ private void moveIntermediateToFinished(FileSystem fs, FileStatus[] jobDirs, Zon
destString.append(Path.SEPARATOR).append(source.getName());
}
Utils.createDirIfNotExists(fs, new Path(destString.toString()), Constants.PERM770);

Path dest = new Path(destString.toString());
LOG.info("Moving " + source + " to " + dest);
try {
Expand All @@ -99,4 +120,52 @@ private void moveIntermediateToFinished(FileSystem fs, FileStatus[] jobDirs, Zon
private boolean jobInProgress(String jhistFileName) {
return !jhistFileName.endsWith(Constants.HISTFILE_SUFFIX);
}

private List<String> killedAppIds() throws IOException, YarnException {
List<String> killedIDs = new ArrayList<String>();
List<ApplicationReport> killedAppReports = yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED));

for (ApplicationReport k : killedAppReports) {
killedIDs.add(k.getApplicationId().toString());
}
LOG.info("Number Killed Apps: " + killedIDs.size());
return killedIDs;
}

private void renameKilledApps(FileStatus[] intermediateDir) throws IOException, YarnException {
List<FileStatus> killedAppDirectories = new ArrayList<>();
List<String> killedAppIds = killedAppIds();

for (String killedAppId : killedAppIds) {
for (FileStatus jobDirFilePath : intermediateDir) {
if (jobDirFilePath.toString().contains(killedAppId)) {
killedAppDirectories.add(jobDirFilePath);
}
}
}

for (FileStatus killedAppDirectory : killedAppDirectories) {
String jhistFilePath = ParserUtils.getJhistFilePath(fs, killedAppDirectory.getPath());
if (jhistFilePath.endsWith(".jhist.inprogress")) {

//new file name will need an end time, set it to current time
long currentTimestamp = System.currentTimeMillis();
Path sourcePath = new Path(jhistFilePath);
String jhistFileName = jhistFilePath.substring(jhistFilePath.lastIndexOf('/') + 1);

//Section of filename that will be replaced --> username.jhist.inprogress
String oldJhistSubstring = jhistFileName.substring(jhistFileName.lastIndexOf('-') + 1);
String username = oldJhistSubstring.split("\\.")[0];
String newJhistSubstring = currentTimestamp + "-" + username + "-KILLED.jhist";
String killedJhistFilePath = jhistFilePath.replace(oldJhistSubstring, newJhistSubstring);
Path killedPath = new Path(killedJhistFilePath);
try {
fs.rename(sourcePath, killedPath);
} catch (IOException e) {
LOG.error("Failed to rename KILLED apps", e);
}
}
}
}
}

4 changes: 3 additions & 1 deletion tony-portal/app/views/tableMetadata.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@

var table = $('#table_id').DataTable({
paging: true,
fixedColumns: true
fixedColumns: true,
aaSorting: [],
"pageLength": 25
});

// Filter event handler
Expand Down
18 changes: 16 additions & 2 deletions tony-portal/test/history/HistoryFileMoverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,24 @@
import java.io.File;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;


@RunWith(MockitoJUnitRunner.class)
Expand All @@ -35,6 +41,10 @@ public class HistoryFileMoverTest {
@Mock
CacheWrapper cacheWrapper;

@Mock
YarnClient yarnClient;


private static File tempDir;
private static File intermediateDir;
private static File finishedDir;
Expand All @@ -49,7 +59,7 @@ public static void setup() {
}

@Test
public void testMoveIntermediateToFinished() throws IOException, InterruptedException {
public void testMoveIntermediateToFinished() throws IOException, InterruptedException, YarnException {
// Add a completed application in the intermediate dir
String appId = "application_123_456";
File appDir = new File(intermediateDir, appId);
Expand All @@ -69,6 +79,10 @@ public void testMoveIntermediateToFinished() throws IOException, InterruptedExce
when(reqs.getFileSystem()).thenReturn(FileSystem.getLocal(new Configuration()));
when(reqs.getIntermediateDir()).thenReturn(new Path(intermediateDir.getAbsolutePath()));
when(reqs.getFinishedDir()).thenReturn(new Path(finishedDir.getAbsolutePath()));
when(reqs.getYarnClient()).thenReturn(yarnClient);

doNothing().when(yarnClient).start();
when(yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED))).thenReturn(new ArrayList<>());

// start mover
new HistoryFileMover(config, reqs, cacheWrapper);
Expand Down

0 comments on commit bf51a85

Please sign in to comment.