diff --git a/python/examples/trigger.py b/python/examples/trigger.py new file mode 100644 index 0000000..5e7cbfc --- /dev/null +++ b/python/examples/trigger.py @@ -0,0 +1,35 @@ +from time import sleep + +import picoquake + +if __name__ == "__main__": + # Create a PicoQuake device + device = picoquake.PicoQuake("c6e3") + + # Configure acquisition + device.configure( + sample_rate=picoquake.SampleRate.hz_100, + filter_hz=picoquake.Filter.hz_42, + acc_range=picoquake.AccRange.g_16, + gyro_range=picoquake.GyroRange.dps_2000, + ) + + # Acquire data from the device + data, exception = device.trigger(rms_threshold=2, + pre_seconds=2, + post_seconds=5, + source="accel", + axis="xyz",) + + # Stop the device + device.stop() + + # Check if an exception occurred + if exception is not None: + raise exception + + # Print the acquired data + print(f"Data: {data}") + + # Save to a CSV file + data.to_csv("acquisition_triggered.csv") \ No newline at end of file diff --git a/python/picoquake/__init__.py b/python/picoquake/__init__.py index 1643055..73ffc81 100644 --- a/python/picoquake/__init__.py +++ b/python/picoquake/__init__.py @@ -3,4 +3,4 @@ from .data import AcquisitionData, IMUSample from .plot import * -__version__ = "1.0.4" +__version__ = "1.1.0-beta" diff --git a/python/picoquake/analisys.py b/python/picoquake/analisys.py new file mode 100644 index 0000000..3b51198 --- /dev/null +++ b/python/picoquake/analisys.py @@ -0,0 +1,62 @@ +""" +This module implements various data analysis functions. +""" + +from typing import List, Tuple + +from .data import IMUSample + + +# def rms(samples: List[IMUSample]) -> Tuple[float, float, float, float, float, float]: +# """ +# Calculate the root mean square of the acceleration and angular velocity components. + +# Args: +# samples: List of IMU samples. + +# Returns: +# Tuple of the root mean square of the acceleration and angular velocity components in the order: +# (rms_ax, rms_ay, rms_az, rms_gx, rms_gy, rms_gz) +# """ +# rms_ax = (sum([sample.acc_x ** 2 for sample in samples]) / len(samples)) ** 0.5 +# rms_ay = (sum([sample.acc_y ** 2 for sample in samples]) / len(samples)) ** 0.5 +# rms_az = (sum([sample.acc_z ** 2 for sample in samples]) / len(samples)) ** 0.5 +# rms_gx = (sum([sample.gyro_x ** 2 for sample in samples]) / len(samples)) ** 0.5 +# rms_gy = (sum([sample.gyro_y ** 2 for sample in samples]) / len(samples)) ** 0.5 +# rms_gz = (sum([sample.gyro_z ** 2 for sample in samples]) / len(samples)) ** 0.5 +# return (rms_ax, rms_ay, rms_az, rms_gx, rms_gy, rms_gz) + +# def rms_combined(samples: List[IMUSample]) -> Tuple[float, float]: +# """ +# Calculate the root mean square of the combined acceleration and angular velocity components. + +# Args: +# samples: List of IMU samples. + +# Returns: +# Tuple of the root mean square of the combined acceleration and angular velocity components in the order: +# (rms_a, rms_g) +# """ +# rms_a = (sum([sample.acc_x ** 2 + sample.acc_y ** 2 + sample.acc_z ** 2 for sample in samples]) / len(samples)) ** 0.5 +# rms_g = (sum([sample.gyro_x ** 2 + sample.gyro_y ** 2 + sample.gyro_z ** 2 for sample in samples]) / len(samples)) ** 0.5 +# return (rms_a, rms_g) + +def rms(samples: List[IMUSample], axes: str) -> Tuple[float, float]: + """ + Calculate the root mean square of the acceleration and angular velocity components for the specified axes. + + Args: + samples: List of IMU samples. + axes: String with the axes to calculate the RMS values. Must be a combination of 'x', 'y', and 'z'. + + Returns: + Tuple of the root mean square of the acceleration and angular velocity. + """ + + x = int('x' in axes) + y = int('y' in axes) + z = int('z' in axes) + + rms_acc = (sum([sample.acc_x ** 2 * x + sample.acc_y ** 2 * y + sample.acc_z ** 2 * z for sample in samples]) / len(samples)) ** 0.5 + rms_gyro = (sum([sample.gyro_x ** 2 * x + sample.gyro_y ** 2 * y + sample.gyro_z ** 2 * z for sample in samples]) / len(samples)) ** 0.5 + return (rms_acc, rms_gyro) \ No newline at end of file diff --git a/python/picoquake/interface.py b/python/picoquake/interface.py index d0219cd..44c18f4 100644 --- a/python/picoquake/interface.py +++ b/python/picoquake/interface.py @@ -20,6 +20,8 @@ from .configuration import * from .data import * from .exceptions import * +from .analisys import * +from .utils import * VID = 0x2E8A PID = 0xA @@ -286,6 +288,72 @@ def read(self, num: int=1, timeout: Optional[float]=None) -> List[IMUSample]: for _ in range(num_ret): samples.append(self._sample_deque.popleft()) return samples + + def trigger(self, rms_threshold: float, pre_seconds: float, post_seconds: + float, source: str="accel", axis: str="xyz", + rms_window: float=1.0, rms_interval: float=0.1) -> Tuple[AcquisitionData, Optional[Exception]]: + """ + Triggers the device to start sampling when the RMS value of the acceleration exceeds the specified value. + """ + if source not in ["accel", "gyro"]: + raise ValueError("Source must be 'accel' or 'gyro'") + combinations = get_axis_combinations("xyz") + if axis not in combinations: + raise ValueError("Invalid axis, must be 'x', 'y', 'z', or a combination.") + + window_len = int(rms_window * self.config.sample_rate.param_value) + n_pre_samples = int(pre_seconds * self.config.sample_rate.param_value) + n_post_samples = int(post_seconds * self.config.sample_rate.param_value) + n_samples = n_pre_samples + n_post_samples + len_deque_at_trigger = 0 + exception: Optional[Exception] = None + + self.start_continuos() + self._logger.info(f"Triggering on RMS value {rms_threshold} g") + + while True: + samples = deque_get_last_n(self._sample_deque, window_len) + if len(samples) < window_len: + sleep(0.001) + continue + rms_acc, rms_gyro = rms(samples, axis) + if source == "accel": + rms_val = rms_acc + else: + rms_val = rms_gyro + if rms_val > rms_threshold: + len_deque_at_trigger = len(self._sample_deque) + start_t = time() + break + sleep(rms_interval) + self._logger.info(f"Triggered on RMS value {rms_val:.3f} g") + while True: + if len(self._sample_deque) - len_deque_at_trigger > post_seconds * self.config.sample_rate.param_value: + break + if self._exception is not None: + exception = self._exception + break + sleep(0.001) + stop_t = time() + self.stop_continuos() + self._logger.info(f"Acquisition stopped. Took: {stop_t - start_t:.1f}s.") + self._logger.info(f"Received {len(samples)} samples") + samples = deque_slice(self._sample_deque, + len_deque_at_trigger - n_pre_samples, + len_deque_at_trigger + n_post_samples) + data = AcquisitionData(samples=samples, + device=cast(DeviceInfo, self.device_info), + config=self.config, + start_time=datetime.fromtimestamp(start_t)) + if exception is None: + if len(samples) < n_samples: + self._logger.warning(f"Expected {n_samples} samples, received {len(samples)}") + exception = ConnectionError("Not all samples received") + elif not data._check_integrity: + self._logger.warning(f"Data integrity compromised, {data.skipped_samples} samples skipped") + exception = ConnectionError("Data integrity compromised") + return data, exception + def read_last(self, timeout: Optional[float]=None) -> Optional[IMUSample]: """ diff --git a/python/picoquake/plot.py b/python/picoquake/plot.py index d316edd..8f96b34 100644 --- a/python/picoquake/plot.py +++ b/python/picoquake/plot.py @@ -1,6 +1,7 @@ from itertools import permutations from .data import * +from .utils import get_axis_combinations def plot_psd(result: AcquisitionData, output_file: str, axis: str = "xyz", freq_min: float = 0, freq_max: Optional[float] = None, @@ -9,7 +10,7 @@ def plot_psd(result: AcquisitionData, output_file: str, axis: str = "xyz", import matplotlib.pyplot as plt from scipy.signal import welch, find_peaks - combinations = set(''.join(p) for i in range(1, 4) for p in permutations("xyz", i)) + combinations = get_axis_combinations("xyz") if axis not in combinations: raise ValueError("Invalid axis, must be 'x', 'y', 'z', or a combination.") if not result.integrity: @@ -84,7 +85,7 @@ def plot_fft(result: AcquisitionData, output_file: str, axis: str = "xyz", from scipy.signal import find_peaks from scipy.signal.windows import hann - combinations = set(''.join(p) for i in range(1, 4) for p in permutations("xyz", i)) + combinations = get_axis_combinations("xyz") if axis not in combinations: raise ValueError("Invalid axis, must be 'x', 'y', 'z', or a combination.") if not result.integrity: @@ -160,7 +161,7 @@ def plot(result: AcquisitionData, output_file: str, axis: str = "xyz", import numpy as np import matplotlib.pyplot as plt - combinations = set(''.join(p) for i in range(1, 4) for p in permutations("xyz", i)) + combinations = get_axis_combinations("xyz") if axis not in combinations: raise ValueError("Invalid axis, must be 'x', 'y', 'z', or a combination.") if not result.integrity: diff --git a/python/picoquake/utils.py b/python/picoquake/utils.py new file mode 100644 index 0000000..52a3e02 --- /dev/null +++ b/python/picoquake/utils.py @@ -0,0 +1,48 @@ +from itertools import permutations +from collections import deque +from typing import List, Any, Optional + + +def get_axis_combinations(axis: str) -> set: + return set(''.join(p) for i in range(1, len(axis) + 1) for p in permutations(axis, i)) + + +def deque_get_last_n(data: deque, n: int) -> List[Any]: + """ + Get the last n elements from a deque. + + Args: + data: Deque with data. + n: Number of elements to get. + + Returns: + List with the last n elements from the deque. If n is greater than the length of the deque, all elements are returned. + """ + start_idx = max(0, len(data) - n) + return [data[i] for i in range(start_idx, len(data))] + +def deque_slice(dq: deque, start: Optional[int], end: Optional[int] = None) -> List[Any]: + """ + Return a slice from the deque. + + Args: + dq: The deque to slice. + start: The starting index of the slice. + end : The ending index of the slice. + + Returns: + deque: A deque containing the specified slice. + """ + if start is None: + start = 0 + if end is None: + end = len(dq) + if start < 0: + start = len(dq) + start + if end < 0: + end = len(dq) + end + if start < 0: + start = 0 + if end > len(dq): + end = len(dq) + return [dq[i] for i in range(start, end)] diff --git a/python/pyproject.toml b/python/pyproject.toml index 4948295..43ad245 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "picoquake" -version = "1.0.4" +version = "1.1.0-beta" description = "PicoQuake USB vibration sensor library." requires-python = ">=3.9" readme = "README.md" @@ -41,6 +41,12 @@ plot = [ "scipy~=1.6", "matplotlib~=3.4" ] +test = [ + "numpy~=1.19", + "scipy~=1.6", + "matplotlib~=3.4", + "pytest~=6.2" +] [project.urls] Repository = "https://github.com/PLab-SI/PicoQuake" diff --git a/python/test/test_analysis.py b/python/test/test_analysis.py new file mode 100644 index 0000000..24c007d --- /dev/null +++ b/python/test/test_analysis.py @@ -0,0 +1,32 @@ +from pytest import approx + +from picoquake.data import IMUSample +from picoquake.analisys import * + +def test_rms(): + samples = [ + IMUSample(0, 1, 2, 3, 4, 5, 6), + IMUSample(1, 1, 2, 3, 4, 5, 6), + IMUSample(2, 1, 2, 3, 4, 5, 6), + ] + + # Test for all axes + acc_rms, gyro_rms = rms(samples, 'xyz') + assert approx(acc_rms, 1e-3) == 3.7417 + assert approx(gyro_rms, 1e-3) == 8.775 + + # Test for x axis only + acc_rms, gyro_rms = rms(samples, 'x') + assert approx(acc_rms, 1e-3) == 1.0 + assert approx(gyro_rms, 1e-3) == 4.0 + + # Test for y axis only + acc_rms, gyro_rms = rms(samples, 'y') + assert approx(acc_rms, 1e-3) == 2.0 + assert approx(gyro_rms, 1e-3) == 5.0 + + # Test for z axis only + acc_rms, gyro_rms = rms(samples, 'z') + assert approx(acc_rms, 1e-3) == 3.0 + assert approx(gyro_rms, 1e-3) == 6.0 + \ No newline at end of file diff --git a/python/test/test_utils.py b/python/test/test_utils.py new file mode 100644 index 0000000..f862328 --- /dev/null +++ b/python/test/test_utils.py @@ -0,0 +1,36 @@ +from picoquake.utils import * + + +def test_get_axis_combinations(): + assert get_axis_combinations("x") == {"x"} + assert get_axis_combinations("xyz") == {"x", "y", "z", + "xy", "xz", "yx", "yz", "zx", "zy", + "xyz", "xzy", "yxz", "yzx", "zxy", "zyx"} + assert get_axis_combinations("abcd") == {"a", "b", "c", "d", + "ab", "ac", "ad", "ba", "bc", "bd", "ca", "cb", "cd", "da", "db", "dc", + "abc", "abd", "acb", "acd", "adb", "adc", "bac", "bad", "bca", "bcd", "bda", "bdc", + "cab", "cad", "cba", "cbd", "cda", "cdb", "dab", "dac", "dba", "dbc", "dca", "dcb", + "abcd", "abdc", "acbd", "acdb", "adbc", "adcb", "bacd", "badc", "bcad", "bcda", "bdac", "bdca", + "cabd", "cadb", "cbad", "cbda", "cdab", "cdba", "dabc", "dacb", "dbac", "dbca", "dcab", "dcba"} + + +def test_deque_get_last_n(): + d = deque([1, 2, 3, 4, 5]) + assert deque_get_last_n(d, 3) == [3, 4, 5] + assert deque_get_last_n(d, 5) == [1, 2, 3, 4, 5] + assert deque_get_last_n(d, 6) == [1, 2, 3, 4, 5] + assert deque_get_last_n(d, 0) == [] + assert deque_get_last_n(d, -1) == [] + d = deque() + assert deque_get_last_n(d, 3) == [] + assert deque_get_last_n(d, 0) == [] + assert deque_get_last_n(d, -1) == [] + + +def test_deque_slice(): + dq = deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + test_slices = [(None, None), (None, 3), (0, 3), (0, 10), (3, 7), + (None, -1), (-5, None), (-5, -3), (0, -3), (3, -3), (0, -10)] + for start, end in test_slices: + assert deque_slice(dq, start, end) == list(dq)[start:end] + \ No newline at end of file