Skip to content

Commit

Permalink
remove MainWindow and Parameters widgets, and split remaining code in…
Browse files Browse the repository at this point in the history
… aperoll.widgets.parameters into a new module aperoll.widgets.line_edit and aperoll.utils
  • Loading branch information
javierggt committed Jan 28, 2025
1 parent ede1632 commit c6a9525
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 904 deletions.
153 changes: 153 additions & 0 deletions aperoll/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import functools
import gzip
import json
import pickle
import traceback

import maude
import numpy as np
from chandra_aca.transform import (
pixels_to_yagzag,
yagzag_to_pixels,
)
from cxotime.cxotime import CxoTime
from kadi.commands.observations import get_detector_and_sim_offset
from Quaternion import Quat
from ska_helpers import logging

logger = logging.basic_logger("aperoll")
Expand All @@ -25,6 +32,7 @@ class AperollException(RuntimeError):

def log_exception(msg, exc, level="DEBUG"):
import logging

trace = traceback.extract_tb(exc.__traceback__)
level = logging.__getattribute__(level)
logger.log(level, f"{msg}: {exc}")
Expand Down Expand Up @@ -127,3 +135,148 @@ def get_camera_fov_frame():
)

return frame


def get_default_parameters():
"""
Get default initial parameters from current telemetry.
"""

msid_list = ["3TSCPOS", "AACCCDPT"] + [f"aoattqt{i}".upper() for i in range(1, 5)]
msids = maude.get_msids(msid_list)
data = {msid: msids["data"][i]["values"][-1] for i, msid in enumerate(msid_list)}
q = Quat(q=[data[f"AOATTQT{i}"] for i in range(1, 5)])

instrument, sim_offset = get_detector_and_sim_offset(data["3TSCPOS"])
t_ccd = (data["AACCCDPT"] - 32) * 5 / 9

result = {
"date": CxoTime().date,
"attitude": q,
"ra": q.ra,
"dec": q.dec,
"roll": q.roll,
"instrument": instrument,
"sim_offset": sim_offset,
"t_ccd": t_ccd,
"obsid": 0,
"man_angle": 0,
"dither_acq_y": 16,
"dither_acq_z": 16,
"dither_guide_y": 16,
"dither_guide_z": 16,
"n_fid": 0,
"n_guide": 8,
}

return result


def get_parameters_from_yoshi(filename, obsid=None):
"""
Get initial parameters from a Yoshi JSON file.
"""

with open(filename) as fh:
contents = json.load(fh)
if obsid is not None:
contents = [obs for obs in contents if obs["obsid"] == obsid]
if not contents:
raise AperollException(f"OBSID {obsid} not found in {filename}")

if not contents:
raise AperollException(f"No entries found in {filename}")

yoshi_params = contents[0] # assuming there is only one entry

yoshi_params["date"] = yoshi_params["obs_date"]
yoshi_params["ra"] = yoshi_params["ra_targ"]
yoshi_params["dec"] = yoshi_params["dec_targ"]
yoshi_params["roll"] = yoshi_params["roll_targ"]
yoshi_params["instrument"] = yoshi_params["detector"]
for key in [
"obs_date",
"ra_targ",
"dec_targ",
"roll_targ",
"detector",
]:
del yoshi_params[key]

if abs(yoshi_params.get("obsid", 0)) < 38000:
yoshi_params["n_fid"] = "3"
yoshi_params["n_guide"] = "5"
else:
yoshi_params["n_fid"] = "0"
yoshi_params["n_guide"] = "8"

att = Quat(
equatorial=(yoshi_params["ra"], yoshi_params["dec"], yoshi_params["roll"])
)

default = get_default_parameters()
parameters = {
"obsid": yoshi_params.get("obsid", default.get("obsid", 0)),
"man_angle": yoshi_params.get("man_angle", default.get("man_angle", 0)),
"dither_acq_y": yoshi_params.get("dither_y", default.get("dither_acq_y", 16)),
"dither_acq_z": yoshi_params.get("dither_z", default.get("dither_acq_z", 16)),
"dither_guide_y": yoshi_params.get(
"dither_y", default.get("dither_guide_y", 16)
),
"dither_guide_z": yoshi_params.get(
"dither_z", default.get("dither_guide_z", 16)
),
"date": yoshi_params.get("date", default["date"]),
"attitude": att,
"ra": yoshi_params.get("ra", default["attitude"].ra),
"dec": yoshi_params.get("dec", default["attitude"].dec),
"roll": yoshi_params.get("roll", default["attitude"].roll),
"t_ccd": yoshi_params.get("t_ccd", default["t_ccd"]),
"instrument": yoshi_params.get("instrument", default["instrument"]),
"n_guide": yoshi_params["n_guide"],
"n_fid": yoshi_params["n_fid"],
}
return parameters


def get_parameters_from_pickle(filename, obsid=None):
"""
Get initial parameters from a proseco pickle file.
"""
open_fcn = open if filename.endswith(".pkl") else gzip.open
with open_fcn(filename, "rb") as fh:
catalogs = pickle.load(fh)

if not catalogs:
raise AperollException(f"No entries found in {filename}")

if obsid is None:
# this is ugly but it works whether the keys are strings of floats or ints
obsid = int(np.round(float(list(catalogs.keys())[0])))

if float(obsid) not in catalogs:
raise AperollException(f"OBSID {obsid} not found in {filename}")

catalog = catalogs[float(obsid)]

parameters = {
"obsid": obsid,
"man_angle": catalog.man_angle,
"dither_acq_y": catalog.dither_acq.y,
"dither_acq_z": catalog.dither_acq.z,
"dither_guide_y": catalog.dither_guide.y,
"dither_guide_z": catalog.dither_guide.z,
"date": CxoTime(
catalog.date
).date, # date is not guaranteed to be a fixed type in pickle
"attitude": catalog.att,
"ra": catalog.att.ra,
"dec": catalog.att.dec,
"roll": catalog.att.roll,
"t_ccd_acq": catalog.t_ccd_acq,
"t_ccd_guide": catalog.t_ccd_guide,
"instrument": catalog.detector,
"n_guide": catalog.n_guide,
"n_fid": catalog.n_fid,
}
return parameters
36 changes: 36 additions & 0 deletions aperoll/widgets/line_edit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from PyQt5 import QtCore as QtC
from PyQt5 import QtWidgets as QtW


class LineEdit(QtW.QLineEdit):
"""
A QLineEdit with a signal emitted when pressing Enter, loosing focus or calling setText.
The signal is emitted only if the text changed.
"""

value_changed = QtC.pyqtSignal(str)

def __init__(self, parent):
super().__init__(parent)
self._prev_text = self.text()

def setText(self, text, emit=False):
super().setText(text)
# in the following, emit=False so the signal is not emitted
self._check_value(emit=emit)

def keyPressEvent(self, event):
super().keyPressEvent(event)
if event.key() == QtC.Qt.Key_Return:
self._check_value()

def focusOutEvent(self, event):
super().focusOutEvent(event)
self._check_value()

def _check_value(self, emit=True):
if self.text() != self._prev_text:
self._prev_text = self.text()
if emit:
self.value_changed.emit(self.text())
Loading

0 comments on commit c6a9525

Please sign in to comment.