Skip to content

Commit

Permalink
🥅 Ensure full coverage of runner for errors, so nothing fails silently
Browse files Browse the repository at this point in the history
  • Loading branch information
cp2004 committed Mar 1, 2022
1 parent 0102e0f commit b40d421
Showing 1 changed file with 90 additions and 82 deletions.
172 changes: 90 additions & 82 deletions octoprint_ws281x_led_status/runner/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,98 +56,106 @@ def __init__(
self.setup_custom_logger(log_path, debug)
self._logger.debug("Starting WS281x LED Status Effect runner")

self.segment_manager = None # type Optional[segments.SegmentManager]

# Save settings to class
self.strip_settings = strip_settings
self.effect_settings = effect_settings
self.features_settings = features_settings
self.active_times_settings = features_settings["active_times"]
self.transition_settings = features_settings["transitions"]
self.max_brightness = int(
round((float(strip_settings["brightness"]) / 100) * 255)
)
self.color_correction = {
"red": self.strip_settings["adjustment"]["R"],
"green": self.strip_settings["adjustment"]["G"],
"blue": self.strip_settings["adjustment"]["B"],
"white_override": self.strip_settings["white_override"],
"white_brightness": self.strip_settings["white_brightness"],
}

# Create segment settings
# Segments are EXPERIMENTAL and only enabled for certain conditions
self.segment_settings = []

# Sacrificial pixel offsets by one
default_segment = {"start": 0, "end": int(self.strip_settings["count"])}
if self.features_settings["sacrifice_pixel"]:
default_segment["start"] = 1

self.segment_settings.append(default_segment)

if int(self.strip_settings["count"]) < 6:
self._logger.info("Applying < 6 LED flickering bug workaround")
# rpi_ws281x will think we want 6 LEDs, but we will only use those configured
# this works around issues where LEDs would show the wrong colour, flicker and more
# when used with less than 6 LEDs.
# See #132 for details
self.strip_settings["count"] = 6

# State holders
self.lights_on = saved_lights_on
self.previous_state = previous_state
self.previous_m150 = {} # type: dict
self.active_times_state = True
self.turn_off_timer = None

self.queue = queue # type: multiprocessing.Queue
try:
self.strip = self.start_strip() # type: PixelStrip
except (StripFailedError, segments.InvalidSegmentError):
self._logger.error("Exiting the effect process")
return
except Exception as e:
self._logger.exception(e)
self._logger.error("Exiting the effect process")
return

self.effect_queue = Queue()
self.effect_thread = None # type: Optional[threading.Thread]

self.brightness_manager = BrightnessManager(
self.strip, self.max_brightness, self.transition_settings
)

# Create 'Active Times' background timers
self.active_times_timer = active_times.ActiveTimer(
self.active_times_settings, self.switch_lights
)
self.active_times_timer.start_timer()
# This entire thing is wrapped in a try: except block because otherwise errors here would
# cause the process to crash but nobody would know about it, it died silently.

self.segment_manager = None # type Optional[segments.SegmentManager]

# Save settings to class
self.strip_settings = strip_settings
self.effect_settings = effect_settings
self.features_settings = features_settings
self.active_times_settings = features_settings["active_times"]
self.transition_settings = features_settings["transitions"]
self.max_brightness = int(
round((float(strip_settings["brightness"]) / 100) * 255)
)
self.color_correction = {
"red": self.strip_settings["adjustment"]["R"],
"green": self.strip_settings["adjustment"]["G"],
"blue": self.strip_settings["adjustment"]["B"],
"white_override": self.strip_settings["white_override"],
"white_brightness": self.strip_settings["white_brightness"],
}

if debug:
self.log_settings()
else:
self._logger.info(
"Debug logging not enabled, if you are reporting issues please enable it "
"under 'Features' in the settings page."
# Create segment settings
# Segments are EXPERIMENTAL and only enabled for certain conditions
self.segment_settings = []

# Sacrificial pixel offsets by one
default_segment = {"start": 0, "end": int(self.strip_settings["count"])}
if self.features_settings["sacrifice_pixel"]:
default_segment["start"] = 1

self.segment_settings.append(default_segment)

if int(self.strip_settings["count"]) < 6:
self._logger.info("Applying < 6 LED flickering bug workaround")
# rpi_ws281x will think we want 6 LEDs, but we will only use those configured
# this works around issues where LEDs would show the wrong colour, flicker and more
# when used with less than 6 LEDs.
# See #132 for details
self.strip_settings["count"] = 6

# State holders
self.lights_on = saved_lights_on
self.previous_state = previous_state
self.previous_m150 = {} # type: dict
self.active_times_state = True
self.turn_off_timer = None

self.queue = queue # type: multiprocessing.Queue
try:
self.strip = self.start_strip() # type: PixelStrip
except (StripFailedError, segments.InvalidSegmentError):
self._logger.error("Exiting the effect process")
return
except Exception as e:
self._logger.exception(e)
self._logger.error("Exiting the effect process")
return

self.effect_queue = Queue()
self.effect_thread = None # type: Optional[threading.Thread]

self.brightness_manager = BrightnessManager(
self.strip, self.max_brightness, self.transition_settings
)

# Set back previous state, unless it is `blank`, then start main loop
if not (
self.previous_state["type"] == "standard"
and self.previous_state["effect"] == "blank"
):
self._logger.debug(
"Returning to previous state: {}".format(self.previous_state)
# Create 'Active Times' background timers
self.active_times_timer = active_times.ActiveTimer(
self.active_times_settings, self.switch_lights
)
self.parse_q_msg(self.previous_state)
self.active_times_timer.start_timer()

if debug:
self.log_settings()
else:
self._logger.info(
"Debug logging not enabled, if you are reporting issues please enable it "
"under 'Features' in the settings page."
)
except Exception as e:
self._logger.error("Unhandled error starting the effect runner")
self._logger.exception(e)
return

self._logger.info("Startup Complete!")
self.main_loop()

def main_loop(self):
try:
# Set back previous state, unless it is `blank`, then start main loop
if not (
self.previous_state["type"] == "standard"
and self.previous_state["effect"] == "blank"
):
self._logger.debug(
"Returning to previous state: {}".format(self.previous_state)
)
self.parse_q_msg(self.previous_state)

self._logger.info("Starting main loop")
while True:
msg = self.queue.get()
self._logger.debug("New message: {}".format(msg))
Expand Down

0 comments on commit b40d421

Please sign in to comment.