Skip to content

Commit

Permalink
Machine.block
Browse files Browse the repository at this point in the history
  • Loading branch information
cgevans committed Mar 7, 2024
1 parent b9e339f commit 89776e5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 38 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ SPDX-License-Identifier: EUPL-1.2

# Changelog

## Development

- `Machine.block` command to directly control block temperatures.

## Version 0.11.0

- Initial support for 384-well blocks, at least in data/file reading.
Expand Down
86 changes: 48 additions & 38 deletions src/qslib/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,7 @@ def run_command_to_ack(self, command: str | SCPICommand) -> str:
raise ConnectionError(f"Not connected to {self.host}")
loop = asyncio.get_event_loop()
try:
return loop.run_until_complete(
self.connection.run_command(command, just_ack=True)
)
return loop.run_until_complete(self.connection.run_command(command, just_ack=True))
except CommandError as e:
e.__traceback__ = None
raise e
Expand Down Expand Up @@ -351,15 +349,11 @@ def list_files(
) -> list[str] | list[dict[str, Any]]:
loop = asyncio.get_event_loop()
return loop.run_until_complete(
self.connection.list_files(
path, leaf=leaf, verbose=verbose, recursive=recursive
)
self.connection.list_files(path, leaf=leaf, verbose=verbose, recursive=recursive)
)

@_ensure_connection(AccessLevel.Observer)
def read_file(
self, path: str, context: str | None = None, leaf: str = "FILE"
) -> bytes:
def read_file(self, path: str, context: str | None = None, leaf: str = "FILE") -> bytes:
"""Read a file.
Parameters
Expand All @@ -375,21 +369,15 @@ def read_file(
bytes
returned file
"""
return asyncio.get_event_loop().run_until_complete(
self.connection.read_file(path, context, leaf)
)
return asyncio.get_event_loop().run_until_complete(self.connection.read_file(path, context, leaf))

@_ensure_connection(AccessLevel.Controller)
def write_file(self, path: str, data: str | bytes) -> None:
if isinstance(data, str):
data = data.encode()

self.run_command_bytes(
b"FILE:WRITE "
+ path.encode()
+ b" <quote.base64>\n"
+ base64.encodebytes(data)
+ b"\n</quote.base64>"
b"FILE:WRITE " + path.encode() + b" <quote.base64>\n" + base64.encodebytes(data) + b"\n</quote.base64>"
)

@_ensure_connection(AccessLevel.Observer)
Expand All @@ -405,9 +393,7 @@ def list_runs_in_storage(self) -> list[str]:
"""
x = self.run_command("FILE:LIST? public_run_complete:")
a = x.split("\n")[1:-1]
return [
re.sub("^public_run_complete:", "", s)[:-4] for s in a if s.endswith(".eds")
]
return [re.sub("^public_run_complete:", "", s)[:-4] for s in a if s.endswith(".eds")]

@_ensure_connection(AccessLevel.Observer)
def load_run_from_storage(self, path: str) -> "Experiment": # type: ignore
Expand All @@ -418,9 +404,7 @@ def load_run_from_storage(self, path: str) -> "Experiment": # type: ignore
return Experiment.from_machine_storage(self, path)

@_ensure_connection(AccessLevel.Guest)
def save_run_from_storage(
self, machine_path: str, download_path: str | IO[bytes], overwrite: bool = False
) -> None:
def save_run_from_storage(self, machine_path: str, download_path: str | IO[bytes], overwrite: bool = False) -> None:
"""Download a file from run storage on the machine.
Parameters
Expand Down Expand Up @@ -451,9 +435,7 @@ def save_run_from_storage(

@_ensure_connection(AccessLevel.Observer)
def _get_log_from_byte(self, name: str | bytes, byte: int) -> bytes:
logfuture: Future[
tuple[bytes, bytes, Future[tuple[bytes, bytes, None]] | None]
] = asyncio.Future()
logfuture: Future[tuple[bytes, bytes, Future[tuple[bytes, bytes, None]] | None]] = asyncio.Future()
if self.connection is None:
raise Exception
if isinstance(name, bytes):
Expand Down Expand Up @@ -489,9 +471,7 @@ def machine_status(self) -> MachineStatus:
@_ensure_connection(AccessLevel.Observer)
def get_running_protocol(self) -> Protocol:
p = _unwrap_tags(self.run_command("PROT? ${Protocol}"))
pn, svs, rm = self.run_command(
"RET ${Protocol} ${SampleVolume} ${RunMode}"
).split()
pn, svs, rm = self.run_command("RET ${Protocol} ${SampleVolume} ${RunMode}").split()
p = f"PROT -volume={svs} -runmode={rm} {pn} " + p
return Protocol.from_scpicommand(SCPICommand.from_string(p))

Expand All @@ -509,9 +489,7 @@ def set_access_level(
" Change max_access level to continue."
)

self.run_command(
f"ACC -stealth={stealth} -exclusive={exclusive} {access_level}"
)
self.run_command(f"ACC -stealth={stealth} -exclusive={exclusive} {access_level}")
log.debug(f"Took access level {access_level} {exclusive=} {stealth=}")
self._current_access_level = access_level

Expand Down Expand Up @@ -560,6 +538,42 @@ def drawer_close(self, lower_cover: bool = True, check: bool = True) -> None:
if lower_cover:
self.cover_lower(check=check, ensure_drawer=False)

@property
@_ensure_connection(AccessLevel.Observer)
def block(self) -> tuple[bool, float]:
"""Returns whether the block is currently temperature-controlled, and the current block temperature setting."""
sbool, v = self.run_command("BLOCK?").split()
sbool = sbool.lower()
v = float(v)

if sbool == "on":
return True, v
elif sbool == "off":
return False, v
else:
raise ValueError(f"Block status {sbool} {v} is not understood.")

@block.setter
@_ensure_connection(AccessLevel.Controller)
def block(self, value: float | None | False | True | tuple[bool, float]):
"""Set the block temperature control.
If a float is given, it will be set to that temperature; None or False will
turn off the block temperature control, and True will turn it on at the current set temperature. A tuple can be given
to specify both the on/off status and the temperature."""
if (value is None) or (value is False):
bcom = "OFF"
elif value is True:
bcom = "ON"
elif isinstance(value, tuple):
bcom = f"{'ON' if value[0] else 'OFF'} {float(value[1])}"
else:
try:
bcom = f"ON {float(value)}"
except ValueError:
raise ValueError(f"Block value {value} is not understood.")
self.run_command(f"BLOCK {bcom}")

@property
def status(self) -> RunStatus:
"""Return the current status of the run."""
Expand Down Expand Up @@ -693,14 +707,10 @@ def at_access(
log.debug(f"Took access level {access_level} {exclusive=} {stealth=}.")
yield self
self.set_access_level(fac, fex, fst)
log.debug(
f"Dropped access level {access_level}, returning to {fac} exclusive={fex} stealth={fst}."
)
log.debug(f"Dropped access level {access_level}, returning to {fac} exclusive={fex} stealth={fst}.")

@contextmanager
def ensured_connection(
self, access_level: AccessLevel = AccessLevel.Observer
) -> Generator[Machine, None, None]:
def ensured_connection(self, access_level: AccessLevel = AccessLevel.Observer) -> Generator[Machine, None, None]:
if self.automatic:
was_connected = self.connected
if not was_connected:
Expand Down

0 comments on commit 89776e5

Please sign in to comment.