Skip to content

Commit

Permalink
terminate cluster taskflow actions (#369)
Browse files Browse the repository at this point in the history
* terminate cluster, grouped taskflow actions

* passing correct machine information, checking cluster states, cleanup

removed degbu 'componentWillUpdate'

fixed wrong action in terminateInstance

taskflow -> instance

fixes

* workflow and cluster log panels show state, added missing status icons

* better taskflow action management on cluster events

* adjusted taskflow action checking

* normalized start panels

* removed comment block
  • Loading branch information
TristanWright committed May 6, 2016
1 parent 9bb4c84 commit f5714a0
Show file tree
Hide file tree
Showing 20 changed files with 271 additions and 172 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"bootstrap": "3.3.6",
"react-router": "2.0.1",

"paraviewweb": "1.3.14",
"paraviewweb": "1.5.1",
"simput": "1.2.0",
"pvw-visualizer": "1.0.11",

Expand Down
13 changes: 11 additions & 2 deletions src/StateTransitionBehavior.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export function handleTaskflowChange(state, taskflow) {
const simulationStatus = [];

// Figure out possible actions and simulation state
if (jobs.every(job => job.status === 'terminated')) {
if (jobs.length && jobs.every(job => job.status === 'terminated')) {
simulationStatus.push('terminated');
actions.push('rerun');
} else if (tasks.some(task => task.status === 'error') && (jobs.length === 0 || !jobs.some(job => job.status === 'running'))) {
Expand Down Expand Up @@ -81,14 +81,23 @@ export function handleTaskflowChange(state, taskflow) {
// this is due to fewer jobs coming through SSE which triggers a fetch for trad clusters.
if (!taskflow.flow.meta) {
dispatch(TaskflowActions.fetchTaskflow(taskflow.flow._id));
} else {
const tfClusterId = taskflow.flow.meta.cluster._id,
tfCluster = state.preferences.clusters.mapById[tfClusterId];
if (tfCluster && tfCluster.type === 'ec2' &&
taskflow.flow.status !== 'running' &&
['running', 'error'].indexOf(tfCluster.status) !== -1) {
actions.push('terminateInstance');
}
}

// Update taslkfow meta
if (allComplete !== taskflow.allComplete ||
outputDirectory[0] !== taskflow.outputDirectory ||
!equals(actions, taskflow.actions) ||
primaryJob !== taskflow.primaryJob) {
dispatch(TaskflowActions.updateTaskflowMetadata(taskflow.flow._id, actions, allComplete, outputDirectory[0], primaryJob));
dispatch(TaskflowActions.updateTaskflowMetadata(taskflow.flow._id,
{ actions, allComplete, outputDirectory: outputDirectory[0], primaryJob }));

// Update simulation folders when all tasks/jobs are done
if (allComplete && taskflow.simulation && state.simulations.mapById[taskflow.simulation]) {
Expand Down
8 changes: 6 additions & 2 deletions src/network/remoteClient/Cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ export function getClusterPresets() {
return getJSON('/clusters-presets.json');
}

export function listClusterProfiles() {
return girder.listClusters({ type: 'trad' });
export function listClusterProfiles(type) {
return girder.listClusters({ type });
}

export function createCluster(cluster) {
Expand Down Expand Up @@ -35,3 +35,7 @@ export function getCluster(id) {
export function getClusterLog(id, offset = 0) {
return girder.getClusterLogs(id, offset);
}

export function terminateCluster(id) {
return girder.terminateCluster(id);
}
2 changes: 2 additions & 0 deletions src/network/remoteClient/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
listClusterProfiles,
saveCluster,
testCluster,
terminateCluster,
} from './Cluster';

import {
Expand Down Expand Up @@ -160,6 +161,7 @@ export default {
listClusterProfiles,
saveCluster,
testCluster,
terminateCluster,

/* File */
deleteFile,
Expand Down
1 change: 0 additions & 1 deletion src/pages/Preferences/AWS/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ const AWSPrefs = React.createClass({

saveItem() {
const { onUpdateItem, active, list } = this.props;
console.log(list[active]);
const contents = list[active];
if (contents._id) {
this.setState({ _error: 'Profile cannot be modified once saved' });
Expand Down
56 changes: 36 additions & 20 deletions src/panels/JobMonitor/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ const JobMonitor = React.createClass({
clusterName: React.PropTypes.string,
tasks: React.PropTypes.array,
jobs: React.PropTypes.array,

taskStatusCount: React.PropTypes.object,
taskflowStatus: React.PropTypes.string,
taskflowLog: React.PropTypes.array,

clusterStatus: React.PropTypes.string,
clusterLog: React.PropTypes.array,
clusterLogStreamState: React.PropTypes.number,

Expand Down Expand Up @@ -51,7 +55,7 @@ const JobMonitor = React.createClass({

clusterLogOpen(open) {
if (open) {
console.log('stream state:', this.props.clusterLogStreamState);
// console.log('stream state:', this.props.clusterLogStreamState);
if (this.props.clusterLogStreamState === CLOSED) {
const offset = this.props.clusterLog.length;
this.props.subscribeToClusterLogStream(this.props.clusterId, offset);
Expand Down Expand Up @@ -110,31 +114,33 @@ const JobMonitor = React.createClass({
</div>
<div className={ style.taskflowContent }>
{
<ExecutionUnit unit={{ name: 'Log', log: this.props.taskflowLog }} />
<ExecutionUnit unit={{ name: 'Log', log: this.props.taskflowLog, status: this.props.taskflowStatus }} />
}
</div>
<div className={ style.toolbar }>
<div className={ style.title }>
Cluster log
{ this.props.clusterId ?
<div>
<div className={ style.toolbar }>
<div className={ style.title }>
Cluster log
</div>
<div className={ style.buttons }></div>
</div>
<div className={ style.buttons }></div>
</div>
<div className={ style.taskflowContent }>
{
<ExecutionUnit unit={{ name: 'Log', log: this.props.clusterLog }}
onToggle={this.clusterLogOpen} alwaysShowLogToggle
/>
}
</div>
<div className={ style.taskflowContent }>
{
<ExecutionUnit unit={{ name: 'Log', log: this.props.clusterLog, status: this.props.clusterStatus }}
onToggle={this.clusterLogOpen} alwaysShowLogToggle
/>
}
</div>
</div> : null
}
</div>
</div>);
},
});


// Binding --------------------------------------------------------------------
/* eslint-disable arrow-body-style */

export default connect(
(state, props) => {
const taskflowId = props.taskflowId;
Expand All @@ -144,11 +150,15 @@ export default connect(
const jobs = [];
const cluster = clusterId ? state.preferences.clusters.mapById[clusterId] : null;
const taskStatusCount = {};
var taskflowStatus = '';
var taskflowLog = [];
var clusterStatus = '';
var clusterLog = [];
var clusterLogStreamState = CLOSED;

// get tasks and jobs
if (taskflow && taskflow.taskMapById && taskflow.jobMapById) {
taskflowStatus = taskflow.flow.status;
Object.keys(taskflow.taskMapById).forEach(id => {
tasks.push(taskflow.taskMapById[id]);
const status = taskflow.taskMapById[id].status;
Expand All @@ -163,19 +173,26 @@ export default connect(
});
taskflowLog = taskflow.log;
}

// Sort the tasks by created timestamp
tasks.sort((task1, task2) => Date.parse(task1.created) > Date.parse(task2.created));

if (cluster && cluster.log && cluster.log.length) {
clusterLog = cluster.log.sort((task1, task2) => Date.parse(task1.created) > Date.parse(task2.created));
clusterLogStreamState = cluster.logStream ? cluster.logStream.readyState : CLOSED;
// get cluster status, logs, and stream state.
if (cluster) {
clusterStatus = cluster.status;
if (cluster.log && cluster.log.length) {
clusterLog = cluster.log.sort((task1, task2) => Date.parse(task1.created) > Date.parse(task2.created));
clusterLogStreamState = cluster.logStream ? cluster.logStream.readyState : CLOSED;
}
}

return {
tasks,
jobs,
taskStatusCount,
taskflowStatus,
taskflowLog,
clusterStatus,
clusterLog,
clusterLogStreamState,
};
Expand All @@ -186,4 +203,3 @@ export default connect(
unsubscribeFromClusterLogStream: (id) => dispatch(ClusterActions.unsubscribeClusterLogStream(id)),
})
)(JobMonitor);

23 changes: 11 additions & 12 deletions src/panels/run/RunEC2.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ export default React.createClass({
newState.machines = resp.data;
newState.machineFamilies = Object.keys(newState.machines[newState.profile.regionName]);
newState.machinesInFamily = newState.machines[newState.profile.regionName]['General purpose'];
if (this.props.onChange) {
this.props.onChange('machine', newState.machinesInFamily[0], 'EC2');
}
this.setState(newState);
})
.catch((err) => {
Expand All @@ -70,6 +73,8 @@ export default React.createClass({
this.setState({
machinesInFamily: this.state.machines[this.state.profile.regionName][value],
});
} else if (key === 'machine') {
value = this.state.machinesInFamily[value];
}

if (this.props.onChange) {
Expand All @@ -79,22 +84,15 @@ export default React.createClass({

render() {
var optionMapper = (el, index) =>
<option
key={ `${el.name}_${index}` }
value={index}
>{el.name}</option>;
<option key={ `${el.name}_${index}` } value={index}>
{el.name}
</option>;
var machineFamilyMapper = (family, index) =>
<option
key={family}
value={family}
>
<option key={family} value={family}>
{ family }
</option>;
var machineMapper = (machine, index) =>
<option
key={machine.id}
value={index}
>
<option key={machine.id} value={ index } >
{ `${machine.id} - ${machine.cpu} core${machine.cpu > 1 ? 's' : ''} - ` +
`${machine.memory} ${machine.gpu ? ' + GPU' : ''} - ${machine.storage}` +
` - $${Number(machine.price).toPrecision(3)} est. per hour per node` }
Expand All @@ -109,6 +107,7 @@ export default React.createClass({
</span>
</div>;
}

return (
<div className={style.container}>
<section className={style.group}>
Expand Down
Loading

0 comments on commit f5714a0

Please sign in to comment.