diff --git a/CHANGELOG.md b/CHANGELOG.md index e0cfa17..38ae37e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ SPDX-License-Identifier: EUPL-1.2 - `Machine.block` command to directly control block temperatures. - Certificate support for SSL, and new connection recommendations. - Stage lines and event spans now are now not plotted if they are outside of the selected time range / stages. +- `Step` now supports repeats. +- `Stage.stepped_ramp` can now take multiple data points per temperature. +- Fix hanging connection bug. ## Version 0.11.0 diff --git a/src/qslib/protocol.py b/src/qslib/protocol.py index f731a57..ed59cf7 100644 --- a/src/qslib/protocol.py +++ b/src/qslib/protocol.py @@ -50,7 +50,9 @@ NZONES = 6 -UR: pint.UnitRegistry = pint.UnitRegistry(autoconvert_offset_to_baseunit=True, auto_reduce_dimensions=True) +UR: pint.UnitRegistry = pint.UnitRegistry( + autoconvert_offset_to_baseunit=True, auto_reduce_dimensions=True +) Q_ = UR.Quantity @@ -77,7 +79,10 @@ def _wrap_seconds(val: int | float | str | pint.Quantity) -> pint.Quantity: uv = Q_(val, _SECONDS) return uv -def _maybe_wrap_seconds(val: int | float | str | pint.Quantity | None) -> pint.Quantity | None: + +def _maybe_wrap_seconds( + val: int | float | str | pint.Quantity | None, +) -> pint.Quantity | None: if val is None: return None elif isinstance(val, str): @@ -147,7 +152,9 @@ def _wrap_degC_or_none( def _wrapunitmaybelist_degC( - val: (int | float | str | pint.Quantity | Sequence[int | float | str | pint.Quantity]), + val: ( + int | float | str | pint.Quantity | Sequence[int | float | str | pint.Quantity] + ), ) -> pint.Quantity: unit: pint.Unit = UR.Unit("degC") @@ -260,12 +267,15 @@ def to_scpicommand(self, **kwargs: None) -> SCPICommand: if self.cover is not None: opts["cover"] = self.cover.to("degC").magnitude - return SCPICommand("RAMP", *self.temperature.to("degC").magnitude, comment=None, **opts) + return SCPICommand( + "RAMP", *self.temperature.to("degC").magnitude, comment=None, **opts + ) @classmethod def from_scpicommand(cls, sc: SCPICommand) -> Ramp: return Ramp(Q_(sc.args, "degC"), **sc.opts) # type: ignore + @dataclass class Exposure(ProtoCommand): """Modifies exposure settings.""" @@ -292,6 +302,7 @@ def from_scpicommand(cls, sc: SCPICommand) -> Exposure: ] return Exposure(filts, **sc.opts) # type: ignore + def _filtersequence(x: Sequence[str | FilterSet]) -> Sequence[FilterSet]: return [FilterSet.fromstring(f) for f in x] @@ -307,7 +318,9 @@ class HACFILT(ProtoCommand): _default_filters: Sequence[FilterSet] = attr.field(factory=lambda: []) _names: ClassVar[Sequence[str]] = ("HoldAndCollectFILTer", "HACFILT") - def to_scpicommand(self, default_filters: Sequence[FilterSet] | None = None, **kwargs: None) -> SCPICommand: + def to_scpicommand( + self, default_filters: Sequence[FilterSet] | None = None, **kwargs: None + ) -> SCPICommand: if default_filters is None: default_filters = [] if not default_filters and not self.filters: @@ -349,7 +362,9 @@ def _maybe_quantity_to_seconds_int(q: pint.Quantity | int | None) -> int | None: class HoldAndCollect(ProtoCommand): """A protocol hold (for a time) and collect (set by HACFILT) command.""" - time: pint.Quantity = attr.field(converter=_wrap_seconds, on_setattr=attr.setters.convert) + time: pint.Quantity = attr.field( + converter=_wrap_seconds, on_setattr=attr.setters.convert + ) increment: pint.Quantity = attr.field( default=_ZERO_SECONDS, converter=_wrap_seconds, on_setattr=attr.setters.convert ) @@ -387,8 +402,12 @@ def from_scpicommand(cls, sc: SCPICommand) -> HoldAndCollect: class Hold(ProtoCommand): """A protocol hold (for a time) command.""" - time: pint.Quantity | None = attr.field(converter=_maybe_wrap_seconds, on_setattr=attr.setters.convert) - increment: pint.Quantity = attr.field(converter=_wrap_seconds, on_setattr=attr.setters.convert, default=_ZERO_SECONDS) + time: pint.Quantity | None = attr.field( + converter=_maybe_wrap_seconds, on_setattr=attr.setters.convert + ) + increment: pint.Quantity = attr.field( + converter=_wrap_seconds, on_setattr=attr.setters.convert, default=_ZERO_SECONDS + ) incrementcycle: int = 1 incrementstep: int = 1 _names: ClassVar[Sequence[str]] = ("HOLD",) @@ -415,7 +434,8 @@ def from_scpicommand(cls, sc: SCPICommand) -> Hold: class XMLable(ABC): @abstractmethod - def to_xml(self, **kwargs: Any) -> ET.Element: ... + def to_xml(self, **kwargs: Any) -> ET.Element: + ... G = TypeVar("G") @@ -428,10 +448,12 @@ def __init__(self, val_list: list[G]): self._list = val_list @overload - def _translate_key(self, key: int | str) -> int: ... + def _translate_key(self, key: int | str) -> int: + ... @overload - def _translate_key(self, key: slice) -> slice: ... + def _translate_key(self, key: slice) -> slice: + ... def _translate_key(self, key: int | str | slice) -> int | slice: if isinstance(key, int): @@ -445,10 +467,12 @@ def __getitem__(self, key: int | str | slice) -> G | list[G]: return self._list[self._translate_key(key)] @overload - def __setitem__(self, key: int | str, val: G) -> None: ... + def __setitem__(self, key: int | str, val: G) -> None: + ... @overload - def __setitem__(self, key: slice, val: Sequence[G]) -> None: ... + def __setitem__(self, key: slice, val: Sequence[G]) -> None: + ... def __setitem__(self, key, val): self._list.__setitem__(self._translate_key(key), val) @@ -490,7 +514,10 @@ def info_str(self, index: None | int = None, repeats: int = 1) -> str: else: s = "- " s += f"Step{' '+str(self._identifier) if self._identifier is not None else ''} of commands:\n" - s += "\n".join(f" {i+1}. " + c.to_scpicommand().to_string() for i, c in enumerate(self._body)) + s += "\n".join( + f" {i+1}. " + c.to_scpicommand().to_string() + for i, c in enumerate(self._body) + ) return s def duration_at_cycle_point(self, cycle: int) -> pint.Quantity[int]: # cycle from 1 @@ -525,7 +552,9 @@ def collects(self) -> bool: def to_xml(self, **kwargs: Any) -> ET.Element: assert not kwargs e = ET.Element("TCStep") - ET.SubElement(e, "CollectionFlag").text = str(int(self.collects)) # FIXME: approx + ET.SubElement(e, "CollectionFlag").text = str( + int(self.collects) + ) # FIXME: approx for t in range(0, 6): # FIXME ET.SubElement(e, "Temperature").text = "30.0" ET.SubElement(e, "HoldTime").text = "1" @@ -558,7 +587,10 @@ def from_scpicommand(cls, sc: SCPICommand) -> CustomStep: return Step.from_scpicommand(sc) except ValueError: return cls( - [cast(ProtoCommand, x.specialize()) for x in cast(Sequence[SCPICommand], sc.args[1])], + [ + cast(ProtoCommand, x.specialize()) + for x in cast(Sequence[SCPICommand], sc.args[1]) + ], identifier=cast(Union[int, str], sc.args[0]), **sc.opts, # type: ignore ) @@ -603,8 +635,12 @@ class Step(CustomStep, XMLable): This currently does not support step-level repeats, which do exist on the machine. """ - time: pint.Quantity = attr.field(converter=_wrap_seconds, on_setattr=attr.setters.convert) - temperature: pint.Quantity = attr.field(converter=_wrapunitmaybelist_degC, on_setattr=attr.setters.convert) + time: pint.Quantity = attr.field( + converter=_wrap_seconds, on_setattr=attr.setters.convert + ) + temperature: pint.Quantity = attr.field( + converter=_wrapunitmaybelist_degC, on_setattr=attr.setters.convert + ) collect: bool | None = None temp_increment: pint.Quantity[float] = attr.field( default=_ZEROTEMPDELTA, @@ -708,26 +744,43 @@ def info_str(self, index: None | int = None, repeats: int = 1) -> str: if not np.all(temps_c1p1 == temps_cep1) or not np.all(temps_cep1 == temps_cepe): if not np.all(temps_cep1 == temps_cepe): - tempstr += f" to ({_temp_format(temps_cep1)} to {_temp_format(temps_cepe)})" + tempstr += ( + f" to ({_temp_format(temps_cep1)} to {_temp_format(temps_cepe)})" + ) else: tempstr += f" to {_temp_format(temps_cep1)}" time_c1p1 = self.duration_at_cycle_point(1, 1) - time_c1pe = self.duration_at_cycle_point(1, self.repeat if self.repeat is not None else 1) + time_c1pe = self.duration_at_cycle_point( + 1, self.repeat if self.repeat is not None else 1 + ) time_cep1 = self.duration_at_cycle_point(repeats, 1) - time_cepe = self.duration_at_cycle_point(repeats, self.repeat if self.repeat is not None else 1) + time_cepe = self.duration_at_cycle_point( + repeats, self.repeat if self.repeat is not None else 1 + ) if self.repeat > 1: if np.all(time_c1p1 == time_c1pe) and np.all(time_c1p1 == time_cep1): - tempstr += f" for {self.repeat} points at {_durformat(time_c1p1)}/point ({_durformat(time_c1p1*self.repeat)}/cycle)" + tempstr += ( + f" for {self.repeat} points at {_durformat(time_c1p1)}/point" + f" ({_durformat(time_c1p1*self.repeat)}/cycle)" + ) elif np.all(time_c1p1 == time_c1pe): - tempstr += f" for {self.repeat} points at {_durformat(time_c1p1)}/point to {_durformat(time_cep1)}/point ({_durformat(time_c1p1*self.repeat)}/cycle to {_durformat(time_cep1*self.repeat)}/cycle)" + tempstr += ( + f" for {self.repeat} points at {_durformat(time_c1p1)}/point to {_durformat(time_cep1)}/point" + f" ({_durformat(time_c1p1*self.repeat)}/cycle to {_durformat(time_cep1*self.repeat)}/cycle)" + ) elif np.all(time_c1p1 == time_cep1): - tempstr += f" for {self.repeat} points at ({_durformat(time_c1p1)}/point to {_durformat(time_cep1)}/point) ({_durformat(self.duration_of_cycle(1))}/cycle)" + tempstr += ( + f" for {self.repeat} points at ({_durformat(time_c1p1)}/point to {_durformat(time_cep1)}/point) " + f"({_durformat(self.duration_of_cycle(1))}/cycle)" + ) else: tempstr += ( f" for {self.repeat} points at ({_durformat(time_c1p1)}/point to {_durformat(time_c1pe)}/point) to " - f"({_durformat(time_cep1)}/point to {_durformat(time_cepe)}/point) ({_durformat(self.duration_of_cycle(1))}/cycle to {self.duration_of_cycle(repeats)}/cycle)" + f"({_durformat(time_cep1)}/point to {_durformat(time_cepe)}/point)" + f" ({_durformat(self.duration_of_cycle(1))}/cycle to " + f"{_durformat(self.duration_of_cycle(repeats))}/cycle)" ) else: @@ -736,7 +789,7 @@ def info_str(self, index: None | int = None, repeats: int = 1) -> str: else: tempstr += f" for {_durformat(time_c1p1)}/cycle to {_durformat(time_cep1)}/cycle" - elems = [f"{tempstr} for {self.time:~}/cycle"] + elems = [tempstr] if self.temp_increment != 0.0: if (self.repeat > 1) and (self._machine_temp_incrementpoint < self.repeat): elems.append(f"{self.temp_increment:+~}/point") @@ -776,7 +829,9 @@ def info_str(self, index: None | int = None, repeats: int = 1) -> str: return s def total_duration(self, repeats: int = 1) -> pint.Quantity: - return sum((self.duration_of_cycle(c) for c in range(1, repeats + 1)), 0 * UR.seconds) + return sum( + (self.duration_of_cycle(c) for c in range(1, repeats + 1)), 0 * UR.seconds + ) def duration_at_cycle_point(self, cycle: int, point: int = 1) -> pint.Quantity: "Durations of the step at `cycle` (from 1)" @@ -789,9 +844,14 @@ def duration_of_cycle(self, cycle: int) -> pint.Quantity: def durations_at_cycle(self, cycle: int) -> list[pint.Quantity]: # cycle from 1 "Duration of the step (excluding ramp) at `cycle` (from 1)" - return [self.duration_at_cycle_point(cycle, point) for point in range(1, self.repeat + 1)] + return [ + self.duration_at_cycle_point(cycle, point) + for point in range(1, self.repeat + 1) + ] - def temperatures_at_cycle_point(self, cycle: int, point: int) -> pint.Quantity[np.ndarray]: + def temperatures_at_cycle_point( + self, cycle: int, point: int + ) -> pint.Quantity[np.ndarray]: "Temperatures of the step at `cycle` (from 1)" inccycles = max(0, cycle + 1 - self.temp_incrementcycle) incpoints = max(0, point + 1 - self._machine_temp_incrementpoint) @@ -799,7 +859,10 @@ def temperatures_at_cycle_point(self, cycle: int, point: int) -> pint.Quantity[n def temperatures_at_cycle(self, cycle: int) -> list[pint.Quantity[np.ndarray]]: "Temperatures of the step at `cycle` (from 1)" - return [self.temperatures_at_cycle_point(cycle, point) for point in range(1, self.repeat + 1)] + return [ + self.temperatures_at_cycle_point(cycle, point) + for point in range(1, self.repeat + 1) + ] @property def identifier(self) -> int | str | None: @@ -859,11 +922,17 @@ def body(self, v: Any) -> None: raise ValueError @classmethod - def from_xml(cls, e: ET.Element, *, etc: int = 1, ehtc: int = 1, he: bool = False) -> Step: + def from_xml( + cls, e: ET.Element, *, etc: int = 1, ehtc: int = 1, he: bool = False + ) -> Step: collect = bool(int(e.findtext("CollectionFlag") or 0)) - ts: pint.Quantity[np.ndarray] = Q_([float(x.text or math.nan) for x in e.findall("Temperature")], "degC") + ts: pint.Quantity[np.ndarray] = Q_( + [float(x.text or math.nan) for x in e.findall("Temperature")], "degC" + ) ht: pint.Quantity[int] = int(e.findtext("HoldTime") or 0) * UR.seconds - et: pint.Quantity[float] = float(e.findtext("ExtTemperature") or 0.0) * UR.delta_degC + et: pint.Quantity[float] = ( + float(e.findtext("ExtTemperature") or 0.0) * UR.delta_degC + ) eht: pint.Quantity[int] = int(e.findtext("ExtHoldTime") or 0) * UR.seconds if not he: et = _ZEROTEMPDELTA @@ -873,13 +942,19 @@ def from_xml(cls, e: ET.Element, *, etc: int = 1, ehtc: int = 1, he: bool = Fals def to_xml(self, **kwargs: Any) -> ET.Element: assert not kwargs e = ET.Element("TCStep") - ET.SubElement(e, "CollectionFlag").text = str(int(self.collects)) # FIXME: approx + ET.SubElement(e, "CollectionFlag").text = str( + int(self.collects) + ) # FIXME: approx for t in self.temperature_list.to("°C").magnitude: ET.SubElement(e, "Temperature").text = str(t) ET.SubElement(e, "HoldTime").text = str(int(self.time.to("seconds").magnitude)) # FIXME: does not contain cycle starts, because AB format can't handle - ET.SubElement(e, "ExtTemperature").text = str(self.temp_increment.to("delta_degC").magnitude) - ET.SubElement(e, "ExtHoldTime").text = str(int(self.time_increment.to("seconds").magnitude)) + ET.SubElement(e, "ExtTemperature").text = str( + self.temp_increment.to("delta_degC").magnitude + ) + ET.SubElement(e, "ExtHoldTime").text = str( + int(self.time_increment.to("seconds").magnitude) + ) # FIXME: RampRate, RampRateUnit ET.SubElement(e, "RampRate").text = "1.6" ET.SubElement(e, "RampRateUnit").text = "DEGREES_PER_SECOND" @@ -900,7 +975,13 @@ def from_scpicommand(cls, sc: SCPICommand) -> Step: r = cast(Ramp, coms[0]) hcf = cast(HACFILT, coms[1]) h = cast(HoldAndCollect, coms[2]) - c = cls(h.time, r.temperature, time_incrementcycle=1, temp_incrementcycle=1, repeat=repeat) + c = cls( + h.time, + r.temperature, + time_incrementcycle=1, + temp_incrementcycle=1, + repeat=repeat, + ) c.collect = True if hcf._default_filters: c.filters = [] @@ -911,23 +992,37 @@ def from_scpicommand(cls, sc: SCPICommand) -> Step: c.time_increment = h.increment c.temp_increment = r.increment c.time_incrementcycle = h.incrementcycle - c.time_incrementpoint = h.incrementstep if h.incrementstep <= repeat else None + c.time_incrementpoint = ( + h.incrementstep if h.incrementstep <= repeat else None + ) c.temp_incrementcycle = r.incrementcycle - c.temp_incrementpoint = r.incrementstep if r.incrementstep <= repeat else None + c.temp_incrementpoint = ( + r.incrementstep if r.incrementstep <= repeat else None + ) elif com_classes == [Ramp, Hold]: r = cast(Ramp, coms[0]) h = cast(Hold, coms[1]) if h.time is None: raise ValueError - c = cls(h.time, r.temperature, time_incrementcycle=1, temp_incrementcycle=1, repeat=repeat) + c = cls( + h.time, + r.temperature, + time_incrementcycle=1, + temp_incrementcycle=1, + repeat=repeat, + ) c.collect = False c.time_increment = h.increment c.temp_increment = r.increment c.time_incrementcycle = h.incrementcycle - c.time_incrementpoint = h.incrementstep if h.incrementstep <= repeat else None + c.time_incrementpoint = ( + h.incrementstep if h.incrementstep <= repeat else None + ) c.temp_incrementcycle = r.incrementcycle - c.temp_incrementpoint = r.incrementstep if r.incrementstep <= repeat else None + c.temp_incrementpoint = ( + r.incrementstep if r.incrementstep <= repeat else None + ) else: raise ValueError return c @@ -976,9 +1071,17 @@ def __eq__(self, other: object) -> bool: return False if self.__class__ != other.__class__: return False - if (self.index is not None) and (other.index is not None) and self.index != other.index: + if ( + (self.index is not None) + and (other.index is not None) + and self.index != other.index + ): return False - if (self.label is not None) and (other.label is not None) and self.label != other.label: + if ( + (self.label is not None) + and (other.label is not None) + and self.label != other.label + ): return False return self.steps == other.steps @@ -1066,13 +1169,20 @@ def stepped_ramp( autoset_step = False n_steps = max( - abs(round((max_delta / temperature_step).to("").magnitude)) + (0 if start_increment else 1), + abs(round((max_delta / temperature_step).to("").magnitude)) + + (0 if start_increment else 1), 1, ) - real_max_temperature_step = abs(max_delta / (n_steps - (0 if start_increment else 1))) + real_max_temperature_step = abs( + max_delta / (n_steps - (0 if start_increment else 1)) + ) - change = ((real_max_temperature_step - temperature_step) / temperature_step).to("").magnitude + change = ( + ((real_max_temperature_step - temperature_step) / temperature_step) + .to("") + .magnitude + ) if (abs(change) > 0.05) and not autoset_step: warnings.warn( @@ -1082,10 +1192,19 @@ def stepped_ramp( elif temperature_step is not None: temperature_step = abs(_wrap_delta_degC(temperature_step)) - if abs(round((max_delta / temperature_step).to("").magnitude)) + (0 if start_increment else 1) != n_steps: - raise ValueError("Both n_steps and temperature_step set, and calculated steps don't match set steps.") + if ( + abs(round((max_delta / temperature_step).to("").magnitude)) + + (0 if start_increment else 1) + != n_steps + ): + raise ValueError( + "Both n_steps and temperature_step set, and calculated steps don't match set steps." + ) - temp_increment = ((to_temperature - from_temperature) / (n_steps - (0 if start_increment else 1))).round(4) + temp_increment = ( + (to_temperature - from_temperature) + / (n_steps - (0 if start_increment else 1)) + ).round(4) # If the temp_increment is entirely equal, we are not multistep, and we should # have only a single temp_increment. @@ -1120,7 +1239,8 @@ def stepped_ramp( [ Step( step_time / points_per_step, - from_temperature + (step_i + (1 if start_increment else 0)) * temp_increment, + from_temperature + + (step_i + (1 if start_increment else 0)) * temp_increment, collect=collect, filters=filters, repeat=points_per_step, @@ -1177,7 +1297,9 @@ def hold_at( total_time = _wrap_seconds(total_time) if step_time > total_time: - raise ValueError(f"Step time {step_time} > total time {total_time}. Did you mix up the parameter order?") + raise ValueError( + f"Step time {step_time} > total time {total_time}. Did you mix up the parameter order?" + ) repeat = round((total_time / step_time).to("").magnitude) @@ -1222,7 +1344,9 @@ def __repr__(self) -> str: s += ")" return s - def dataframe(self, start_time: float = 0, previous_temperatures: list[float] | None = None) -> pd.DataFrame: + def dataframe( + self, start_time: float = 0, previous_temperatures: list[float] | None = None + ) -> pd.DataFrame: """ Create a dataframe of the steps in this stage. @@ -1256,27 +1380,42 @@ def dataframe(self, start_time: float = 0, previous_temperatures: list[float] | ] ) ramp_rates = [ - 1.6 for _ in range(1, self.repeat + 1) for step in self.steps for point in range(1, step.repeat + 1) + 1.6 + for _ in range(1, self.repeat + 1) + for step in self.steps + for point in range(1, step.repeat + 1) ] # np.array( # [step.ramp_rate for _ in range(1, self.repeat + 1) for step in self.body] # ) collect_data = np.array( - [step.collects for _ in range(1, self.repeat + 1) for step in self.steps for _ in range(1, step.repeat + 1)] + [ + step.collects + for _ in range(1, self.repeat + 1) + for step in self.steps + for _ in range(1, step.repeat + 1) + ] ) # FIXME: is this how ramp rates actually work? ramp_durations = np.zeros(len(durations)) if previous_temperatures is not None: - ramp_durations[0] = np.max(np.abs(temperatures[0] - previous_temperatures)) / ramp_rates[0] - ramp_durations[1:] = np.max(np.abs(temperatures[1:] - temperatures[:-1]), axis=1) / ramp_rates[1:] + ramp_durations[0] = ( + np.max(np.abs(temperatures[0] - previous_temperatures)) / ramp_rates[0] + ) + ramp_durations[1:] = ( + np.max(np.abs(temperatures[1:] - temperatures[:-1]), axis=1) + / ramp_rates[1:] + ) tot_durations = durations + ramp_durations start_times = start_time + np.zeros(len(durations)) start_times[0] = start_time + ramp_durations[0] - start_times[1:] = start_time + np.cumsum(tot_durations[:-1]) + ramp_durations[1:] + start_times[1:] = ( + start_time + np.cumsum(tot_durations[:-1]) + ramp_durations[1:] + ) end_times = start_time + np.cumsum(tot_durations) @@ -1290,7 +1429,9 @@ def dataframe(self, start_time: float = 0, previous_temperatures: list[float] | data["temperature_avg"] = np.average(temperatures, axis=1) - for i in range(0, temperatures.shape[1]): # pylint: disable=unsubscriptable-object + for i in range( + 0, temperatures.shape[1] + ): # pylint: disable=unsubscriptable-object data["temperature_{}".format(i + 1)] = temperatures[:, i] data["cycle"] = [ @@ -1316,7 +1457,9 @@ def dataframe(self, start_time: float = 0, previous_temperatures: list[float] | return data - def to_scpicommand(self, stageindex: int | str | None = None, **kwargs: Any) -> SCPICommand: + def to_scpicommand( + self, stageindex: int | str | None = None, **kwargs: Any + ) -> SCPICommand: opts = {} args: list[int | str | list[SCPICommand]] = [] if self.repeat != 1: @@ -1329,14 +1472,22 @@ def to_scpicommand(self, stageindex: int | str | None = None, **kwargs: Any) -> raise ValueError("No index.") args.append(index_to_use) args.append(self.label or f"STAGE_{index_to_use}") - args.append([step.to_scpicommand(stepindex=i + 1, **kwargs) for i, step in enumerate(self.steps)]) + args.append( + [ + step.to_scpicommand(stepindex=i + 1, **kwargs) + for i, step in enumerate(self.steps) + ] + ) return SCPICommand("STAGe", *args, comment=None, **opts) @classmethod def from_scpicommand(cls, sc: SCPICommand, **kwargs: Any) -> Stage: c = cls( - [cast(CustomStep, x.specialize(**kwargs)) for x in cast(Sequence[SCPICommand], sc.args[2])], + [ + cast(CustomStep, x.specialize(**kwargs)) + for x in cast(Sequence[SCPICommand], sc.args[2]) + ], index=cast(int, sc.args[0]), label=cast(Optional[str], sc.args[1]), **sc.opts, # type: ignore @@ -1361,7 +1512,8 @@ def from_xml(cls, e: ET.Element) -> Stage: startcycle = int(cast(str, e.findtext("StartingCycle"))) ade = e.findtext("AutoDeltaEnabled") == "true" steps: list[CustomStep] = [ - Step.from_xml(x, etc=startcycle, ehtc=startcycle, he=ade) for x in e.findall("TCStep") + Step.from_xml(x, etc=startcycle, ehtc=startcycle, he=ade) + for x in e.findall("TCStep") ] return cls(steps, rep) @@ -1391,7 +1543,10 @@ def info_str(self, index: int | None = None) -> str: else: adds = "" stagestr = f"{index}. Stage with {self.repeat} cycle{adds}" - stepstrs = [textwrap.indent(f"{step.info_str(i+1, self.repeat)}", " ") for i, step in enumerate(self.steps)] + stepstrs = [ + textwrap.indent(f"{step.info_str(i+1, self.repeat)}", " ") + for i, step in enumerate(self.steps) + ] try: tot_dur = sum( (x.total_duration(self.repeat) for x in self.steps), @@ -1513,15 +1668,21 @@ def to_scpicommand(self, **kwargs: Any) -> SCPICommand: args.append(self.name) stages: list[SCPICommand] = [] if self.prerun: - stages.append(SCPICommand("PRERun", [s.to_scpicommand() for s in self.prerun])) + stages.append( + SCPICommand("PRERun", [s.to_scpicommand() for s in self.prerun]) + ) stages += [ - stage.to_scpicommand(filters=self.filters, stageindex=i + 1, default_filters=self.filters) + stage.to_scpicommand( + filters=self.filters, stageindex=i + 1, default_filters=self.filters + ) for i, stage in enumerate(self.stages) ] if self.postrun: - stages.append(SCPICommand("POSTRun", [s.to_scpicommand() for s in self.postrun])) + stages.append( + SCPICommand("POSTRun", [s.to_scpicommand() for s in self.postrun]) + ) args.append(stages) @@ -1699,7 +1860,9 @@ def from_xml(cls, e: ET.Element) -> Protocol: stages = [Stage.from_xml(x) for x in e.findall("TCStage")] return Protocol(stages, protoname, svol, runmode, filters, covertemperature) - def to_xml(self, covertemperature: float = 105.0) -> tuple[ET.ElementTree, ET.ElementTree]: + def to_xml( + self, covertemperature: float = 105.0 + ) -> tuple[ET.ElementTree, ET.ElementTree]: te = ET.ElementTree(ET.Element("TCProtocol")) tqe = ET.ElementTree(ET.Element("QSTCProtocol")) @@ -1714,7 +1877,9 @@ def to_xml(self, covertemperature: float = 105.0) -> tuple[ET.ElementTree, ET.El " placeholder for the real protocol, contained as" " an SCPI command in QSLibProtocolCommand." ) - _set_or_create(qe, "QSLibProtocolCommand").text = self.to_scpicommand().to_string() + _set_or_create( + qe, "QSLibProtocolCommand" + ).text = self.to_scpicommand().to_string() _set_or_create(qe, "QSLibProtocol").text = str(attr.asdict(self)) _set_or_create(qe, "QSLibVerson").text = __version__ _set_or_create(e, "CoverTemperature").text = str(covertemperature) @@ -1753,11 +1918,16 @@ def __str__(self) -> str: begin += ":\n" if self.filters: begin += ( - "(default filters " + _oxfordlist(FilterSet.fromstring(f).lowerform for f in self.filters) + ")\n\n" + "(default filters " + + _oxfordlist(FilterSet.fromstring(f).lowerform for f in self.filters) + + ")\n\n" ) else: begin += "\n" - stagestrs = [textwrap.indent(stage.info_str(i + 1), " ") for i, stage in enumerate(self.stages)] + stagestrs = [ + textwrap.indent(stage.info_str(i + 1), " ") + for i, stage in enumerate(self.stages) + ] return begin + "\n".join(stagestrs) @@ -1792,10 +1962,14 @@ def check_compatible(self, new: Protocol, status: RunStatus) -> bool: # assert self.name == new.name for i, (oldstage, newstage) in enumerate(zip_longest(self.stages, new.stages)): - if i + 1 < status.stage: # If the stage has already passed, we must be equal + if ( + i + 1 < status.stage + ): # If the stage has already passed, we must be equal if oldstage != newstage: raise ValueError - elif i + 1 == status.stage: # Current stage. Only change is # cycles, >= current + elif ( + i + 1 == status.stage + ): # Current stage. Only change is # cycles, >= current if newstage.repeat < status.cycle: raise ValueError oldstage.repeat = newstage.repeat # for comparison @@ -1819,7 +1993,8 @@ def validate(self, fix: bool = True): stage.index = i + 1 else: raise ValueError( - "Stage %s is at index %d of protocol, but has set index %d." % (stage, i + 1, stage.index) + "Stage %s is at index %d of protocol, but has set index %d." + % (stage, i + 1, stage.index) ) if stage.label is not None: diff --git a/src/qslib/qsconnection_async.py b/src/qslib/qsconnection_async.py index 9d26406..ed5cb53 100644 --- a/src/qslib/qsconnection_async.py +++ b/src/qslib/qsconnection_async.py @@ -44,10 +44,12 @@ def _parse_argstring(argstring: str) -> Dict[str, str]: return args -class AlreadyCollectedError(Exception): ... +class AlreadyCollectedError(Exception): + ... -class RunNotFinishedError(Exception): ... +class RunNotFinishedError(Exception): + ... @dataclass(frozen=True, order=True, eq=True) @@ -124,7 +126,8 @@ async def list_files( leaf: str = "FILE", verbose: Literal[True], recursive: bool = False, - ) -> list[dict[str, Any]]: ... + ) -> list[dict[str, Any]]: + ... @overload async def list_files( @@ -134,7 +137,8 @@ async def list_files( leaf: str = "FILE", verbose: Literal[False], recursive: bool = False, - ) -> list[str]: ... + ) -> list[str]: + ... @overload async def list_files( @@ -144,7 +148,8 @@ async def list_files( leaf: str = "FILE", verbose: bool = False, recursive: bool = False, - ) -> list[str] | list[dict[str, Any]]: ... + ) -> list[str] | list[dict[str, Any]]: + ... async def list_files( self, @@ -313,12 +318,19 @@ async def connect( else: self.port = 7000 - self._transport, proto = await self.loop.create_connection( - QS_IS_Protocol, - self.host, - int(cast("int | str", self.port)), - ssl=CTX if cast(bool, self.ssl) else None, - ) + self._transport, proto = await self.loop.create_connection( + QS_IS_Protocol, + self.host, + int(cast("int | str", self.port)), + ssl=CTX if cast(bool, self.ssl) else None, + ) + else: + self._transport, proto = await self.loop.create_connection( + QS_IS_Protocol, + self.host, + int(cast("int | str", self.port)), + ssl=CTX if cast(bool, self.ssl) else None, + ) self._protocol = cast(QS_IS_Protocol, proto) @@ -462,7 +474,8 @@ async def get_filterdata_one( *, run: Optional[str] = None, return_files: Literal[True], - ) -> tuple[data.FilterDataReading, list[tuple[str, bytes]]]: ... + ) -> tuple[data.FilterDataReading, list[tuple[str, bytes]]]: + ... @overload async def get_filterdata_one( @@ -471,7 +484,8 @@ async def get_filterdata_one( *, run: Optional[str] = None, return_files: Literal[False] = False, - ) -> data.FilterDataReading: ... + ) -> data.FilterDataReading: + ... async def get_filterdata_one( self, @@ -513,12 +527,14 @@ async def get_filterdata_one( @overload async def get_all_filterdata( self, run: Optional[str], as_list: Literal[True] - ) -> List[data.FilterDataReading]: ... + ) -> List[data.FilterDataReading]: + ... @overload async def get_all_filterdata( self, run: Optional[str], as_list: Literal[False] - ) -> pd.DataFrame: ... + ) -> pd.DataFrame: + ... async def get_all_filterdata( self, run: str | None = None, as_list: bool = False