Skip to content

Commit

Permalink
Dev (#129)
Browse files Browse the repository at this point in the history
* Update parse_sumstats_ftp_logs.py

fix counting failures after ftp restructure

* gwas-utils#127, new harmonisation wrapper script

* add working dir to harm wrapper

* update nextflow starter script

* Update solr_indexing.nf
  • Loading branch information
jdhayhurst authored Nov 9, 2022
1 parent 87b8679 commit 69a1a17
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 60 deletions.
14 changes: 13 additions & 1 deletion ftpSummaryStatsScript/ftp_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ def release_files_for_harmonisation(self):
for study in self.get_files_to_harmonise():
logger.info("{} --> harmonisation queue".format(study))
source = self.staging_studies_dict[study]
dest_dir = os.path.join(self.harmonise_path, study)
date_today = datetime.datetime.now().strftime("%Y%m%d")
dest_dir = os.path.join(self.harmonise_path, date_today)
self.make_dir(dest_dir)
self.rsync_dir(source, dest_dir)

Expand Down Expand Up @@ -314,6 +315,17 @@ def rsync_dir(source, dest):
except OSError as e:
logger.error(e)

@staticmethod
def rsync_pattern(source, dest, pattern):
source = source + "/"
try:
logger.info("Sync {} --> {}".format(source, dest))
subprocess.call(
['rsync', '-rpvh', '--chmod=Du=rwx,Dg=rwx,Do=rx,Fu=rw,Fg=rw,Fo=r', '--size-only',
f'--include={pattern}', source, dest])
except OSError as e:
logger.error(e)

def remove_unexepcted_from_ftp(self):
# NOT api AND ftp
if len(self.remove_from_ftp) > 0:
Expand Down
82 changes: 26 additions & 56 deletions harmonisationUtils/harmonisation_wrapper.sh
Original file line number Diff line number Diff line change
@@ -1,67 +1,37 @@
#!/bin/bash

# 1. Runs harmonisation pipeline on all tsv files in $toharmonise
# 2. If/when successful, those harmonised files are released to the $ftp_dir and the $api_load
ref=$1
all_harm_folder=$2
ftp=$3
failed=$4
version=$5
wd=$6
mail_add=$7

toharmonise=$1 # dir where tsv files are ready to be harmonised
snakemake_dir=$2 # dir where https://github.com/EBISPOT/gwas-sumstats-harmoniser/blob/master/Snakefile is
release_dir=$3 # dir for staging completed files for release
ftp_dir=$4 # ftp parent dir
reports=$5 # reports dir
api_load=$6 # loading dir for the SumStats HDF5 API loader
venv=$7 #.env/bin/activate
# ENV VARS

source $venv
cd $snakemake_dir
export NXF_SINGULARITY_CACHEDIR=$SINGULARITY_CACHEDIR

for f in $toharmonise/*.tsv; do
if [ -e $f ]; then
n=$(echo $f | sed "s/.tsv//g")
h=$n/harmonised.qc.tsv
# if job already running or snakemake target already exists.
if bjobs -w | grep -q $n || [ $(find $n -maxdepth 1 -mmin +30 -type f -name harmonised.qc.tsv) ] ; then
if snakemake -n -d $n --configfile $snakemake_dir/config.yaml --profile lsf $h | tail -n1 | grep -q "Nothing to be done."; then
echo "$h --> Ready to release"
# Load modules

# set vars
file_id=$(basename $n)
file_id_no_build=$(echo $file_id | sed 's/-[bB]uild.*//g' )
gcst=$(echo $file_id | cut -f2 -d '-' )
module load openjdk-16.0.2-gcc-9.3.0-xyn6nf5
module load nextflow-21.10.6-gcc-9.3.0-tkuemwd
module load singularity-3.7.0-gcc-9.3.0-dp5ffrp

# prep dirs
mkdir -p $release_dir/$gcst
mkdir -p $release_dir/$gcst/harmonised
# Update local nf repo from public

# compress harmonised file
gzip -c $n/harmonised.qc.tsv > $release_dir/$gcst/harmonised/$file_id_no_build.h.tsv.gz
rsync -pv --chmod=Du=rwx,Dg=rwx,Do=rx,Fu=rw,Fg=rw,Fo=r $n/harmonised.qc.tsv $api_load/$file_id_no_build.tsv
# compress associated formatted file
gzip -c $f > $release_dir/$gcst/harmonised/$file_id.f.tsv.gz
NXF_VER=22.05.0-edge nextflow pull EBISPOT/gwas-sumstats-harmoniser

# generate md5sums for these files
md5sum $release_dir/$gcst/harmonised/$file_id_no_build* | sed 's| .*harmonised/| |g' > $release_dir/$gcst/harmonised/md5sum.txt
# Nextflow command

# release to FTP
# identify the remote directory - this find matches the GCST for the end of the dirname
remote=$(find $ftp_dir -maxdepth 2 -type d -name "$gcst")
if [[ $remote ]]; then
echo $remote
rsync -prv --chmod=Du=rwx,Dg=rwx,Do=rx,Fu=rw,Fg=rw,Fo=r $release_dir/$gcst/harmonised $remote/
cd $wd

# clean up harmonised, formatted and raw files
rm -v $f
mv -v $n/report.txt $reports/${gcst}_report.txt
rm -r $n
fi
else
echo "$h --> Still working on it"
fi
else
echo "Submitting $n for harmonisation"
mkdir -p $n
bsub "snakemake --rerun-incomplete -d $n --configfile $snakemake_dir/config.yaml --profile lsf $h"
fi
else
echo "$f doesn't exist, there's nothing to do."
fi
done
NXF_VER=22.05.0-edge nextflow run EBISPOT/gwas-sumstats-harmoniser\
-r $version\
-N $mail_add\
--ref $ref\
--all_harm_folder $all_harm_folder\
--ftp $ftp\
--failed $failed\
--gwascatalog\
-profile cluster,singularity
3 changes: 1 addition & 2 deletions log_analysis/parse_sumstats_ftp_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def main():


raw = df[~df.resource.str.contains('harmonised')]
raw.resource = raw.resource.str.replace('/.*','')
raw = raw.groupby('resource').agg('max')

f_and_h = df[df.resource.str.contains('harmonised')]
Expand Down Expand Up @@ -79,4 +78,4 @@ def main():


if __name__ == '__main__':
main()
main()
2 changes: 1 addition & 1 deletion solrIndexerManager/solr_indexing.nf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ process run_solr_indexer {
tag "$id"
memory { 4.GB * task.attempt }
maxRetries 3
errorStrategy { task.exitStatus in 2..140 ? 'retry' : 'terminate' }
errorStrategy { task.exitStatus in 129..255 ? 'retry' : 'terminate' }

input:
tuple val(id), val(cmd)
Expand Down

0 comments on commit 69a1a17

Please sign in to comment.