-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfloat_submit.py
executable file
·148 lines (116 loc) · 4.61 KB
/
float_submit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
import json
import re
import shlex
import sys
import subprocess
from snakemake.utils import read_job_properties
from float_config import FloatConfig
from float_login import FloatLogin
from float_utils import logger
class FloatSubmit:
    """Build and run a ``float submit`` command for one Snakemake job script.

    The command line is assembled from the parameters returned by
    ``FloatConfig().parameters()`` and from the job properties that
    ``read_job_properties`` extracts from the job script.
    """

    # Values placed after the colon in the generated "--cpu"/"--mem" arguments.
    # NOTE(review): presumably "<min>:<max>" ranges (cores and GiB) understood
    # by the float CLI -- confirm against the float CLI documentation.
    _AWS_CPU_UPPER_BOUND = 192
    _AWS_MEM_UPPER_BOUND = 1024
    # Floors applied to the values requested by the Snakemake job.
    _MIN_CPU = 2
    _MIN_MEM_MIB = 4096

    def __init__(self):
        """Store the base command and load the float configuration."""
        self._cmd = ['float', 'submit', '--force', '--format', 'json']
        self._config = FloatConfig()

    @classmethod
    def _cpu_spec(cls, job_properties):
        """Return the ``--cpu`` value derived from the job's ``threads``.

        A missing or ``None`` ``threads`` entry is treated as "no request",
        so the floor of ``_MIN_CPU`` applies instead of raising ``TypeError``.
        """
        threads = job_properties.get('threads') or 0
        cpu = max(threads, cls._MIN_CPU)
        return f"{cpu}:{cls._AWS_CPU_UPPER_BOUND}"

    @classmethod
    def _mem_spec(cls, job_properties):
        """Return the ``--mem`` value derived from ``resources['mem_mib']``.

        The request is floored at ``_MIN_MEM_MIB`` and rounded up to whole
        GiB. A missing ``resources`` mapping or ``mem_mib`` entry (or a
        ``None`` value) falls back to the floor instead of raising.
        """
        resources = job_properties.get('resources') or {}
        mem_mib = max(resources.get('mem_mib') or 0, cls._MIN_MEM_MIB)
        mem_gib = (mem_mib + 1023) // 1024  # ceiling division MiB -> GiB
        return f"{mem_gib}:{cls._AWS_MEM_UPPER_BOUND}"

    def submit_job(self, job_file):
        """Submit ``job_file`` through ``float submit`` and return the job id.

        Args:
            job_file: Path of the Snakemake job script to submit.

        Returns:
            The value of the ``id`` field in the JSON printed by
            ``float submit``.

        Raises:
            OSError: The job file cannot be opened for reading.
            subprocess.CalledProcessError: ``float submit`` exited non-zero.
            UnicodeError, json.JSONDecodeError: The response is not valid
                JSON text.
            KeyError: A required configuration key is missing, or the
                response has no ``id`` field.
        """
        # Work on a copy: extending self._cmd directly would leak this job's
        # arguments into every later submit_job() call on the same instance.
        cmd = list(self._cmd)

        config_parameters = self._config.parameters()

        cmd.extend(['--image', config_parameters['base-image']])

        for data_volume in config_parameters['data-volumes']:
            cmd.extend(['--dataVolume', data_volume])

        # Read the properties of the file we were asked to submit, not of a
        # module-level global that only exists when run as a script.
        job_properties = read_job_properties(job_file)

        # Explicit configuration wins over the per-job request. str() keeps
        # subprocess happy if the configuration holds a non-string value.
        if 'cpu' in config_parameters:
            cmd.extend(['--cpu', str(config_parameters['cpu'])])
        else:
            cmd.extend(['--cpu', self._cpu_spec(job_properties)])

        if 'mem' in config_parameters:
            cmd.extend(['--mem', str(config_parameters['mem'])])
        else:
            cmd.extend(['--mem', self._mem_spec(job_properties)])

        try:
            with open(job_file, 'r') as jf:
                script_lines = jf.readlines()
            logger.debug('Opened job file for reading')
        except OSError:
            logger.exception('Cannot open job file for reading')
            raise

        # The attempt number is taken from the " --attempt N" argument on the
        # last line of the job script; an empty file yields an empty attempt.
        exec_job_cmd = script_lines[-1] if script_lines else ''
        attempt = exec_job_cmd.partition(' --attempt ')[2].partition(' ')[0]
        snakejob = job_properties.get('jobid', 'N/A')
        job_prefix = config_parameters.get('job-prefix', 'snakemake')
        job_name = f"{job_prefix}-job_{snakejob}-attempt_{attempt}"
        cmd.extend(['--name', job_name])

        cmd.extend(['--job', job_file])

        cmd.extend(shlex.split(config_parameters.get('submit-extra', '')))

        logger.info(f"Attempt {attempt} to submit Snakemake job {snakejob}")
        try:
            FloatLogin().login()
            output = subprocess.check_output(cmd)
            output = json.loads(output.decode())
            jobid = output['id']
        except subprocess.CalledProcessError:
            msg = 'Failed to submit job'
            logger.exception(msg)
            raise
        except (UnicodeError, json.JSONDecodeError):
            msg = 'Failed to decode submit response'
            logger.exception(msg)
            raise
        except KeyError:
            msg = 'Failed to obtain float job id'
            logger.exception(msg)
            raise

        logger.info(
            f"Submitted Snakemake job {snakejob}"
            f" as float job {jobid} with name {job_name}"
        )
        logger.debug(f"With command: {cmd}")
        logger.debug(f"OpCenter response:\n{output}")

        return jobid

    def work_dir(self):
        """Return the ``work-dir`` value from the float configuration."""
        return self._config.parameters()['work-dir']
def build_jobscript_lines(script_lines, work_dir,
                          conda_prefix='/memverge/.snakemake'):
    """Return a rewritten copy of a jobscript's lines for float execution.

    A ``cd <work_dir>`` line is inserted at index 3. If the last line (the
    Snakemake exec command) contains ``--use-conda``, two ``mkdir -p`` lines
    for the conda directories are inserted ahead of the ``cd`` line, and any
    existing ``--conda-frontend``/``--conda-prefix`` arguments on the exec
    command are replaced by ``mamba`` and ``conda_prefix``.

    Args:
        script_lines: Lines of the original jobscript (not modified).
        work_dir: Directory the job should change into.
        conda_prefix: Directory used for the conda prefix override.

    Returns:
        A new list containing the rewritten lines.
    """
    lines = list(script_lines)
    lines.insert(3, f"cd {work_dir}\n")

    exec_job_cmd = lines[-1]
    if '--use-conda' not in exec_job_cmd:
        return lines

    # Hack to allow --use-conda: make sure the conda directories exist.
    lines[3:3] = [
        f"mkdir -p {conda_prefix}/conda\n",
        f"mkdir -p {conda_prefix}/conda-archive\n"
    ]

    # Drop the existing conda-frontend and conda-prefix arguments. The value
    # pattern must stop at the closing quote: a greedy '.+' would run to the
    # last quote on the line and delete every quoted argument after the flag.
    exec_job_cmd = re.sub(r" --conda-frontend '[^']+'", '', exec_job_cmd)
    exec_job_cmd = re.sub(r" --conda-prefix '[^']+'", '', exec_job_cmd)

    # Re-add both options immediately after --use-conda.
    head, flag, tail = exec_job_cmd.partition(' --use-conda')
    override = f" --conda-frontend 'mamba' --conda-prefix '{conda_prefix}'"
    lines[-1] = f"{head}{flag}{override}{tail}"
    return lines


if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} JOBSCRIPT")
    jobscript = sys.argv[1]

    float_submit = FloatSubmit()

    try:
        with open(jobscript, 'r') as js:
            script_lines = js.readlines()
        logger.debug('Opened jobscript for reading')
    except OSError:
        logger.exception('Cannot open jobscript for reading')
        raise

    script_lines = build_jobscript_lines(script_lines, float_submit.work_dir())
    if '--use-conda' in script_lines[-1]:
        logger.debug('Prefixing jobscript to allow --use-conda')

    try:
        with open(jobscript, 'w') as js:
            js.writelines(script_lines)
        logger.debug('Wrote modifications to jobscript ')
    except OSError:
        logger.exception('Cannot open jobscript for writing')
        raise

    # The job id on stdout is the script's only output besides logging.
    jobid = float_submit.submit_job(jobscript)
    print(jobid)