BTHofmann2023 additions #305

Draft · wants to merge 15 commits into base: main
3 changes: 2 additions & 1 deletion sed/binning/binning.py
@@ -293,7 +293,8 @@ def bin_dataframe(
xarray object, combining the data with the axes (bin centers).
"""
bins, axes, ranges = simplify_binning_arguments(bins, axes, ranges)

# filter dataframe to use only the columns needed for the binning
df = df[axes]
Comment on lines +296 to +297 (Member):
I saw this before in your commits. I don't think this is helpful, as it introduces another graph layer. Did you try whether this really improves computation time? In my tests, it slowed things down. Or why did you introduce this in the first place?
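To answer this empirically, a minimal benchmark sketch that runs the same reduction with and without the extra column-selection layer; the synthetic dataframe and the sum reduction are stand-ins for the real binning workload, so the timings are indicative only:

```python
import time

import dask.dataframe as ddf
import numpy as np
import pandas as pd

pdf = pd.DataFrame(np.random.rand(1_000_000, 4), columns=["X", "Y", "t", "extra"])
df = ddf.from_pandas(pdf, npartitions=8)
axes = ["X", "Y", "t"]

t0 = time.perf_counter()
df[axes]["X"].sum().compute()  # same reduction, with the extra getitem layer
t1 = time.perf_counter()
df["X"].sum().compute()  # without it
t2 = time.perf_counter()
print(f"with selection layer: {t1 - t0:.3f} s, without: {t2 - t1:.3f} s")
```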

# create the coordinate axes for the xarray output
# if provided as array, they are interpreted as bin centers
if isinstance(bins[0], np.ndarray):
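For context on the hunk above: bin_dataframe accepts bins either as integers (together with ranges) or as arrays, which are then interpreted as bin centers. A minimal sketch against a toy dask dataframe; the keyword names match the compute() call further down this diff, while the data and bin choices are illustrative:

```python
import dask.dataframe as ddf
import numpy as np
import pandas as pd

from sed.binning import bin_dataframe  # import path as used in sed/core/processor.py

pdf = pd.DataFrame(np.random.rand(10_000, 2), columns=["X", "Y"])
df = ddf.from_pandas(pdf, npartitions=2)

# Integer bins with explicit ranges: 100 bins over [0, 1] per axis.
res = bin_dataframe(df=df, bins=[100, 100], axes=["X", "Y"], ranges=[(0.0, 1.0), (0.0, 1.0)])

# Array bins, interpreted as bin centers (the isinstance check above).
centers = np.linspace(0.005, 0.995, 100)
res_centers = bin_dataframe(df=df, bins=[centers, centers], axes=["X", "Y"])
```

Both calls return an xarray object combining the histogram with the bin-center axes, per the docstring above.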
64 changes: 54 additions & 10 deletions sed/config/flash_example_config.yaml
@@ -87,18 +87,19 @@ dataframe:
# slice: if the group contains multidim data, where to slice

channels:
# pulse ID is a necessary channel for using the loader.
pulseId:
timeStamp:
format: per_train
group_name: "/uncategorised/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.1/"
pulseId: # pulse ID is a necessary channel for using the loader.
format: per_electron
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
slice: 2

dldPosX:
# DLD channels
dldPosX: # x position on the DLD detector
format: per_electron
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
slice: 1

dldPosY:
dldPosY: # y position on the DLD detector
format: per_electron
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
slice: 0
@@ -108,10 +109,8 @@
format: per_electron
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
slice: 3

# The auxiliary channel has a special structure where the group further contains
# a multidim structure, so additional aliases are defined below
dldAux:
dldAux: # The auxiliary channel has a special structure where the group further contains
# a multidim structure, so additional aliases are defined below
format: per_pulse
group_name: "/uncategorised/FLASH.EXP/HEXTOF.DAQ/DLD1/"
slice: 4
@@ -123,6 +122,49 @@ dataframe:
cryoTemperature: 4
sampleTemperature: 5
dldTimeBinSize: 15
# FEL channels
gmdBda: # in uJ per pulse
format: per_pulse
group_name: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/"
slice: 0
gmdPosh: # horizontal position of the FEL
format: per_pulse
group_name: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/"
slice: 2
gmdPosv: # vertical position of the FEL
format: per_pulse
group_name: "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/"
slice: 3
bam: # Here we use the DBC2 BAM as the "normal" one is broken.
format: per_pulse
group_name: "/uncategorised/FLASH.SDIAG/BAM.DAQ/FL0.DBC2.ARRIVAL_TIME.ABSOLUTE.SA1.COMP/"
monochromatorPhotonEnergy: # single value; to be changed
format: per_train
group_name: "/FL1/Beamlines/PG/Monochromator/monochromator photon energy/"
monoDelta1:
format: per_train
group_name: "/FL1/Beamlines/PG/Monochromator/ADC.PGM2/"
slice: 0
monoDelta2:
format: per_train
group_name: "/FL1/Beamlines/PG/Monochromator/ADC.PGM2/"
slice: 1
monoMirrorAngle:
format: per_train
group_name: "/FL1/Beamlines/PG/Monochromator/ADC.PGM2/"
slice: 2
monoGratingAngle:
format: per_train
group_name: "/FL1/Beamlines/PG/Monochromator/ADC.PGM2/"
slice: 3

# Optical laser channels
delayStage:
format: per_train
group_name: "/zraw/FLASH.SYNC/LASER.LOCK.EXP/F1.PG.OSC/FMC0.MD22.1.ENCODER_POSITION.RD/dGroup/"
opticalDiode:
format: per_pulse
group_name: "/zraw/FLASH.LASER/FLACPUPGLASER1.PULSEENERGY/PG2_incoupl/dGroup/"

# The prefixes of the stream names for different DAQ systems for parsing filenames
# (Not to be changed by user)
@@ -139,6 +181,8 @@ dataframe:
# (Not to be changed by user)
beamtime_dir:
pg2: "/asap3/flash/gpfs/pg2/"
hextof: "/asap3/fs-flash-o/gpfs/hextof/"
wespe: "/asap3/fs-flash-o/gpfs/wespe/"

# metadata collection from scicat
# metadata:
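Each channel entry added above names an HDF5 group_name and, for multidimensional groups, a slice index. A hedged sketch of how one could inspect a raw file to confirm these values before editing the config; the file path is a placeholder, and the "value" dataset name is an assumption about the FLASH HDF5 layout:

```python
import h5py

RAW_FILE = "/path/to/raw_run.h5"  # placeholder; point this at an actual raw file
GROUP = "/FL1/Photon Diagnostic/GMD/Average energy/energy BDA/"

with h5py.File(RAW_FILE, "r") as f:
    grp = f[GROUP]
    print(list(grp.keys()))  # inspect which datasets the group actually holds
    # If the group carries a multidimensional "value" dataset (an assumption
    # about the layout), its shape shows which `slice` indices are available:
    if "value" in grp:
        print(grp["value"].shape)
```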
111 changes: 94 additions & 17 deletions sed/core/processor.py
@@ -16,6 +16,7 @@
import pandas as pd
import psutil
import xarray as xr
from dask.diagnostics import ProgressBar

from sed.binning import bin_dataframe
from sed.binning.binning import normalization_histogram_from_timed_dataframe
@@ -164,14 +165,81 @@ def __init__(
)

def __repr__(self):
if self._dataframe is None:
df_str = "Data Frame: No Data loaded"
else:
info = self.get_run_info()
df_str = f"Data Frame: {len(info['dataframe']['columns'])} columns.\n"
df_str += f"{' '*11} {info['dataframe']['num_electrons']:,.0f} electrons.\n"
df_str += f"{' '*11} {info['dataframe']['num_trains']:,.0f} trains.\n"
df_str += f"{' '*11} {info['dataframe']['electrons_per_train']:,.1f} electrons/train.\n"
if "num_pulses" in info["dataframe"]:
df_str += f"{' '*11} {info['dataframe']['num_pulses']:,.0f} pulses.\n"
df_str += (
f"{' '*11} {info['dataframe']['electrons_per_pulse']:,.1f} electrons/pulse.\n"
)
df_str += f"{' '*11} {info['dataframe']['timestamp_duration']:,.0f} seconds.\n"
df_str += f"{' '*11} {info['dataframe']['duration']}.\n"
df_str += (
f"{' '*11} {info['dataframe']['start_time']} to {info['dataframe']['end_time']}.\n"
)

# df_str = self._dataframe.__repr__()
# attributes_str = f"Metadata: {self._attributes.metadata}"
pretty_str = df_str # + "\n" + attributes_str
return pretty_str

def get_run_info(self, compute: bool = False) -> dict:
"""Return a dict of information about the loaded data.

TODO: add dtypes from dataframe; add columns per pulse/per electron/per train.

Args:
compute (bool, optional): Whether to compute electron and pulse counts with
dask if the loader does not provide them. Defaults to False.

Returns:
dict: Dictionary with information about the loaded data.
"""
info: Dict[str, Any] = {}
head = self.dataframe.head(1)
tail = self.dataframe.tail(1)
info["dataframe"] = {}
info["dataframe"]["columns"] = self.dataframe.columns
n_el = getattr(self.loader, "num_electrons", None)
if n_el is None and compute:
with ProgressBar():
print("computing number of electrons")
n_el = len(self.dataframe)
info["dataframe"]["num_electrons"] = n_el
n_pulses = getattr(self.loader, "num_pulses", None)
if n_pulses is None and compute:
with ProgressBar():
print("computing number of pulses")
n_pulses = len(self.dataframe[self.dataframe["electronId"] == 0])
if n_pulses is not None:
info["dataframe"]["num_pulses"] = n_pulses
train_range: tuple = int(head["trainId"]), int(tail["trainId"])
n_trains = train_range[1] - train_range[0]
info["dataframe"]["trainId_min"] = train_range[0]
info["dataframe"]["trainId_max"] = train_range[1]
info["dataframe"]["num_trains"] = n_trains
if n_el is not None and n_pulses is not None:
info["dataframe"]["electrons_per_pulse"] = n_el / n_pulses
if n_el is not None:
info["dataframe"]["electrons_per_train"] = n_el / n_trains
tsr = float(head["timeStamp"]), float(tail["timeStamp"])
info["dataframe"]["timestamp_min"] = tsr[0]
info["dataframe"]["timestamp_max"] = tsr[1]
info["dataframe"]["timestamp_duration"] = tsr[1] - tsr[0]
info["dataframe"]["start_time"] = pd.to_datetime(tsr[0], unit="s")
info["dataframe"]["end_time"] = pd.to_datetime(tsr[1], unit="s")
info["dataframe"]["duration"] = pd.to_timedelta(tsr[1] - tsr[0], unit="s")

info["metadata"] = self._attributes.metadata
info["config"] = self._config
return info
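A hypothetical usage sketch for the new get_run_info method; the construction and load call are schematic (config path and run number invented), and compute=True triggers the dask computations above whenever the loader does not expose the counts:

```python
from sed import SedProcessor  # assumed public entry point of this package

processor = SedProcessor(config="sed/config/flash_example_config.yaml")  # schematic
processor.load(runs=[44762])  # hypothetical run number; call signature is loader-dependent
info = processor.get_run_info(compute=True)
print(f"{info['dataframe']['num_electrons']:,} electrons")
print(info["dataframe"]["start_time"], "to", info["dataframe"]["end_time"])
```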

@property
def dataframe(self) -> Union[pd.DataFrame, ddf.DataFrame]:
"""Accessor to the underlying dataframe.
Expand Down Expand Up @@ -1949,20 +2017,29 @@ def compute(
"Only 'col', 'lower_bound' and 'upper_bound' allowed as filter entries. ",
f"Parameters {invalid_keys} not valid in {param}.",
) from exc

self._binned = bin_dataframe(
df=dataframe,
bins=bins,
axes=axes,
ranges=ranges,
hist_mode=hist_mode,
mode=mode,
pbar=pbar,
n_cores=num_cores,
threads_per_worker=threads_per_worker,
threadpool_api=threadpool_api,
**kwds,
)

try:
self._binned = bin_dataframe(
df=dataframe,
bins=bins,
axes=axes,
ranges=ranges,
hist_mode=hist_mode,
mode=mode,
pbar=pbar,
n_cores=num_cores,
threads_per_worker=threads_per_worker,
threadpool_api=threadpool_api,
**kwds,
)
except Exception as ex:
if type(ex).__name__ == "TypingError":
raise TypeError(
"Numba TypingError during binning. One of the axes probably has invalid types."
" Could one of the axes be all NaNs?",
) from ex
raise
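The TypingError translation above points at all-NaN axis columns as a likely culprit. A small self-contained pre-check one could run before compute(); the toy dataframe is illustrative, and dataframe/axes mirror the names used in compute():

```python
import dask.dataframe as ddf
import numpy as np
import pandas as pd

pdf = pd.DataFrame({"X": np.random.rand(1000), "t": np.full(1000, np.nan)})
dataframe = ddf.from_pandas(pdf, npartitions=2)
axes = ["X", "t"]

# Detect axis columns that are entirely NaN before handing them to the binner.
nan_axes = [ax for ax in axes if bool(dataframe[ax].isna().all().compute())]
if nan_axes:
    raise ValueError(f"These axes contain only NaNs: {nan_axes}")  # raises for "t"
```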

for dim in self._binned.dims:
try: