Skip to content

Commit

Permalink
added trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
nplan committed Jul 10, 2024
1 parent a5cff85 commit a388c3b
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 5 deletions.
35 changes: 35 additions & 0 deletions python/examples/trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from time import sleep

import picoquake

if __name__ == "__main__":
# Create a PicoQuake device
device = picoquake.PicoQuake("c6e3")

# Configure acquisition
device.configure(
sample_rate=picoquake.SampleRate.hz_100,
filter_hz=picoquake.Filter.hz_42,
acc_range=picoquake.AccRange.g_16,
gyro_range=picoquake.GyroRange.dps_2000,
)

# Acquire data from the device
data, exception = device.trigger(rms_threshold=2,
pre_seconds=2,
post_seconds=5,
source="accel",
axis="xyz",)

# Stop the device
device.stop()

# Check if an exception occurred
if exception is not None:
raise exception

# Print the acquired data
print(f"Data: {data}")

# Save to a CSV file
data.to_csv("acquisition_triggered.csv")
2 changes: 1 addition & 1 deletion python/picoquake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
from .data import AcquisitionData, IMUSample
from .plot import *

__version__ = "1.0.4"
__version__ = "1.1.0-beta"
62 changes: 62 additions & 0 deletions python/picoquake/analisys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
This module implements various data analysis functions.
"""

from typing import List, Tuple

from .data import IMUSample


# def rms(samples: List[IMUSample]) -> Tuple[float, float, float, float, float, float]:
# """
# Calculate the root mean square of the acceleration and angular velocity components.

# Args:
# samples: List of IMU samples.

# Returns:
# Tuple of the root mean square of the acceleration and angular velocity components in the order:
# (rms_ax, rms_ay, rms_az, rms_gx, rms_gy, rms_gz)
# """
# rms_ax = (sum([sample.acc_x ** 2 for sample in samples]) / len(samples)) ** 0.5
# rms_ay = (sum([sample.acc_y ** 2 for sample in samples]) / len(samples)) ** 0.5
# rms_az = (sum([sample.acc_z ** 2 for sample in samples]) / len(samples)) ** 0.5
# rms_gx = (sum([sample.gyro_x ** 2 for sample in samples]) / len(samples)) ** 0.5
# rms_gy = (sum([sample.gyro_y ** 2 for sample in samples]) / len(samples)) ** 0.5
# rms_gz = (sum([sample.gyro_z ** 2 for sample in samples]) / len(samples)) ** 0.5
# return (rms_ax, rms_ay, rms_az, rms_gx, rms_gy, rms_gz)

# def rms_combined(samples: List[IMUSample]) -> Tuple[float, float]:
# """
# Calculate the root mean square of the combined acceleration and angular velocity components.

# Args:
# samples: List of IMU samples.

# Returns:
# Tuple of the root mean square of the combined acceleration and angular velocity components in the order:
# (rms_a, rms_g)
# """
# rms_a = (sum([sample.acc_x ** 2 + sample.acc_y ** 2 + sample.acc_z ** 2 for sample in samples]) / len(samples)) ** 0.5
# rms_g = (sum([sample.gyro_x ** 2 + sample.gyro_y ** 2 + sample.gyro_z ** 2 for sample in samples]) / len(samples)) ** 0.5
# return (rms_a, rms_g)

def rms(samples: List[IMUSample], axes: str) -> Tuple[float, float]:
"""
Calculate the root mean square of the acceleration and angular velocity components for the specified axes.
Args:
samples: List of IMU samples.
axes: String with the axes to calculate the RMS values. Must be a combination of 'x', 'y', and 'z'.
Returns:
Tuple of the root mean square of the acceleration and angular velocity.
"""

x = int('x' in axes)
y = int('y' in axes)
z = int('z' in axes)

rms_acc = (sum([sample.acc_x ** 2 * x + sample.acc_y ** 2 * y + sample.acc_z ** 2 * z for sample in samples]) / len(samples)) ** 0.5
rms_gyro = (sum([sample.gyro_x ** 2 * x + sample.gyro_y ** 2 * y + sample.gyro_z ** 2 * z for sample in samples]) / len(samples)) ** 0.5
return (rms_acc, rms_gyro)
68 changes: 68 additions & 0 deletions python/picoquake/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from .configuration import *
from .data import *
from .exceptions import *
from .analisys import *
from .utils import *

VID = 0x2E8A
PID = 0xA
Expand Down Expand Up @@ -286,6 +288,72 @@ def read(self, num: int=1, timeout: Optional[float]=None) -> List[IMUSample]:
for _ in range(num_ret):
samples.append(self._sample_deque.popleft())
return samples

def trigger(self, rms_threshold: float, pre_seconds: float, post_seconds:
float, source: str="accel", axis: str="xyz",
rms_window: float=1.0, rms_interval: float=0.1) -> Tuple[AcquisitionData, Optional[Exception]]:
"""
Triggers the device to start sampling when the RMS value of the acceleration exceeds the specified value.
"""
if source not in ["accel", "gyro"]:
raise ValueError("Source must be 'accel' or 'gyro'")
combinations = get_axis_combinations("xyz")
if axis not in combinations:
raise ValueError("Invalid axis, must be 'x', 'y', 'z', or a combination.")

window_len = int(rms_window * self.config.sample_rate.param_value)
n_pre_samples = int(pre_seconds * self.config.sample_rate.param_value)
n_post_samples = int(post_seconds * self.config.sample_rate.param_value)
n_samples = n_pre_samples + n_post_samples
len_deque_at_trigger = 0
exception: Optional[Exception] = None

self.start_continuos()
self._logger.info(f"Triggering on RMS value {rms_threshold} g")

while True:
samples = deque_get_last_n(self._sample_deque, window_len)
if len(samples) < window_len:
sleep(0.001)
continue
rms_acc, rms_gyro = rms(samples, axis)
if source == "accel":
rms_val = rms_acc
else:
rms_val = rms_gyro
if rms_val > rms_threshold:
len_deque_at_trigger = len(self._sample_deque)
start_t = time()
break
sleep(rms_interval)
self._logger.info(f"Triggered on RMS value {rms_val:.3f} g")
while True:
if len(self._sample_deque) - len_deque_at_trigger > post_seconds * self.config.sample_rate.param_value:
break
if self._exception is not None:
exception = self._exception
break
sleep(0.001)
stop_t = time()
self.stop_continuos()
self._logger.info(f"Acquisition stopped. Took: {stop_t - start_t:.1f}s.")
self._logger.info(f"Received {len(samples)} samples")
samples = deque_slice(self._sample_deque,
len_deque_at_trigger - n_pre_samples,
len_deque_at_trigger + n_post_samples)
data = AcquisitionData(samples=samples,
device=cast(DeviceInfo, self.device_info),
config=self.config,
start_time=datetime.fromtimestamp(start_t))
if exception is None:
if len(samples) < n_samples:
self._logger.warning(f"Expected {n_samples} samples, received {len(samples)}")
exception = ConnectionError("Not all samples received")
elif not data._check_integrity:
self._logger.warning(f"Data integrity compromised, {data.skipped_samples} samples skipped")
exception = ConnectionError("Data integrity compromised")
return data, exception


def read_last(self, timeout: Optional[float]=None) -> Optional[IMUSample]:
"""
Expand Down
7 changes: 4 additions & 3 deletions python/picoquake/plot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from itertools import permutations

from .data import *
from .utils import get_axis_combinations

def plot_psd(result: AcquisitionData, output_file: str, axis: str = "xyz",
freq_min: float = 0, freq_max: Optional[float] = None,
Expand All @@ -9,7 +10,7 @@ def plot_psd(result: AcquisitionData, output_file: str, axis: str = "xyz",
import matplotlib.pyplot as plt
from scipy.signal import welch, find_peaks

combinations = set(''.join(p) for i in range(1, 4) for p in permutations("xyz", i))
combinations = get_axis_combinations("xyz")
if axis not in combinations:
raise ValueError("Invalid axis, must be 'x', 'y', 'z', or a combination.")
if not result.integrity:
Expand Down Expand Up @@ -84,7 +85,7 @@ def plot_fft(result: AcquisitionData, output_file: str, axis: str = "xyz",
from scipy.signal import find_peaks
from scipy.signal.windows import hann

combinations = set(''.join(p) for i in range(1, 4) for p in permutations("xyz", i))
combinations = get_axis_combinations("xyz")
if axis not in combinations:
raise ValueError("Invalid axis, must be 'x', 'y', 'z', or a combination.")
if not result.integrity:
Expand Down Expand Up @@ -160,7 +161,7 @@ def plot(result: AcquisitionData, output_file: str, axis: str = "xyz",
import numpy as np
import matplotlib.pyplot as plt

combinations = set(''.join(p) for i in range(1, 4) for p in permutations("xyz", i))
combinations = get_axis_combinations("xyz")
if axis not in combinations:
raise ValueError("Invalid axis, must be 'x', 'y', 'z', or a combination.")
if not result.integrity:
Expand Down
48 changes: 48 additions & 0 deletions python/picoquake/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from itertools import permutations
from collections import deque
from typing import List, Any, Optional


def get_axis_combinations(axis: str) -> set:
return set(''.join(p) for i in range(1, len(axis) + 1) for p in permutations(axis, i))


def deque_get_last_n(data: deque, n: int) -> List[Any]:
"""
Get the last n elements from a deque.
Args:
data: Deque with data.
n: Number of elements to get.
Returns:
List with the last n elements from the deque. If n is greater than the length of the deque, all elements are returned.
"""
start_idx = max(0, len(data) - n)
return [data[i] for i in range(start_idx, len(data))]

def deque_slice(dq: deque, start: Optional[int], end: Optional[int] = None) -> List[Any]:
"""
Return a slice from the deque.
Args:
dq: The deque to slice.
start: The starting index of the slice.
end : The ending index of the slice.
Returns:
deque: A deque containing the specified slice.
"""
if start is None:
start = 0
if end is None:
end = len(dq)
if start < 0:
start = len(dq) + start
if end < 0:
end = len(dq) + end
if start < 0:
start = 0
if end > len(dq):
end = len(dq)
return [dq[i] for i in range(start, end)]
8 changes: 7 additions & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "picoquake"
version = "1.0.4"
version = "1.1.0-beta"
description = "PicoQuake USB vibration sensor library."
requires-python = ">=3.9"
readme = "README.md"
Expand Down Expand Up @@ -41,6 +41,12 @@ plot = [
"scipy~=1.6",
"matplotlib~=3.4"
]
test = [
"numpy~=1.19",
"scipy~=1.6",
"matplotlib~=3.4",
"pytest~=6.2"
]

[project.urls]
Repository = "https://github.com/PLab-SI/PicoQuake"
Expand Down
32 changes: 32 additions & 0 deletions python/test/test_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from pytest import approx

from picoquake.data import IMUSample
from picoquake.analisys import *

def test_rms():
samples = [
IMUSample(0, 1, 2, 3, 4, 5, 6),
IMUSample(1, 1, 2, 3, 4, 5, 6),
IMUSample(2, 1, 2, 3, 4, 5, 6),
]

# Test for all axes
acc_rms, gyro_rms = rms(samples, 'xyz')
assert approx(acc_rms, 1e-3) == 3.7417
assert approx(gyro_rms, 1e-3) == 8.775

# Test for x axis only
acc_rms, gyro_rms = rms(samples, 'x')
assert approx(acc_rms, 1e-3) == 1.0
assert approx(gyro_rms, 1e-3) == 4.0

# Test for y axis only
acc_rms, gyro_rms = rms(samples, 'y')
assert approx(acc_rms, 1e-3) == 2.0
assert approx(gyro_rms, 1e-3) == 5.0

# Test for z axis only
acc_rms, gyro_rms = rms(samples, 'z')
assert approx(acc_rms, 1e-3) == 3.0
assert approx(gyro_rms, 1e-3) == 6.0

36 changes: 36 additions & 0 deletions python/test/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from picoquake.utils import *


def test_get_axis_combinations():
assert get_axis_combinations("x") == {"x"}
assert get_axis_combinations("xyz") == {"x", "y", "z",
"xy", "xz", "yx", "yz", "zx", "zy",
"xyz", "xzy", "yxz", "yzx", "zxy", "zyx"}
assert get_axis_combinations("abcd") == {"a", "b", "c", "d",
"ab", "ac", "ad", "ba", "bc", "bd", "ca", "cb", "cd", "da", "db", "dc",
"abc", "abd", "acb", "acd", "adb", "adc", "bac", "bad", "bca", "bcd", "bda", "bdc",
"cab", "cad", "cba", "cbd", "cda", "cdb", "dab", "dac", "dba", "dbc", "dca", "dcb",
"abcd", "abdc", "acbd", "acdb", "adbc", "adcb", "bacd", "badc", "bcad", "bcda", "bdac", "bdca",
"cabd", "cadb", "cbad", "cbda", "cdab", "cdba", "dabc", "dacb", "dbac", "dbca", "dcab", "dcba"}


def test_deque_get_last_n():
d = deque([1, 2, 3, 4, 5])
assert deque_get_last_n(d, 3) == [3, 4, 5]
assert deque_get_last_n(d, 5) == [1, 2, 3, 4, 5]
assert deque_get_last_n(d, 6) == [1, 2, 3, 4, 5]
assert deque_get_last_n(d, 0) == []
assert deque_get_last_n(d, -1) == []
d = deque()
assert deque_get_last_n(d, 3) == []
assert deque_get_last_n(d, 0) == []
assert deque_get_last_n(d, -1) == []


def test_deque_slice():
dq = deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
test_slices = [(None, None), (None, 3), (0, 3), (0, 10), (3, 7),
(None, -1), (-5, None), (-5, -3), (0, -3), (3, -3), (0, -10)]
for start, end in test_slices:
assert deque_slice(dq, start, end) == list(dq)[start:end]

0 comments on commit a388c3b

Please sign in to comment.