From 563251e198732c78c8de89eb1daa8e53fef9b374 Mon Sep 17 00:00:00 2001 From: Samuel Jackson Date: Mon, 19 Aug 2024 09:40:54 +0100 Subject: [PATCH] Add CPF metadata script --- README.md | 8 ++++++ jobs/freia_write_cpf.qsub | 11 +++----- jobs/submit.cpf.sh | 5 ++++ src/create_cpf_metadata.py | 56 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 7 deletions(-) create mode 100755 jobs/submit.cpf.sh create mode 100644 src/create_cpf_metadata.py diff --git a/README.md b/README.md index 456c559..a5d3cff 100644 --- a/README.md +++ b/README.md @@ -117,3 +117,11 @@ mpirun -np 16 python3 -m src.main data/local campaign_shots/tiny_campaign.csv --- This will submit a job to the freia job queue that will ingest all of the shots in the tiny campaign and push them to the s3 bucket. +## CPF Metadata + +To parse CPF metadata we can use the following script (only on Freia): + +```sh +qsub ./jobs/freia_write_cpf.qsub campaign_shots/tiny_campaign.csv +``` + diff --git a/jobs/freia_write_cpf.qsub b/jobs/freia_write_cpf.qsub index 8d8a1eb..ccfa12a 100644 --- a/jobs/freia_write_cpf.qsub +++ b/jobs/freia_write_cpf.qsub @@ -1,20 +1,17 @@ #!/bin/bash -# Verify options and abort if there is a error -#$ -w e - # Choose parallel environment #$ -pe mpi 16 # Specify the job name in the queue system -#$ -N fairmast-dataset-writer +#$ -N fairmast-cpf-writer # Start the script in the current working directory #$ -cwd # Time requirements -#$ -l h_rt=120:00:00 -#$ -l s_rt=120:00:00 +#$ -l h_rt=48:00:00 +#$ -l s_rt=48:00:00 # Activate your environment here! 
module load python/3.9 @@ -28,4 +25,4 @@ shot_file=$1 export PATH="/home/rt2549/dev/:$PATH" # Run script -python3 -m src.metadata.create_cpf_metadata $shot_file \ No newline at end of file +python3 -m src.create_cpf_metadata $shot_file \ No newline at end of file diff --git a/jobs/submit.cpf.sh b/jobs/submit.cpf.sh new file mode 100755 index 0000000..34f1a89 --- /dev/null +++ b/jobs/submit.cpf.sh @@ -0,0 +1,5 @@ + qsub jobs/freia_write_cpf.qsub campaign_shots/M9.csv + qsub jobs/freia_write_cpf.qsub campaign_shots/M8.csv + qsub jobs/freia_write_cpf.qsub campaign_shots/M7.csv + qsub jobs/freia_write_cpf.qsub campaign_shots/M6.csv + qsub jobs/freia_write_cpf.qsub campaign_shots/M5.csv \ No newline at end of file diff --git a/src/create_cpf_metadata.py b/src/create_cpf_metadata.py new file mode 100644 index 0000000..31f5856 --- /dev/null +++ b/src/create_cpf_metadata.py @@ -0,0 +1,56 @@ +import argparse +import numpy as np +import pandas as pd +import multiprocessing as mp +from functools import partial +from pathlib import Path +from rich.progress import track +from pycpf import pycpf + + +def read_cpf_for_shot(shot, columns): + cpf_data = {} + for name in columns: + entry = pycpf.query(name, f"shot = {shot}") + value = entry[name][0] if name in entry else np.nan + cpf_data[name] = value + + cpf_data['shot_id'] = shot + return cpf_data + +def main(): + parser = argparse.ArgumentParser( + prog="FAIR MAST Ingestor", + description="Parse the MAST archive and writer to Zarr/NetCDF/HDF files", + ) + + parser.add_argument("shot_file") + args = parser.parse_args() + + shot_file = args.shot_file + shot_ids = pd.read_csv(shot_file) + shot_ids = shot_ids['shot_id'].values + + columns = pycpf.columns() + columns = pd.DataFrame(columns, columns=['name', 'description']) + columns.to_parquet(f'data/{Path(shot_file).stem}_cpf_columns.parquet') + + pool = mp.Pool(16) + column_names = columns['name'].values + func = partial(read_cpf_for_shot, columns=column_names) + mapper = 
pool.imap_unordered(func, shot_ids) + rows = [item for item in track(mapper, total=len(shot_ids))] + cpf_data = pd.DataFrame(rows) + + # Convert objects to strings + for column in cpf_data.columns: + dtype = cpf_data[column].dtype + if isinstance(dtype, object): + cpf_data[column] = cpf_data[column].astype(str) + + cpf_data.to_parquet(f'data/{Path(shot_file).stem}_cpf_data.parquet') + print(cpf_data) + + +if __name__ == "__main__": + main()