Running SciLuigi workflows with Slurm
In this example we show how to adapt the example workflow so that we can run it on a cluster using the Slurm Workload Manager (Slurm).
The workflow consists of two tasks: one creates a file called foo.txt and writes foo in it, and the other reads foo.txt and swaps every occurrence of foo with the name of the cluster node on which the workflow is running.
To do this, we have to change the workflow definition slightly. In particular, we set up a runmode parameter, which lets us specify from the command line whether the workflow should run locally or on the cluster.
When we define the tasks, we have to pass an additional SlurmInfo object, which contains the specification of the resources we want to allocate to our tasks and other Slurm parameters.
import os  # used by MyFooReplacer further down

import luigi
import sciluigi

class MyWorkflow(sciluigi.WorkflowTask):
    # We define a runmode parameter to specify how to run the workflow
    runmode = luigi.Parameter()

    def workflow(self):
        if self.runmode == 'local':
            runmode = sciluigi.RUNMODE_LOCAL
        elif self.runmode == 'hpc':
            runmode = sciluigi.RUNMODE_HPC
        elif self.runmode == 'mpi':
            runmode = sciluigi.RUNMODE_MPI
        else:
            raise Exception('Runmode is none of local, hpc, nor mpi. Please fix and try again!')

        # We construct our tasks as before, but we pass an additional
        # SlurmInfo object
        foowriter = self.new_task('foowriter', MyFooWriter,
            slurminfo=sciluigi.SlurmInfo(
                runmode=runmode,
                project='myname',  # the Slurm account to charge (salloc -A)
                partition='mypartition',
                cores='1',
                time='1:00:00',
                jobname='foowriter',
                threads='1'))
        fooreplacer = self.new_task('fooreplacer', MyFooReplacer,
            slurminfo=sciluigi.SlurmInfo(
                runmode=runmode,
                project='myname',
                partition='mypartition',
                cores='1',
                time='1:00:00',
                jobname='fooreplacer',
                threads='1'))

        # Here we do the *magic*: Connecting outputs to inputs:
        fooreplacer.in_foo = foowriter.out_foo

        # Return the last task(s) in the workflow chain.
        return fooreplacer
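The chain of if/elif branches that maps the runmode string to a constant can also be written as a dictionary lookup. The following is a minimal sketch, not part of SciLuigi: resolve_runmode is a hypothetical helper, and the three constants are stand-in values so that the snippet runs without SciLuigi installed. In the real workflow you would use sciluigi.RUNMODE_LOCAL, sciluigi.RUNMODE_HPC and sciluigi.RUNMODE_MPI instead.

```python
# Stand-in values (assumption): replace with sciluigi.RUNMODE_* in a real workflow.
RUNMODE_LOCAL = "runmode_local"
RUNMODE_HPC = "runmode_hpc"
RUNMODE_MPI = "runmode_mpi"

# Map the command-line value to the corresponding runmode constant.
RUNMODES = {
    "local": RUNMODE_LOCAL,
    "hpc": RUNMODE_HPC,
    "mpi": RUNMODE_MPI,
}


def resolve_runmode(name):
    """Hypothetical helper: return the runmode constant for a command-line value."""
    try:
        return RUNMODES[name]
    except KeyError:
        raise ValueError(
            "Unknown runmode {!r}; expected one of {}".format(name, sorted(RUNMODES))
        ) from None
```

Inside workflow(), the lookup would then be a single call such as runmode = resolve_runmode(self.runmode).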
The parameters we pass to SlurmInfo are used to construct a call to the Slurm command salloc, which requests the resources on the cluster. The SciLuigi source code shows how the SciLuigi parameters translate to salloc parameters, and you can find a list of the salloc parameters in the Slurm documentation or by typing salloc --help on your cluster.
Note that what SciLuigi calls project corresponds to -A or --account in Slurm, which is the account to which resources will be charged.
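To make the translation concrete, here is a rough sketch of how SlurmInfo-style values could be turned into a salloc call. The helper build_salloc_prefix is hypothetical, and the exact command that SciLuigi builds may differ. The sketch assumes the mapping project to -A, partition to -p, cores to -n, time to -t, jobname to -J, and threads to the -c option of srun, all of which are standard Slurm options.

```python
import shlex


def build_salloc_prefix(project, partition, cores, time, jobname, threads):
    """Hypothetical helper: build a salloc/srun prefix from SlurmInfo-style values."""
    salloc_args = [
        "salloc",
        "-A", project,    # --account: the account charged for the resources
        "-p", partition,  # --partition
        "-n", cores,      # --ntasks
        "-t", time,       # --time (wall-clock limit)
        "-J", jobname,    # --job-name
    ]
    # Assumption: the task's command is launched through srun inside the allocation.
    srun_args = ["srun", "-n", "1", "-c", threads]  # -c is --cpus-per-task
    return " ".join(shlex.quote(str(arg)) for arg in salloc_args + srun_args)


prefix = build_salloc_prefix(
    project="myname",
    partition="mypartition",
    cores="1",
    time="1:00:00",
    jobname="foowriter",
    threads="1",
)
print(prefix)
# salloc -A myname -p mypartition -n 1 -t 1:00:00 -J foowriter srun -n 1 -c 1
```

The command a task wants to execute would be appended to this prefix, so that it runs inside the requested allocation rather than on the login node.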
Now we can move on to setting up our tasks. MyFooWriter is unchanged, except that it subclasses sciluigi.SlurmTask instead of sciluigi.Task:
class MyFooWriter(sciluigi.SlurmTask):
    def out_foo(self):
        return sciluigi.TargetInfo(self, 'foo.txt')

    def run(self):
        with self.out_foo().open('w') as foofile:
            foofile.write('foo\n')
Then we create a bash script called replace_with_hostname.sh, in the same directory as our workflow, which replaces the foo string with the hostname of the machine on which the job is running. It should contain the following lines:
#! /bin/bash
sed "s/foo/$(hostname)/g" "$1" > "$2"
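This script uses the sed command to replace occurrences of foo in the file given as the first command line argument with the host name, and writes the modified content to the file given as the second command line argument. Because the task below invokes it as ./replace_with_hostname.sh, the script must be executable (chmod +x replace_with_hostname.sh).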
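For readers less familiar with sed, the same replacement can be sketched in Python. The function replace_foo_with_hostname and its optional hostname argument are hypothetical, introduced only for illustration; the workflow itself keeps using the bash script. Note that str.replace substitutes every occurrence of foo on each line.

```python
import socket


def replace_foo_with_hostname(in_path, out_path, hostname=None):
    """Copy in_path to out_path, replacing 'foo' with the host name.

    hostname defaults to the name of the machine running this code;
    passing it explicitly is only meant to make the function easy to test.
    """
    host = hostname if hostname is not None else socket.gethostname()
    with open(in_path) as infile, open(out_path, "w") as outfile:
        for line in infile:
            outfile.write(line.replace("foo", host))
```

Calling replace_foo_with_hostname('foo.txt', 'bar.txt') on a compute node would write that node's host name into bar.txt.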
We can now write the MyFooReplacer task, which calls the bash script we just created.
class MyFooReplacer(sciluigi.SlurmTask):
    # Here we have one input, a "foo file":
    in_foo = None

    # ... and an output, a "bar file":
    def out_replaced(self):
        # As the path to the returned target(info), we
        # use the directory of the foo file, and save the modified
        # version to a file called bar.txt
        out_file = os.path.join(os.path.dirname(self.in_foo().path), 'bar.txt')
        return sciluigi.TargetInfo(self, out_file)

    def run(self):
        # Here, we use the in-built self.ex() method to execute our script,
        # and we pass two command line arguments to it
        self.ex('./replace_with_hostname.sh {in_file} {out_file}'.format(
            in_file=self.in_foo().path,
            out_file=self.out_replaced().path))
At the end of the script, we need to add:
if __name__ == '__main__':
    sciluigi.run_local(main_task_cls=MyWorkflow)
Finally, supposing that we saved this code to a script called sciluigi_slurm_example.py, we can run it locally with
python sciluigi_slurm_example.py --runmode local
or on the cluster with
python sciluigi_slurm_example.py --runmode hpc