From 5e24b5d04ceb8995a4e33d80076c18228fa44a45 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Tue, 12 Nov 2024 17:05:15 +0100 Subject: [PATCH 1/4] Improve reference metadata handling for EventSource The recent addition of attaching the reference metadata of input files to the provenance information implemented only that the reference metadata is read directly by ctapipe from the input file. This made it necessary to either support all possible input file types or impossible for plugin event sources to provide this metadata on their own. It also makes the assumption 1 EventSource = 1 input file. This is not true for some event sources. This issue is solved by: * Adding the possibility to directly provide the reference metadata to `add_input_file` * Move the responsibility of calling `add_input_file` from the EventSource baseclass to the implementation --- src/ctapipe/core/provenance.py | 266 +++++++++++++++------------- src/ctapipe/io/eventsource.py | 21 +-- src/ctapipe/io/hdf5eventsource.py | 8 +- src/ctapipe/io/simteleventsource.py | 6 + 4 files changed, 156 insertions(+), 145 deletions(-) diff --git a/src/ctapipe/core/provenance.py b/src/ctapipe/core/provenance.py index a8f602e127f..3bb5060551c 100644 --- a/src/ctapipe/core/provenance.py +++ b/src/ctapipe/core/provenance.py @@ -62,6 +62,138 @@ class MissingReferenceMetadata(UserWarning): """Warning raised if reference metadata could not be read from input file.""" +class _ActivityProvenance: + """ + Low-level helper class to collect provenance information for a given + *activity*. Users should use `Provenance` as a top-level API, + not this class directly. 
+ """ + + def __init__(self, activity_name=sys.executable): + self._prov = { + "activity_name": activity_name, + "activity_uuid": str(uuid.uuid4()), + "status": "running", + "start": {}, + "stop": {}, + "system": {}, + "input": [], + "output": [], + "exit_code": None, + } + self.name = activity_name + + def start(self): + """begin recording provenance for this activity. Set's up the system + and startup provenance data. Generally should be called at start of a + program.""" + self._prov["start"].update(_sample_cpu_and_memory()) + self._prov["system"].update(_get_system_provenance()) + + def _register(self, what, url, role, add_meta, reference_meta): + if what not in {"input", "output"}: + raise ValueError("what must be 'input' or 'output'") + + reference_meta = self._get_reference_meta( + url=url, reference_meta=reference_meta, read_meta=add_meta + ) + self._prov[what].append(dict(url=url, role=role, reference_meta=reference_meta)) + + def register_input(self, url, role=None, add_meta=True, reference_meta=None): + """ + Add a URL of a file to the list of inputs (can be a filename or full + url, if no URL specifier is given, assume 'file://') + + Parameters + ---------- + url: str + filename or url of input file + role: str + role name that this input satisfies + add_meta: bool + If true, try to load reference metadata from input file + and add to provenance. + """ + self._register( + "input", url, role=role, add_meta=add_meta, reference_meta=reference_meta + ) + + def register_output(self, url, role=None, add_meta=True, reference_meta=None): + """ + Add a URL of a file to the list of outputs (can be a filename or full + url, if no URL specifier is given, assume 'file://') + + Should only be called once the file is finalized, so that reference metadata + can be read. 
+ + Parameters + ---------- + url: str + filename or url of output file + role: str + role name that this output satisfies + add_meta: bool + If true, try to load reference metadata from input file + and add to provenance. + """ + self._register( + "output", url, role=role, add_meta=add_meta, reference_meta=reference_meta + ) + + def register_config(self, config): + """add a dictionary of configuration parameters to this activity""" + self._prov["config"] = config + + def finish(self, status="success", exit_code=0): + """record final provenance information, normally called at shutdown.""" + self._prov["stop"].update(_sample_cpu_and_memory()) + + # record the duration (wall-clock) for this activity + t_start = Time(self._prov["start"]["time_utc"], format="isot") + t_stop = Time(self._prov["stop"]["time_utc"], format="isot") + self._prov["status"] = status + self._prov["exit_code"] = exit_code + self._prov["duration_min"] = (t_stop - t_start).to("min").value + + @property + def output(self): + return self._prov.get("output", None) + + @property + def input(self): + return self._prov.get("input", None) + + def sample_cpu_and_memory(self): + """ + Record a snapshot of current CPU and memory information. 
+ """ + if "samples" not in self._prov: + self._prov["samples"] = [] + self._prov["samples"].append(_sample_cpu_and_memory()) + + @property + def provenance(self): + return self._prov + + def _get_reference_meta( + self, url, reference_meta=None, read_meta=True + ) -> dict | None: + # here to prevent circular imports / top-level cross-dependencies + from ..io.metadata import read_reference_metadata + + if reference_meta is not None or read_meta is False: + return reference_meta + + try: + return read_reference_metadata(url).to_dict() + except Exception: + warnings.warn( + f"Could not read reference metadata for input file: {url}", + MissingReferenceMetadata, + ) + return None + + class Provenance(metaclass=Singleton): """ Manage the provenance info for a stack of *activities* @@ -85,18 +217,18 @@ def start_activity(self, activity_name=sys.executable): activity.start() self._activities.append(activity) log.debug(f"started activity: {activity_name}") + return activity - def _get_current_or_start_activity(self): + def _get_current_or_start_activity(self) -> _ActivityProvenance: if self.current_activity is None: log.info( "No activity has been explicitly started, starting new default activity." " Consider calling Provenance().start_activity() explicitly." 
) - self.start_activity() - + return self.start_activity() return self.current_activity - def add_input_file(self, filename, role=None, add_meta=True): + def add_input_file(self, filename, role=None, add_meta=True, reference_meta=None): """register an input to the current activity Parameters @@ -107,7 +239,12 @@ def add_input_file(self, filename, role=None, add_meta=True): role this input file satisfies (optional) """ activity = self._get_current_or_start_activity() - activity.register_input(abspath(filename), role=role, add_meta=add_meta) + activity.register_input( + abspath(filename), + role=role, + add_meta=add_meta, + reference_meta=reference_meta, + ) log.debug( "added input entity '%s' to activity: '%s'", filename, @@ -216,125 +353,6 @@ def clear(self): self._finished_activities = [] -class _ActivityProvenance: - """ - Low-level helper class to collect provenance information for a given - *activity*. Users should use `Provenance` as a top-level API, - not this class directly. - """ - - def __init__(self, activity_name=sys.executable): - self._prov = { - "activity_name": activity_name, - "activity_uuid": str(uuid.uuid4()), - "status": "running", - "start": {}, - "stop": {}, - "system": {}, - "input": [], - "output": [], - "exit_code": None, - } - self.name = activity_name - - def start(self): - """begin recording provenance for this activity. Set's up the system - and startup provenance data. 
Generally should be called at start of a - program.""" - self._prov["start"].update(_sample_cpu_and_memory()) - self._prov["system"].update(_get_system_provenance()) - - def register_input(self, url, role=None, add_meta=True): - """ - Add a URL of a file to the list of inputs (can be a filename or full - url, if no URL specifier is given, assume 'file://') - - Parameters - ---------- - url: str - filename or url of input file - role: str - role name that this input satisfies - add_meta: bool - If true, try to load reference metadata from input file - and add to provenance. - """ - reference_meta = self._get_reference_meta(url=url) if add_meta else None - self._prov["input"].append( - dict(url=url, role=role, reference_meta=reference_meta) - ) - - def register_output(self, url, role=None, add_meta=True): - """ - Add a URL of a file to the list of outputs (can be a filename or full - url, if no URL specifier is given, assume 'file://') - - Should only be called once the file is finalized, so that reference metadata - can be read. - - Parameters - ---------- - url: str - filename or url of output file - role: str - role name that this output satisfies - add_meta: bool - If true, try to load reference metadata from input file - and add to provenance. 
- """ - reference_meta = self._get_reference_meta(url=url) if add_meta else None - self._prov["output"].append( - dict(url=url, role=role, reference_meta=reference_meta) - ) - - def register_config(self, config): - """add a dictionary of configuration parameters to this activity""" - self._prov["config"] = config - - def finish(self, status="success", exit_code=0): - """record final provenance information, normally called at shutdown.""" - self._prov["stop"].update(_sample_cpu_and_memory()) - - # record the duration (wall-clock) for this activity - t_start = Time(self._prov["start"]["time_utc"], format="isot") - t_stop = Time(self._prov["stop"]["time_utc"], format="isot") - self._prov["status"] = status - self._prov["exit_code"] = exit_code - self._prov["duration_min"] = (t_stop - t_start).to("min").value - - @property - def output(self): - return self._prov.get("output", None) - - @property - def input(self): - return self._prov.get("input", None) - - def sample_cpu_and_memory(self): - """ - Record a snapshot of current CPU and memory information. 
- """ - if "samples" not in self._prov: - self._prov["samples"] = [] - self._prov["samples"].append(_sample_cpu_and_memory()) - - @property - def provenance(self): - return self._prov - - def _get_reference_meta(self, url): - # here to prevent circular imports / top-level cross-dependencies - from ..io.metadata import read_reference_metadata - - try: - return read_reference_metadata(url).to_dict() - except Exception: - warnings.warn( - f"Could not read reference metadata for input file: {url}", - MissingReferenceMetadata, - ) - - def _get_python_packages(): def _sortkey(dist): """Sort packages by name, case insensitive""" diff --git a/src/ctapipe/io/eventsource.py b/src/ctapipe/io/eventsource.py index 0a0a7c2c298..fd0f367970b 100644 --- a/src/ctapipe/io/eventsource.py +++ b/src/ctapipe/io/eventsource.py @@ -16,7 +16,7 @@ SimulatedShowerDistribution, SimulationConfigContainer, ) -from ..core import Provenance, ToolConfigurationError +from ..core import ToolConfigurationError from ..core.component import Component, find_config_in_hierarchy from ..core.traits import CInt, Int, Path, Set, TraitError, Undefined from ..instrument import SubarrayDescription @@ -134,23 +134,6 @@ def __new__(cls, input_url=Undefined, config=None, parent=None, **kwargs): return super().__new__(subcls) def __init__(self, input_url=None, config=None, parent=None, **kwargs): - """ - Class to handle generic input files. Enables obtaining the "source" - generator, regardless of the type of file (either hessio or camera - file). - - Parameters - ---------- - config : traitlets.loader.Config - Configuration specified by config file or cmdline arguments. - Used to set traitlet values. - Set to None if no configuration to pass. - tool : ctapipe.core.Tool - Tool executable that is calling this component. - Passes the correct logger to the component. - Set to None if no Tool to pass. - kwargs - """ # traitlets differentiates between not getting the kwarg # and getting the kwarg with a None value. 
# the latter overrides the value in the config with None, the former @@ -166,8 +149,6 @@ def __init__(self, input_url=None, config=None, parent=None, **kwargs): if self.max_events: self.log.info(f"Max events being read = {self.max_events}") - Provenance().add_input_file(str(self.input_url), role="DL0/Event") - @staticmethod @abstractmethod def is_compatible(file_path): diff --git a/src/ctapipe/io/hdf5eventsource.py b/src/ctapipe/io/hdf5eventsource.py index 3050dd7f752..24d45def228 100644 --- a/src/ctapipe/io/hdf5eventsource.py +++ b/src/ctapipe/io/hdf5eventsource.py @@ -44,7 +44,7 @@ TimingParametersContainer, TriggerContainer, ) -from ..core import Container, Field +from ..core import Container, Field, Provenance from ..core.traits import UseEnum from ..instrument import SubarrayDescription from ..instrument.optics import FocalLengthKind @@ -53,6 +53,7 @@ from .datalevels import DataLevel from .eventsource import EventSource from .hdf5tableio import HDF5TableReader +from .metadata import _read_reference_metadata_hdf5 from .tableloader import DL2_SUBARRAY_GROUP, DL2_TELESCOPE_GROUP, POINTING_GROUP __all__ = ["HDF5EventSource"] @@ -203,6 +204,11 @@ def __init__(self, input_url=None, config=None, parent=None, **kwargs): super().__init__(input_url=input_url, config=config, parent=parent, **kwargs) self.file_ = tables.open_file(self.input_url) + meta = _read_reference_metadata_hdf5(self.file_) + Provenance().add_input_file( + str(self.input_url), role="Event", reference_meta=meta + ) + self._full_subarray = SubarrayDescription.from_hdf( self.input_url, focal_length_choice=self.focal_length_choice, diff --git a/src/ctapipe/io/simteleventsource.py b/src/ctapipe/io/simteleventsource.py index d84acc3e27b..ea41a2d0087 100644 --- a/src/ctapipe/io/simteleventsource.py +++ b/src/ctapipe/io/simteleventsource.py @@ -537,6 +537,12 @@ def __init__(self, input_url=Undefined, config=None, parent=None, **kwargs): skip_calibration=self.skip_calibration_events, zcat=not 
self.back_seekable, ) + # TODO: read metadata from simtel metaparams once we have files that + # actually provide the reference metadata. + Provenance().add_input_file( + str(self.input_url), role="DL0/Event", add_meta=False + ) + if self.back_seekable and self.is_stream: raise OSError("back seekable was required but not possible for inputfile") ( From 3f2855cf1bab370559dae3d2a094bbdbc2287af7 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Tue, 12 Nov 2024 18:08:11 +0100 Subject: [PATCH 2/4] Add changelog --- docs/changes/2648.api.rst | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 docs/changes/2648.api.rst diff --git a/docs/changes/2648.api.rst b/docs/changes/2648.api.rst new file mode 100644 index 00000000000..01dbd2bc725 --- /dev/null +++ b/docs/changes/2648.api.rst @@ -0,0 +1,10 @@ +* Add possibility to directly pass the reference metadata to + ``Provenance.add_input_file``. +* Remove the call to ``Provenance.add_input_file`` from the + ``EventSource`` base class. +* Add the proper calls to ``Provenance.add_input_file`` in + ``HDF5EventSource`` (providing the metadata) and + ``SimTelEventSource`` (not providing metadata yet, but avoiding a warning). +* Plugin implementations of ``EventSource`` should make sure they + register their input files using ``Provenance.add_input_file``, preferably + providing also the reference metadata. 
From 4c2aafe017907dec4d6cd011e6d55581c76c409a Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Wed, 13 Nov 2024 13:25:48 +0100 Subject: [PATCH 3/4] Add tests for {HDF5,SimTel}EventSource filling Provenance info --- src/ctapipe/conftest.py | 14 ++++++++++++++ src/ctapipe/core/tests/test_provenance.py | 14 -------------- src/ctapipe/io/tests/test_hdf5eventsource.py | 15 +++++++++++++++ src/ctapipe/io/tests/test_simteleventsource.py | 12 ++++++++++++ 4 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/ctapipe/conftest.py b/src/ctapipe/conftest.py index c6599dfe86d..f692489f4af 100644 --- a/src/ctapipe/conftest.py +++ b/src/ctapipe/conftest.py @@ -692,3 +692,17 @@ def dl1_mon_pointing_file(dl1_file, dl1_tmp_path): f.remove_node("/configuration/telescope/pointing", recursive=True) return path + + +@pytest.fixture +def provenance(monkeypatch): + from ctapipe.core import Provenance + + # the singleton nature of Provenance messes with + # the order-independence of the tests asserting + # the provenance contains the correct information + # so we monkeypatch back to an empty state here + prov = Provenance() + monkeypatch.setattr(prov, "_activities", []) + monkeypatch.setattr(prov, "_finished_activities", []) + return prov diff --git a/src/ctapipe/core/tests/test_provenance.py b/src/ctapipe/core/tests/test_provenance.py index 465e11ba27e..2ec4da8b924 100644 --- a/src/ctapipe/core/tests/test_provenance.py +++ b/src/ctapipe/core/tests/test_provenance.py @@ -1,24 +1,10 @@ import json -import pytest - from ctapipe.core import Provenance from ctapipe.core.provenance import _ActivityProvenance from ctapipe.io.metadata import Reference -@pytest.fixture -def provenance(monkeypatch): - # the singleton nature of Provenance messes with - # the order-independence of the tests asserting - # the provenance contains the correct information - # so we monkeypatch back to an empty state here - prov = Provenance() - monkeypatch.setattr(prov, "_activities", []) - 
monkeypatch.setattr(prov, "_finished_activities", []) - return prov - - def test_provenance_activity_names(provenance): provenance.start_activity("test1") provenance.add_input_file("input.txt") diff --git a/src/ctapipe/io/tests/test_hdf5eventsource.py b/src/ctapipe/io/tests/test_hdf5eventsource.py index 0eb5a677d5b..d1ebf8c3f28 100644 --- a/src/ctapipe/io/tests/test_hdf5eventsource.py +++ b/src/ctapipe/io/tests/test_hdf5eventsource.py @@ -270,3 +270,18 @@ def test_simulated_events_distribution(dl1_file): dist = source.simulated_shower_distributions[1] assert dist["n_entries"] == 1000 assert dist["histogram"].sum() == 1000.0 + + +def test_provenance(dl1_file, provenance): + """Make sure that HDF5EventSource reads reference metadata and adds to provenance""" + from ctapipe.io.metadata import _read_reference_metadata_hdf5 + + provenance.start_activity("test_hdf5eventsource") + with HDF5EventSource(input_url=dl1_file): + pass + + inputs = provenance.current_activity.input + assert len(inputs) == 1 + assert inputs[0]["url"] == str(dl1_file) + meta = _read_reference_metadata_hdf5(dl1_file) + assert inputs[0]["reference_meta"].product.id_ == meta.product.id_ diff --git a/src/ctapipe/io/tests/test_simteleventsource.py b/src/ctapipe/io/tests/test_simteleventsource.py index 3840aab3397..7296fb7def5 100644 --- a/src/ctapipe/io/tests/test_simteleventsource.py +++ b/src/ctapipe/io/tests/test_simteleventsource.py @@ -641,3 +641,15 @@ def test_shower_distribution(prod5_gamma_simtel_path): assert len(distributions) == 1 distribution = distributions[source.obs_id] assert distribution.n_entries == 1000 + + +def test_provenance(provenance, prod5_gamma_simtel_path): + provenance.start_activity("test_simteleventsource") + + with SimTelEventSource(prod5_gamma_simtel_path): + pass + + inputs = provenance.current_activity.input + assert len(inputs) == 1 + assert inputs[0]["url"] == str(prod5_gamma_simtel_path) + assert inputs[0]["reference_meta"] is None From 
0981806ab194492636a232228a65511004a74fc6 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Mon, 18 Nov 2024 14:55:38 +0100 Subject: [PATCH 4/4] Fix section title --- src/ctapipe/io/eventsource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ctapipe/io/eventsource.py b/src/ctapipe/io/eventsource.py index fd0f367970b..5963e81a05e 100644 --- a/src/ctapipe/io/eventsource.py +++ b/src/ctapipe/io/eventsource.py @@ -79,9 +79,9 @@ class EventSource(Component): as these are mutable and may lead to errors when analyzing multiple events. - Attributes + Parameters ---------- - input_url : str + input_url : str | Path Path to the input event file. max_events : int Maximum number of events to loop through in generator