Skip to content

Commit

Permalink
fix: reduce code repetition
Browse files Browse the repository at this point in the history
feat: map to mask tests
  • Loading branch information
meono committed Jul 19, 2016
1 parent 9d543ca commit a955c05
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 57 deletions.
107 changes: 54 additions & 53 deletions iLoop_RNAseq_pipeline/master_qsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ def fastqc_job(project_path, groups, output_path, defaults, ppn='8', walltime='0
return '\n\n'.join(jobstr).replace('PPN', ppn)


def mapandlink_jobs(project_path, sample, reads, defaults, ref, jobs, ppn='8', walltime='12:00:00'):
def mapandlink_jobs(project_path, sample, reads, defaults, ref, jobs, ppn='8', walltime='12:00:00', map_to_mask=False):
mljobs = ['hisat2', 'stringtie', 'cufflinks', 'htseq-count']
if jobs == []:
if map_to_mask:
jobs = ['hisat2_to_mask']
elif (jobs == []) and (map_to_mask == False):
jobs = mljobs

jobstr = []
jobstr += [job_header.replace('JOBNAME', '_'.join([sample] + [job for job in jobs if job in mljobs]))
.replace('WALLTIME', walltime)
Expand All @@ -106,21 +109,33 @@ def mapandlink_jobs(project_path, sample, reads, defaults, ref, jobs, ppn='8', w
PATH=$PATH:/home/projects/cu_10010/programs/hisat2-2.0.4:/home/projects/cu_10010/programs/stringtie-1.2.3.Linux_x86_64
export PATH''']

jobstr += ['export HISAT2_INDEXES={}'.format(ref['hisat2_indexes'])]
jobstr += ['mkdir {}/{}'.format(project_path, sample)]
jobstr += ['export hisat2_genomic_indexes={}'.format(ref['hisat2_genomic_indexes'])]
if map_to_mask:
jobstr += ['mkdir -p {}/{}/{}'.format(project_path, 'map_to_mask', sample)]
else:
jobstr += ['mkdir {}/{}'.format(project_path, sample)]

R1reads = [read for read in reads if read[-15:-13] == 'R1']
R1reads.sort()
R2reads = [read for read in reads if read[-15:-13] == 'R2']
R2reads.sort()

if (ref.get('hisat2_indexes')) and ('hisat2' in jobs):
if (ref.get('hisat2_mask_indexes')) and ('hisat2_to_mask' in jobs):
logger.info('Using hisat2 options: {}'.format(defaults['hisat2_options']))
jobstr += ['''echo "hisat2"
hisat2 {} -p PPN -x {} {} {} -S {} 2>{} '''.format(defaults['hisat2_options'],
(ref['hisat2_mask_indexes']),
('-1' + ','.join(R1reads)),
('-2' + ','.join(R2reads)),
(os.path.abspath(os.path.join(project_path, 'map_to_mask', sample, 'accepted_hits.sam'))),
(os.path.abspath(os.path.join(project_path, 'map_to_mask', sample, 'align_summary.txt'))))]
elif (ref.get('hisat2_genomic_indexes')) and ('hisat2' in jobs):
logger.info('Using hisat2 options: {}'.format(defaults['hisat2_options']))
jobstr += ['''echo "hisat2"
hisat2 {} -p PPN -x {} {} {} 2>{} | \
samtools view -@ PPN -hbu - | \
samtools sort -@ PPN - {}'''.format(defaults['hisat2_options'],
ref['hisat2_indexes'],
ref['hisat2_genomic_indexes'],
('-1' + ','.join(R1reads)),
('-2' + ','.join(R2reads)),
(os.path.abspath(os.path.join(project_path, sample, 'align_summary.txt'))),
Expand Down Expand Up @@ -283,9 +298,21 @@ def diff_job(project_path, groups, quantjobsIDs, ppn='8', walltime='24:00:00', r
return '\n\n'.join(jobstr).replace('PPN', ppn)


def job_submitter(project_path, groups, ref, defaults, ppn='8', readtype='raw', jobs=None):
def job_submitter(js, path, name):
jfn = os.path.join(path, name)
jf = open(jfn, 'w')
jf.write(js)
jf.close()
p = subprocess.Popen(['qsub', jfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
out, err = p.communicate()
os.system('sleep 0.5')
return out.strip().decode(sys.getdefaultencoding())

def job_organizer(project_path, groups, ref, defaults, map_to_mask, ppn='8', readtype='raw', jobs=None):
try:
os.mkdir(os.path.join(project_path, 'job_files'))
job_files_path = os.path.abspath(os.path.join(project_path, 'job_files'))
os.mkdir(job_files_path)
except FileExistsError:
logger.warning('Folder for job files exists. Previously generated job files will be overwritten.')

Expand All @@ -300,15 +327,7 @@ def job_submitter(project_path, groups, ref, defaults, ppn='8', readtype='raw',
output_path = os.path.abspath(os.path.join(project_path, 'reads', 'QC_output', readtype))
if not check_fastqc(groups=groups, output_path=output_path):
js = fastqc_job(project_path=project_path, groups=groups, output_path=output_path, defaults=defaults)
jfn = os.path.join(project_path, 'job_files', 'job_fastqc.sh')
jf = open(jfn, 'w')
jf.write(js)
jf.close()
p = subprocess.Popen(['qsub', jfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
out, err = p.communicate()
qcjobID = out.strip().decode(sys.getdefaultencoding())
os.system('sleep 0.5')
qcjobID = job_submitter(js, job_files_path, 'job_fastqc.sh')
else:
logger.info('Existing fastqc files found. Skipping quality check job.')
qcjobID = True
Expand All @@ -321,6 +340,20 @@ def job_submitter(project_path, groups, ref, defaults, ppn='8', readtype='raw',
# TODO: Come up with a qc threshold to continue or terminate jobs. Use qcjobID from above.

# generate and submit map and link jobs
if map_to_mask:
try:
for group_name, group in groups.items():
for sample, reads in group.items():
js = mapandlink_jobs(project_path=project_path, sample=sample, reads=reads, ref=ref,
defaults=defaults, ppn=ppn, jobs=jobs, map_to_mask=map_to_mask)
job_submitter(js, job_files_path, 'job_{}_mapandlink_to_mask.sh'.format(sample))
except Exception as ex:
logger.error(
'Problem with map and link. RNAseq analysis is stopped.\nAn exception of type {} occured. Arguments:\n{}'.format(
type(ex).__name__, ex.args))
return False
# TODO: a threshold for mapping to mask/genome can be used to decide whether to go forward or stop pipeline. At least generate a warning.

mljobs = ['hisat2', 'stringtie', 'cufflinks', 'htseq-count']
if (any(job for job in jobs if job in mljobs)) or (jobs == []):
try:
Expand All @@ -329,15 +362,7 @@ def job_submitter(project_path, groups, ref, defaults, ppn='8', readtype='raw',
for sample, reads in group.items():
js = mapandlink_jobs(project_path=project_path, sample=sample, reads=reads, ref=ref,
defaults=defaults, ppn=ppn, jobs=jobs)
jfn = os.path.join(project_path, 'job_files', 'job_{}_mapandlink.sh'.format(sample))
jf = open(jfn, 'w')
jf.write(js)
jf.close()
p = subprocess.Popen(['qsub', jfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
out, err = p.communicate()
mapjobIDs.append(out.strip().decode(sys.getdefaultencoding()))
os.system('sleep 0.5')
mapjobIDs.append(job_submitter(js, job_files_path, 'job_{}_mapandlink.sh'.format(sample)))
except Exception as ex:
logger.error(
'Problem with map and link. RNAseq analysis is stopped.\nAn exception of type {} occured. Arguments:\n{}'.format(
Expand Down Expand Up @@ -372,15 +397,7 @@ def job_submitter(project_path, groups, ref, defaults, ppn='8', readtype='raw',

try:
js = merge_job(project_path=project_path, mapjobIDs=mapjobIDs, ref=ref, defaults=defaults)
jfn = os.path.join(project_path, 'job_files', 'job_cuffmerge.sh')
jf = open(jfn, 'w')
jf.write(js)
jf.close()
p = subprocess.Popen(['qsub', jfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
out, err = p.communicate()
mergejob = out.strip().decode(sys.getdefaultencoding())
os.system('sleep 0.5')
mergejob = job_submitter(js, job_files_path, 'job_cuffmerge.sh')
except Exception as ex:
logger.error(
'Problem with Cuffmerge. RNAseq analysis is stopped.\nAn exception of type {} occured. Arguments:\n{}'.format(
Expand All @@ -397,15 +414,7 @@ def job_submitter(project_path, groups, ref, defaults, ppn='8', readtype='raw',
for sample, reads in group.items():
js = quant_jobs(project_path=project_path, sample=sample, mergejob=mergejob, ref=ref,
defaults=defaults, ppn=ppn)
jfn = os.path.join(project_path, 'job_files', 'job_{}_cuffquant.sh'.format(sample))
jf = open(jfn, 'w')
jf.write(js)
jf.close()
p = subprocess.Popen(['qsub', jfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
out, err = p.communicate()
quantjobsIDs.append(out.strip().decode(sys.getdefaultencoding()))
os.system('sleep 0.5')
quantjobsIDs.append(job_submitter(js, job_files_path, 'job_{}_cuffquant.sh'.format(sample)))
except Exception as ex:
logger.error(
'Problem with Cuffquant jobs. RNAseq analysis is stopped.\nAn exception of type {} occured. Arguments:\n{}'.format(
Expand All @@ -425,15 +434,7 @@ def job_submitter(project_path, groups, ref, defaults, ppn='8', readtype='raw',
js = diff_job(project_path=project_path, groups=groups, quantjobsIDs=quantjobsIDs, ppn=ppn,
walltime='24:00:00',
ref=ref, defaults=defaults)
jfn = os.path.join(project_path, 'job_files', 'job_cuffdiff.sh')
jf = open(jfn, 'w')
jf.write(js)
jf.close()
p = subprocess.Popen(['qsub', jfn], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.wait()
out, err = p.communicate()
diffjob = out.strip().decode(sys.getdefaultencoding())
os.system('sleep 0.5')
diffjob = job_submitter(js, job_files_path, 'job_cuffdiff.sh')
except Exception as ex:
logger.error(
'Problem with Cuffdiff. RNAseq analysis is stopped.\nAn exception of type {} occured. Arguments:\n{}'.format(
Expand Down
6 changes: 4 additions & 2 deletions run_RNAseq.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
help='Path to reads folder(s). Can be suplied as comma separated string. All files under the path tree will be added.',
default=None)
parser.add_argument('-j', '--jobs', help='Comma separated list of jobs to run.', default=[])
parser.add_argument('-m', '--map-to-mask', help='Run a separate mapping pipeline to mask.', action='store_true', default=False)
parser.add_argument('-n', '--number-of-cores', help='Number of cores to assign to individual jobs', default='8')
parser.add_argument('-s', '--strain-code',
help='Code for the reference strain. Available strains and corresponding codes are on "RNAseq_pipeline_references.tsv".',
Expand All @@ -40,13 +41,14 @@ def main():
groups = mm.find_groups(reads)
essentials = [defaults, ref, reads, project_path, groups]
if not any(ess for ess in essentials if not ess):
subs = mq.job_submitter(project_path=project_path,
subs = mq.job_organizer(project_path=project_path,
groups=groups,
readtype='raw',
ref=ref,
defaults=defaults,
ppn=args.number_of_cores,
jobs=jobs)
jobs=jobs,
map_to_mask=args.map_to_mask)
else:
logger.error('Something is missing: \n{}'.format(
', '.join([ess for ess in essentials if not ess])))
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
license='Apache License, Version 2.0',
include_package_data=True,
install_requires=requirements,
# data_files=[('iLoop_RNAseq_pipeline', ['iLoop_RNAseq_pipeline/defaults/References.tsv', 'iLoop_RNAseq_pipeline/defaults/RNAseq_pipeline_defaults.txt'])],
# long_description=open('dontREADMEyet.md').read(),
long_description=open('README.md').read(),
# data_files=[],
)

0 comments on commit a955c05

Please sign in to comment.