Skip to content

Commit

Permalink
Merge pull request #2648 from cta-observatory/reference_meta_event_so…
Browse files Browse the repository at this point in the history
…urce

Improve reference metadata handling for EventSource
  • Loading branch information
maxnoe authored Nov 18, 2024
2 parents 9f2d9a7 + 0981806 commit c1e7163
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 161 deletions.
10 changes: 10 additions & 0 deletions docs/changes/2648.api.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
* Add possibility to directly pass the reference metadata to
``Provenance.add_input_file``.
* Remove the call to ``Provenace.add_input_file`` from the
``EventSource`` base class.
* Add the proper calls to ``Provenance.add_input_file`` in
``HDF5EventSource`` (providing the metadata) and
``SimTelEventSource`` (not providing metadata yet, but avoiding a warning)
* Plugin implementations of ``EventSource`` should make sure they
register their input files using ``Provenance.add_input_file``, preferably
providing also the reference metadata.
14 changes: 14 additions & 0 deletions src/ctapipe/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,3 +693,17 @@ def dl1_mon_pointing_file(dl1_file, dl1_tmp_path):
f.remove_node("/configuration/telescope/pointing", recursive=True)

return path


@pytest.fixture
def provenance(monkeypatch):
from ctapipe.core import Provenance

# the singleton nature of Provenance messes with
# the order-independence of the tests asserting
# the provenance contains the correct information
# so we monkeypatch back to an empty state here
prov = Provenance()
monkeypatch.setattr(prov, "_activities", [])
monkeypatch.setattr(prov, "_finished_activities", [])
return prov
266 changes: 142 additions & 124 deletions src/ctapipe/core/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,138 @@ class MissingReferenceMetadata(UserWarning):
"""Warning raised if reference metadata could not be read from input file."""


class _ActivityProvenance:
"""
Low-level helper class to collect provenance information for a given
*activity*. Users should use `Provenance` as a top-level API,
not this class directly.
"""

def __init__(self, activity_name=sys.executable):
self._prov = {
"activity_name": activity_name,
"activity_uuid": str(uuid.uuid4()),
"status": "running",
"start": {},
"stop": {},
"system": {},
"input": [],
"output": [],
"exit_code": None,
}
self.name = activity_name

def start(self):
"""begin recording provenance for this activity. Set's up the system
and startup provenance data. Generally should be called at start of a
program."""
self._prov["start"].update(_sample_cpu_and_memory())
self._prov["system"].update(_get_system_provenance())

def _register(self, what, url, role, add_meta, reference_meta):
if what not in {"input", "output"}:
raise ValueError("what must be 'input' or 'output'")

reference_meta = self._get_reference_meta(
url=url, reference_meta=reference_meta, read_meta=add_meta
)
self._prov[what].append(dict(url=url, role=role, reference_meta=reference_meta))

def register_input(self, url, role=None, add_meta=True, reference_meta=None):
"""
Add a URL of a file to the list of inputs (can be a filename or full
url, if no URL specifier is given, assume 'file://')
Parameters
----------
url: str
filename or url of input file
role: str
role name that this input satisfies
add_meta: bool
If true, try to load reference metadata from input file
and add to provenance.
"""
self._register(
"input", url, role=role, add_meta=add_meta, reference_meta=reference_meta
)

def register_output(self, url, role=None, add_meta=True, reference_meta=None):
"""
Add a URL of a file to the list of outputs (can be a filename or full
url, if no URL specifier is given, assume 'file://')
Should only be called once the file is finalized, so that reference metadata
can be read.
Parameters
----------
url: str
filename or url of output file
role: str
role name that this output satisfies
add_meta: bool
If true, try to load reference metadata from input file
and add to provenance.
"""
self._register(
"output", url, role=role, add_meta=add_meta, reference_meta=reference_meta
)

def register_config(self, config):
"""add a dictionary of configuration parameters to this activity"""
self._prov["config"] = config

def finish(self, status="success", exit_code=0):
"""record final provenance information, normally called at shutdown."""
self._prov["stop"].update(_sample_cpu_and_memory())

# record the duration (wall-clock) for this activity
t_start = Time(self._prov["start"]["time_utc"], format="isot")
t_stop = Time(self._prov["stop"]["time_utc"], format="isot")
self._prov["status"] = status
self._prov["exit_code"] = exit_code
self._prov["duration_min"] = (t_stop - t_start).to("min").value

@property
def output(self):
return self._prov.get("output", None)

@property
def input(self):
return self._prov.get("input", None)

def sample_cpu_and_memory(self):
"""
Record a snapshot of current CPU and memory information.
"""
if "samples" not in self._prov:
self._prov["samples"] = []
self._prov["samples"].append(_sample_cpu_and_memory())

@property
def provenance(self):
return self._prov

def _get_reference_meta(
self, url, reference_meta=None, read_meta=True
) -> dict | None:
# here to prevent circular imports / top-level cross-dependencies
from ..io.metadata import read_reference_metadata

if reference_meta is not None or read_meta is False:
return reference_meta

try:
return read_reference_metadata(url).to_dict()
except Exception:
warnings.warn(
f"Could not read reference metadata for input file: {url}",
MissingReferenceMetadata,
)
return None


class Provenance(metaclass=Singleton):
"""
Manage the provenance info for a stack of *activities*
Expand All @@ -129,18 +261,18 @@ def start_activity(self, activity_name=sys.executable):
activity.start()
self._activities.append(activity)
log.debug(f"started activity: {activity_name}")
return activity

def _get_current_or_start_activity(self):
def _get_current_or_start_activity(self) -> _ActivityProvenance:
if self.current_activity is None:
log.info(
"No activity has been explicitly started, starting new default activity."
" Consider calling Provenance().start_activity(<name>) explicitly."
)
self.start_activity()

return self.start_activity()
return self.current_activity

def add_input_file(self, filename, role=None, add_meta=True):
def add_input_file(self, filename, role=None, add_meta=True, reference_meta=None):
"""register an input to the current activity
Parameters
Expand All @@ -151,7 +283,12 @@ def add_input_file(self, filename, role=None, add_meta=True):
role this input file satisfies (optional)
"""
activity = self._get_current_or_start_activity()
activity.register_input(abspath(filename), role=role, add_meta=add_meta)
activity.register_input(
abspath(filename),
role=role,
add_meta=add_meta,
reference_meta=reference_meta,
)
log.debug(
"added input entity '%s' to activity: '%s'",
filename,
Expand Down Expand Up @@ -260,125 +397,6 @@ def clear(self):
self._finished_activities = []


class _ActivityProvenance:
"""
Low-level helper class to collect provenance information for a given
*activity*. Users should use `Provenance` as a top-level API,
not this class directly.
"""

def __init__(self, activity_name=sys.executable):
self._prov = {
"activity_name": activity_name,
"activity_uuid": str(uuid.uuid4()),
"status": "running",
"start": {},
"stop": {},
"system": {},
"input": [],
"output": [],
"exit_code": None,
}
self.name = activity_name

def start(self):
"""begin recording provenance for this activity. Set's up the system
and startup provenance data. Generally should be called at start of a
program."""
self._prov["start"].update(_sample_cpu_and_memory())
self._prov["system"].update(_get_system_provenance())

def register_input(self, url, role=None, add_meta=True):
"""
Add a URL of a file to the list of inputs (can be a filename or full
url, if no URL specifier is given, assume 'file://')
Parameters
----------
url: str
filename or url of input file
role: str
role name that this input satisfies
add_meta: bool
If true, try to load reference metadata from input file
and add to provenance.
"""
reference_meta = self._get_reference_meta(url=url) if add_meta else None
self._prov["input"].append(
dict(url=url, role=role, reference_meta=reference_meta)
)

def register_output(self, url, role=None, add_meta=True):
"""
Add a URL of a file to the list of outputs (can be a filename or full
url, if no URL specifier is given, assume 'file://')
Should only be called once the file is finalized, so that reference metadata
can be read.
Parameters
----------
url: str
filename or url of output file
role: str
role name that this output satisfies
add_meta: bool
If true, try to load reference metadata from input file
and add to provenance.
"""
reference_meta = self._get_reference_meta(url=url) if add_meta else None
self._prov["output"].append(
dict(url=url, role=role, reference_meta=reference_meta)
)

def register_config(self, config):
"""add a dictionary of configuration parameters to this activity"""
self._prov["config"] = config

def finish(self, status="success", exit_code=0):
"""record final provenance information, normally called at shutdown."""
self._prov["stop"].update(_sample_cpu_and_memory())

# record the duration (wall-clock) for this activity
t_start = Time(self._prov["start"]["time_utc"], format="isot")
t_stop = Time(self._prov["stop"]["time_utc"], format="isot")
self._prov["status"] = status
self._prov["exit_code"] = exit_code
self._prov["duration_min"] = (t_stop - t_start).to("min").value

@property
def output(self):
return self._prov.get("output", None)

@property
def input(self):
return self._prov.get("input", None)

def sample_cpu_and_memory(self):
"""
Record a snapshot of current CPU and memory information.
"""
if "samples" not in self._prov:
self._prov["samples"] = []
self._prov["samples"].append(_sample_cpu_and_memory())

@property
def provenance(self):
return self._prov

def _get_reference_meta(self, url):
# here to prevent circular imports / top-level cross-dependencies
from ..io.metadata import read_reference_metadata

try:
return read_reference_metadata(url).to_dict()
except Exception:
warnings.warn(
f"Could not read reference metadata for input file: {url}",
MissingReferenceMetadata,
)


def _get_python_packages():
def _sortkey(dist):
"""Sort packages by name, case insensitive"""
Expand Down
14 changes: 0 additions & 14 deletions src/ctapipe/core/tests/test_provenance.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
import json

import pytest

from ctapipe.core import Provenance
from ctapipe.core.provenance import _ActivityProvenance
from ctapipe.io.metadata import Reference


@pytest.fixture
def provenance(monkeypatch):
# the singleton nature of Provenance messes with
# the order-independence of the tests asserting
# the provenance contains the correct information
# so we monkeypatch back to an empty state here
prov = Provenance()
monkeypatch.setattr(prov, "_activities", [])
monkeypatch.setattr(prov, "_finished_activities", [])
return prov


def test_provenance_activity_names(provenance):
provenance.start_activity("test1")
provenance.add_input_file("input.txt")
Expand Down
Loading

0 comments on commit c1e7163

Please sign in to comment.