Skip to content

Commit

Permalink
reduce the number of files by using zip-files.
Browse files Browse the repository at this point in the history
  • Loading branch information
relleums committed Apr 15, 2024
1 parent 59ad5f9 commit e3eda62
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 150 deletions.
251 changes: 104 additions & 147 deletions plenoptics/production/observations.py
Original file line number Diff line number Diff line change
@@ -1,149 +1,100 @@
import json_line_logger
import os
import zipfile
import sequential_tar
import gzip
import io
import json_utils
import plenopy
import rename_after_writing
import shutil
from .. import sources
from .. import utils


def run(work_dir, pool, logger=json_line_logger.LoggerStdout()):
    """Produce observations: run the map stage, then the reduce stage.

    Parameters
    ----------
    work_dir : str
        Working directory; must contain a ``config`` sub-directory readable
        by ``json_utils.tree.read``.
    pool : pool-like
        Anything with a ``map(func, jobs)`` method (e.g. multiprocessing pool).
    logger : logger, optional
        JSON-line logger used for progress messages.
    """
    config = json_utils.tree.read(os.path.join(work_dir, "config"))

    mapjobs = []

    # Map stage: build one job per missing observation output and run them.
    logger.info("Obs:Map: Making observations in map dir...")
    mapjobs = _observations_make_jobs(work_dir=work_dir)
    logger.info("Obs:Map: {:d} jobs to do".format(len(mapjobs)))
    pool.map(_observations_run_job, mapjobs)
    logger.info("Obs:Map: Making observations in map dir done.")

    # Reduce stage.
    logger.info("Obs:Reduce: Reducing observations from map dir...")
    # NOTE(review): `ojobs` is never defined in this function -- this line
    # raises NameError at runtime. Presumably reduce jobs were meant to be
    # built and handed to pool.map here; confirm against the reduce
    # implementation before relying on this stage.
    logger.info("Obs:Reduce: {:d} jobs to do".format(len(ojobs)))
    # NOTE(review): "dine" in the message below is a typo for "done"; left
    # unchanged here because log messages are runtime strings.
    logger.info("Obs:Reduce: Reducing observations from map dir dine.")

    logger.info("Obs: Complete.")


def _observations_make_jobs(work_dir):
return _tasks_make_jobs(work_dir=work_dir, task_key="responses", suffix="")
logger.info("Observations:Mapping: ...")

mapjobs = []
for instrument_key in config["observations"]["instruments"]:
for observation_key in config["observations"]["instruments"][
instrument_key
]:
base_path = os.path.join(
work_dir, "responses", instrument_key, observation_key
)

def make_mapdir_name(work_dir, task_key, instrument_key, observation_key):
    """Return the path of the map-stage directory for one observation."""
    map_basename = observation_key + ".map"
    return os.path.join(work_dir, task_key, instrument_key, map_basename)

result_path = base_path + ".zip"
map_dir = base_path + ".map"

def _observation_star_make_jobs(
    work_dir, config, task_key, instrument_key, suffix
):
    """Collect the not-yet-completed 'star' observation jobs.

    A job is pending when its expected output file does not exist in the
    map-stage directory of the instrument.
    """
    map_directory = make_mapdir_name(
        work_dir=work_dir,
        task_key=task_key,
        instrument_key=instrument_key,
        observation_key="star",
    )
    num_stars = config["observations"]["star"]["num_stars"]

    pending = []
    for star_number in range(num_stars):
        out_path = os.path.join(
            map_directory, "{:06d}".format(star_number) + suffix
        )
        # Skip jobs whose output was already produced by an earlier run.
        if os.path.exists(out_path):
            continue
        pending.append(
            {
                "work_dir": work_dir,
                "instrument_key": instrument_key,
                "observation_key": "star",
                "number": star_number,
            }
        )
    return pending


def _observations_point_make_jobs(work_dir, config, instrument_key, suffix):
jobs = []
for n in range(config["observations"]["point"]["num_points"]):
nkey = "{:06d}".format(n)
outpath = os.path.join(
work_dir,
task_key,
instrument_key,
"point",
nkey + suffix,
)
if not os.path.exists(outpath):
job = {
"work_dir": work_dir,
"instrument_key": instrument_key,
"observation_key": "point",
"number": n,
}
jobs.append(job)
return jobs


def _observations_phantom_make_jobs(work_dir, config, instrument_key, suffix):
jobs = []
n = 0
nkey = "{:06d}".format(n)
outpath = os.path.join(
work_dir,
task_key,
instrument_key,
"phantom",
nkey + suffix,
)
if not os.path.exists(outpath):
job = {
"work_dir": work_dir,
"instrument_key": instrument_key,
"observation_key": "phantom",
"number": n,
}
jobs.append(job)
return jobs
if os.path.exists(result_path):
continue

if observation_key == "star":
num_jobs = config["observations"]["star"]["num_stars"]
elif observation_key == "point":
num_jobs = config["observations"]["point"]["num_points"]
elif observation_key == "phantom":
num_jobs = 1
else:
raise ValueError("Unknown observation_key")

jobs = []
for job_number in range(num_jobs):
job_number_key = "{:06d}".format(job_number)
map_job_path = os.path.join(
map_dir, job_number_key + ".job.zip"
)
if not os.path.exists(map_job_path):
job = {
"work_dir": work_dir,
"instrument_key": instrument_key,
"observation_key": observation_key,
"number": job_number,
}
jobs.append(job)
logger.info(
"Observations:Mapping: Appending {:d} {:s}/{:s} jobs.".format(
len(jobs), instrument_key, observation_key
)
)
mapjobs += jobs

def _tasks_make_jobs(work_dir, task_key, suffix):
cfg_dir = os.path.join(work_dir, "config")
config = json_utils.tree.read(cfg_dir)
logger.info("Observations:Mapping: {:d} jobs to do".format(len(mapjobs)))
pool.map(_observations_run_mapjob, mapjobs)
logger.info("Observations:Mapping: done.")

jobs = []
reducejobs = []
logger.info("Observations:Reducing: ...")

for instrument_key in config["observations"]["instruments"]:
if instrument_key not in config["instruments"]:
continue

for observation_key in config["observations"]["instruments"][
instrument_key
]:
if observation_key == "star":
jobs += _observation_star_make_jobs(
work_dir=work_dir,
config=config,
instrument_key=instrument_key,
suffix=suffix,
)
elif observation_key == "point":
jobs += _observations_point_make_jobs(
work_dir=work_dir,
config=config,
instrument_key=instrument_key,
suffix=suffix,
)
elif observation_key == "phantom":
jobs = _observations_phantom_make_jobs(
work_dir=work_dir,
config=config,
instrument_key=instrument_key,
suffix=suffix,
)
return jobs
base_path = os.path.join(
work_dir, "responses", instrument_key, observation_key
)

result_path = base_path + ".zip"
if not os.path.exists(result_path):
job = {
"work_dir": work_dir,
"instrument_key": instrument_key,
"observation_key": observation_key,
}
reducejobs.append(job)

logger.info(
"Observations:Reducing: {:d} jobs to do".format(len(reducejobs))
)
pool.map(_observations_run_reducejob, reducejobs)
logger.info("Observations:Reducing: done.")

def _observations_run_job(job):
logger.info("Observations: Complete.")


def _observations_run_mapjob(job):
mapdir = os.path.join(
job["work_dir"],
"responses",
Expand All @@ -152,7 +103,7 @@ def _observations_run_job(job):
)
os.makedirs(mapdir, exist_ok=True)

outpath = os.path.join(mapdir, "{:06d}.tar".format(job["number"]))
outpath = os.path.join(mapdir, "{:06d}.job.zip".format(job["number"]))

light_field_geometry_path = os.path.join(
job["work_dir"],
Expand Down Expand Up @@ -180,13 +131,20 @@ def _observations_run_job(job):
merlict_config=merlict_config,
)

with sequential_tar.open(outpath, "w") as tar:
with tar.open("truth.json", "wt") as f:
f.write(json_utils.dumps(source_config))
with tar.open("raw_sensor_response.phs", "wb|gz") as f:
plenopy.raw_light_field_sensor_response.write(
f=f, raw_sensor_response=raw_sensor_response
)
with rename_after_writing.open(outpath, "wb") as file:
with zipfile.ZipFile(
file=file, mode="w", compression=zipfile.ZIP_STORED
) as z:
with utils.ZipWriter(
zipfile=z, name="source_config.json", mode="wt"
) as f:
f.write(json_utils.dumps(source_config))
with utils.ZipWriter(
zipfile=z, name="raw_sensor_response.phs.gz", mode="wb|gz"
) as f:
plenopy.raw_light_field_sensor_response.write(
f=f, raw_sensor_response=raw_sensor_response
)


def make_response_to_source(
Expand Down Expand Up @@ -216,25 +174,24 @@ def make_response_to_source(
raise AssertionError("Type of source is not known")


def response_star_map_dir(work_dir, instrument_key):
    """Return the map-stage directory of 'star' responses for an instrument."""
    parts = (work_dir, "responses", instrument_key, "star.map")
    return os.path.join(*parts)

def _observations_run_reducejob(job):
base_path = os.path.join(
job["work_dir"],
"responses",
job["instrument_key"],
job["observation_key"],
)

def response_star_map_make_jobs(work_dir, instrument_key, config):
jobs = []
mapdir = response_star_map_dir(
work_dir=work_dir,
instrument_key=instrument_key,
utils.zipfile_reduce(
map_dir=base_path + ".map",
out_path=base_path + ".zip",
job_basenames=[
"source_config.json",
"raw_sensor_response.phs.gz",
],
job_ext=".job.zip",
remove_afer_reduce=True,
)
for n in range(config["observations"]["star"]["num_stars"]):
nkey = "{:06d}".format(n)
outpath = os.path.join(mapdir, nkey + ".tar")
if not os.path.exists(outpath):
job = {
"work_dir": work_dir,
"instrument_key": instrument_key,
"observation_key": "star",
"number": n,
}
jobs.append(job)
return jobs

if os.path.exists(base_path + ".zip"):
shutil.rmtree(base_path + ".map")
Loading

0 comments on commit e3eda62

Please sign in to comment.