Using
Creating workflows in SciLuigi differs slightly from how it is done in vanilla Luigi. Very briefly, it is done in these main steps:
- Create a workflow task class
- Create task classes
- Add the workflow definition in the workflow class's `workflow()` method
- Add a run method at the end of the script
- Run the script
The first thing to do when creating a workflow is to define a workflow task.
You do this by:
- Creating a subclass of `sciluigi.WorkflowTask`
- Implementing the `workflow()` method
```python
import sciluigi

class MyWorkflow(sciluigi.WorkflowTask):
    def workflow(self):
        pass  # TODO: Implement workflow here later!
```
Then, you need to define some tasks that can be done in this workflow.
This is done by:
- Creating a subclass of `sciluigi.Task` (or `sciluigi.SlurmTask` if you want Slurm support)
- Adding fields named `in_<yournamehere>` to the new task class, one for each input
- Defining methods named `out_<yournamehere>()`, one for each output, that return `sciluigi.TargetInfo` objects. (`sciluigi.TargetInfo` is initialized with a reference to the task object itself, typically `self`, and a path name, which can be built from the paths of upstream tasks' outputs.)
- Defining luigi parameters for the task
- Implementing the `run()` method of the task
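The `in_*`/`out_*` convention can be illustrated without SciLuigi itself. The sketch below is a plain-Python mock, not SciLuigi's actual implementation: `MockTargetInfo`, `Writer` and `Reader` are hypothetical stand-ins. It assumes, as the workflow example further down this page does, that an `in_*` field is connected by assigning it an upstream task's bound `out_*` method, so that calling the field yields the upstream target info, including a reference back to the task that produces it.

```python
import os


class MockTargetInfo:
    """Hypothetical stand-in for sciluigi.TargetInfo: a task reference plus a path."""

    def __init__(self, task, path):
        self.task = task
        self.path = path


class Writer:
    # No inputs; one output.
    def out_foo(self):
        return MockTargetInfo(self, 'foo.txt')


class Reader:
    # Input field, filled in later by pointing it at an upstream out_* method.
    in_foo = None

    def out_replaced(self):
        # Derive the output path from the upstream task's output path.
        in_path = self.in_foo().path
        return MockTargetInfo(self, os.path.join(os.path.dirname(in_path), 'bar.txt'))


writer = Writer()
reader = Reader()
# Connect: assign the bound method itself (no parentheses).
reader.in_foo = writer.out_foo

print(reader.in_foo().path)             # foo.txt
print(reader.in_foo().task is writer)   # True
print(reader.out_replaced().path)       # bar.txt
```

In this mock, assigning the method rather than its return value means the upstream path is only computed when the field is actually called.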
Let's define a simple task that just writes "foo" to a file named `foo.txt`:
```python
class MyFooWriter(sciluigi.Task):
    # We have no inputs here
    # Define outputs:
    def out_foo(self):
        return sciluigi.TargetInfo(self, 'foo.txt')

    def run(self):
        with self.out_foo().open('w') as foofile:
            foofile.write('foo\n')
```
Then, let's create a task that replaces "foo" with "bar":
```python
import os

class MyFooReplacer(sciluigi.Task):
    # Here, we take as a parameter what to replace foo with.
    replacement = sciluigi.Parameter()
    # Here we have one input, a "foo file":
    in_foo = None

    # ... and an output, a "bar file":
    def out_replaced(self):
        # As the path to the returned target(info), we
        # use the directory of the foo file, and save the modified
        # version to a file called bar.txt
        out_file = os.path.join(os.path.dirname(self.in_foo().path), 'bar.txt')
        return sciluigi.TargetInfo(self, out_file)

    def run(self):
        with self.in_foo().open() as in_f:
            with self.out_replaced().open('w') as out_f:
                # Here we see that we use the parameter self.replacement:
                out_f.write(in_f.read().replace('foo', self.replacement))
```
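To see what `MyFooWriter` and `MyFooReplacer` produce together, here is a plain-Python walkthrough of the same file operations, run in a temporary directory. It does not use SciLuigi or Luigi at all; `write_foo()` and `replace_foo()` are hypothetical helpers that only mirror the bodies of the two `run()` methods and the `bar.txt` path rule from `out_replaced()`.

```python
import os
import tempfile


def write_foo(path):
    # Mirrors MyFooWriter.run(): write "foo" plus a newline.
    with open(path, 'w') as foofile:
        foofile.write('foo\n')


def replace_foo(in_path, replacement):
    # Mirrors MyFooReplacer: bar.txt goes in the same directory as the input.
    out_path = os.path.join(os.path.dirname(in_path), 'bar.txt')
    with open(in_path) as in_f, open(out_path, 'w') as out_f:
        out_f.write(in_f.read().replace('foo', replacement))
    return out_path


with tempfile.TemporaryDirectory() as tmpdir:
    foo_path = os.path.join(tmpdir, 'foo.txt')
    write_foo(foo_path)
    bar_path = replace_foo(foo_path, 'bar')
    with open(bar_path) as f:
        result = f.read()
    bar_name = os.path.basename(bar_path)

print(bar_name, repr(result))  # bar.txt 'bar\n'
```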
Instead of the last lines of `MyFooReplacer.run()`, we could have used the `sed` command-line utility, available on Linux, by calling it through the built-in `ex()` method:
```python
def run(self):
    # Here, we use the built-in self.ex() method to execute shell commands.
    # The g flag makes sed replace every occurrence on each line, like str.replace().
    self.ex("sed 's/foo/{repl}/g' {in_file} > {out_file}".format(
        repl=self.replacement,
        in_file=self.in_foo().path,
        out_file=self.out_replaced().path))
```
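When switching between the Python and `sed` versions of `run()`, note that Python's `str.replace()` replaces every occurrence, whereas a `sed` substitution without the `g` flag (`s/foo/bar/`) replaces only the first match on each line. The plain-Python sketch below emulates both behaviours; the function names are illustrative, and `sed_first_per_line()` only approximates sed (it does not call it, and it treats the pattern as a literal string where sed uses a regular expression, which makes no difference for a plain word like `foo`).

```python
def python_replace(text, old, new):
    # Like the Python run(): every occurrence is replaced.
    return text.replace(old, new)


def sed_first_per_line(text, old, new):
    # Approximates `sed 's/old/new/'` (no g flag): only the first
    # occurrence on each line is replaced. keepends=True preserves newlines.
    return ''.join(line.replace(old, new, 1) for line in text.splitlines(keepends=True))


single = 'foo\n'
multi = 'foo foo\nfoo\n'

print(python_replace(single, 'foo', 'bar') == sed_first_per_line(single, 'foo', 'bar'))  # True
print(repr(python_replace(multi, 'foo', 'bar')))      # 'bar bar\nbar\n'
print(repr(sed_first_per_line(multi, 'foo', 'bar')))  # 'bar foo\nbar\n'
```

For the one-line `foo.txt` written by `MyFooWriter` the two agree; on other inputs, adding the `g` flag to the sed expression is what makes it match `str.replace()`.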
Now we can use the two tasks we created to build a simple workflow in the workflow class we also created above.
We do this by:
- Instantiating the tasks, using the workflow task's `self.new_task(<unique_taskname>, <task_class>, *args, **kwargs)` method
- Connecting the tasks together, by assigning the right `out_*` method to the right `in_*` field
- Returning the last task in the chain from the workflow method
```python
import sciluigi

class MyWorkflow(sciluigi.WorkflowTask):
    def workflow(self):
        foowriter = self.new_task('foowriter', MyFooWriter)
        fooreplacer = self.new_task('fooreplacer', MyFooReplacer,
                                    replacement='bar')

        # Here we do the *magic*: Connecting outputs to inputs:
        fooreplacer.in_foo = foowriter.out_foo

        # Return the last task(s) in the workflow chain.
        return fooreplacer
```
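Returning only the last task is enough because each task's `in_*` fields point back at the tasks upstream of it, so the whole chain can be discovered by walking backwards from the returned task. The sketch below is a hypothetical plain-Python mock of that idea, not SciLuigi's or Luigi's actual scheduler: `MockWorkflow`, its `new_task()` and `execution_order()` only imitate the names used above, and rejecting duplicate task names is a choice of this mock, in line with the `<unique_taskname>` placeholder.

```python
# Hypothetical mock of workflow wiring; not SciLuigi's implementation.
class MockTargetInfo:
    def __init__(self, task, path):
        self.task = task
        self.path = path


class MockTask:
    def __init__(self, name, **params):
        self.name = name
        self.params = params

    def upstream(self):
        # Follow every connected in_* field back to the task that produced it.
        return [getattr(self, attr)().task
                for attr in sorted(vars(self)) if attr.startswith('in_')]


class FooWriter(MockTask):
    def out_foo(self):
        return MockTargetInfo(self, 'foo.txt')


class FooReplacer(MockTask):
    def out_replaced(self):
        return MockTargetInfo(self, 'bar.txt')


class MockWorkflow:
    def __init__(self):
        self.tasks = {}

    def new_task(self, name, cls, **params):
        # Register each task under a unique name.
        if name in self.tasks:
            raise ValueError('duplicate task name: ' + name)
        task = cls(name, **params)
        self.tasks[name] = task
        return task

    def workflow(self):
        foowriter = self.new_task('foowriter', FooWriter)
        fooreplacer = self.new_task('fooreplacer', FooReplacer, replacement='bar')
        fooreplacer.in_foo = foowriter.out_foo
        return fooreplacer


def execution_order(last_task):
    # Depth-first walk from the last task, so upstream tasks come first.
    order, seen = [], set()

    def visit(task):
        if task.name in seen:
            return
        seen.add(task.name)
        for dep in task.upstream():
            visit(dep)
        order.append(task.name)

    visit(last_task)
    return order


wf = MockWorkflow()
order = execution_order(wf.workflow())
print(order)  # ['foowriter', 'fooreplacer']
```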
Now the only thing that remains is adding a run method to the end of the script.
You can use luigi's own `luigi.run()`, or our own two methods:
- `sciluigi.run()`
- `sciluigi.run_local()`
The `run_local()` method is handy if you don't want to run a central scheduler daemon, but just want to run the workflow as a script.
Both of the above take the same options as `luigi.run()`, so you can, for example, set the main class to use (our workflow task):
```python
# End of script ....
if __name__ == '__main__':
    sciluigi.run_local(main_task_cls=MyWorkflow)
```
Now you should be able to run the workflow simply with:
```shell
python myworkflow.py
```
... provided, of course, that the workflow is saved in a file named `myworkflow.py`.
See the examples folder for more detailed examples!