Skip to content

Commit

Permalink
Revised pressure jump cell adc to use ophyd-async.
Browse files Browse the repository at this point in the history
  • Loading branch information
barnettwilliam committed Jan 22, 2025
1 parent 36a98be commit 2fb9dd8
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 71 deletions.
17 changes: 9 additions & 8 deletions src/dodal/beamlines/p38.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dodal.devices.i22.fswitch import FSwitch
from dodal.devices.linkam3 import Linkam3
from dodal.devices.pressure_jump_cell import PressureJumpCell
from dodal.devices.pressure_jump_cell_adc import PressureJumpCellADC, PressureJumpAdcConfigParams
from dodal.devices.areadetector import PressureJumpCellDetector
from dodal.devices.slits import Slits
from dodal.devices.tetramm import TetrammDetector
from dodal.devices.undulator import Undulator
Expand Down Expand Up @@ -347,15 +347,16 @@ def high_pressure_xray_cell(
)

def high_pressure_xray_cell_adc(
wait_for_connection: bool = True,
fake_with_ophyd_sim: bool = False,
params: PressureJumpAdcConfigParams | None = None,
) -> PressureJumpCellADC:
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> PressureJumpCellDetector:
return device_instantiation(
PressureJumpCellADC,
PressureJumpCellDetector,
"high_pressure_xray_cell_adc",
"",
"-EA-HPXC-01:",
wait_for_connection,
fake_with_ophyd_sim,
params=params or None,
adc_suffix="TRIG:",
drv_suffix="DET:",
hdf_suffix="FILE:",
path_provider=get_path_provider(),
)
5 changes: 5 additions & 0 deletions src/dodal/devices/areadetector/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .pressurejumpcell import PressureJumpCellDetector

__all__ = [
"PressureJumpCellDetector",
]
48 changes: 48 additions & 0 deletions src/dodal/devices/areadetector/pressurejumpcell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import get_args

from bluesky.protocols import HasHints, Hints

from ophyd_async.core import PathProvider, StandardDetector
from ophyd_async.epics import adcore

from .pressurejumpcell_controller import PressureJumpCellController
from .pressurejumpcell_io import PressureJumpCellDriverIO, PressureJumpCellAdcIO


class PressureJumpCellDetector(StandardDetector, HasHints):
"""
Ophyd-async implementation of a Pressure Jump Cell ADC Detector for fast pressure jumps.
The detector may be configured for an external trigger on the TTL Trig input.
"""

_controller: PressureJumpCellController
_writer: adcore.ADHDFWriter

def __init__(
self,
prefix: str,
path_provider: PathProvider,
drv_suffix="cam1:",
adc_suffix="TRIG",
hdf_suffix="HDF1:",
name="",
):
self.drv = PressureJumpCellDriverIO(prefix + drv_suffix)
self.adc = PressureJumpCellAdcIO(prefix + adc_suffix)
self.hdf = adcore.NDFileHDFIO(prefix + hdf_suffix)

super().__init__(
PressureJumpCellController(self.drv, self.adc),
adcore.ADHDFWriter(
self.hdf,
path_provider,
lambda: self.name,
adcore.ADBaseDatasetDescriber(self.drv),
),
config_sigs=(self.drv.acquire_time,),
name=name,
)

@property
def hints(self) -> Hints:
return self._writer.hints
82 changes: 82 additions & 0 deletions src/dodal/devices/areadetector/pressurejumpcell_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import asyncio
from typing import Optional, Tuple

from ophyd_async.core import (
AsyncStatus,
DetectorControl,
DetectorTrigger,
set_and_wait_for_value,
)
from ophyd_async.epics import adcore

from .pressurejumpcell_io import (
PressureJumpCellDriverIO,
PressureJumpCellAdcIO,
PressureJumpCellTriggerMode,
PressureJumpCellAdcTriggerMode,
)


#TODO Find out what the deadtime should be and if it can be retrieved from the device
_HIGHEST_POSSIBLE_DEADTIME = 1e-3


class PressureJumpCellController(DetectorControl):

def __init__(self, driver: PressureJumpCellDriverIO, adc: PressureJumpCellAdcIO) -> None:
self._drv = driver
self._adc = adc

def get_deadtime(self, exposure: float | None) -> float:
return _HIGHEST_POSSIBLE_DEADTIME

async def arm(
self,
num: int = 0,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
) -> AsyncStatus:
if num == 0:
image_mode = adcore.ImageMode.continuous
else:
image_mode = adcore.ImageMode.multiple
if exposure is not None:
await self._drv.acquire_time.set(exposure)

trigger_mode, adc_trigger_mode = self._get_trigger_info(trigger)

# trigger mode must be set first and on it's own!
await self._drv.trigger_mode.set(trigger_mode)
await self._adc.adc_trigger_mode.set(adc_trigger_mode)

await asyncio.gather(
self._drv.num_images.set(num),
self._drv.image_mode.set(image_mode),
)

status = await set_and_wait_for_value(self._drv.acquire, True)
return status

def _get_trigger_info(
self, trigger: DetectorTrigger
) -> Tuple[PressureJumpCellTriggerMode, PressureJumpCellAdcTriggerMode]:
supported_trigger_types = (
DetectorTrigger.edge_trigger,
DetectorTrigger.internal,
)
if trigger not in supported_trigger_types:
raise ValueError(
f"{self.__class__.__name__} only supports the following trigger "
f"types: {supported_trigger_types} but was asked to "
f"use {trigger}"
)
if trigger == DetectorTrigger.internal:
return (PressureJumpCellTriggerMode.internal, PressureJumpCellAdcTriggerMode.continuous)
else:
return (PressureJumpCellTriggerMode.external, PressureJumpCellAdcTriggerMode.single)

async def disarm(self):
await asyncio.gather(
adcore.stop_busy_record(self._drv.acquire, False, timeout=1),
adcore.stop_busy_record(self._adc.acquire, False, timeout=1)
)
29 changes: 29 additions & 0 deletions src/dodal/devices/areadetector/pressurejumpcell_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from enum import Enum

from ophyd_async.core import SubsetEnum
from ophyd_async.epics import adcore
from ophyd_async.epics.signal import epics_signal_rw_rbv


class PressureJumpCellTriggerMode(str, Enum):
internal = "Internal"
external = "Exernal"

class PressureJumpCellAdcTriggerMode(str, Enum):
single = "Single"
multiple = "Multiple"
continuous = "Continuous"

class PressureJumpCellDriverIO(adcore.ADBaseIO):
def __init__(self, prefix: str, name: str = "") -> None:
self.trigger_mode = epics_signal_rw_rbv(
PressureJumpCellTriggerMode, prefix + "TriggerMode"
)
super().__init__(prefix, name=name)

class PressureJumpCellAdcIO(adcore.ADBaseIO):
def __init__(self, prefix: str, name: str = "") -> None:
self.adc_trigger_mode = epics_signal_rw_rbv(
PressureJumpCellAdcTriggerMode, prefix + "TriggerMode"
)
super().__init__(prefix, name=name)
63 changes: 0 additions & 63 deletions src/dodal/devices/pressure_jump_cell_adc.py

This file was deleted.

0 comments on commit 2fb9dd8

Please sign in to comment.