-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlow_pass_filter.py
83 lines (68 loc) · 2.8 KB
/
low_pass_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class LowPassFilter:
    """
    Exponential smoothing (first-order low-pass) filter for scalar readings.

    The first accepted measurement seeds the filter unchanged; every later one
    is blended in as ``alpha * new + (1 - alpha) * previous``.
    """

    def __init__(self, alpha: float = 0.6):
        """
        Initialize the LowPassFilter.
        :param alpha: Smoothing factor (0 < alpha <= 1), the weight given to each new
            value versus the previous smoothed value. With alpha == 1 no smoothing
            is applied and the filter simply tracks the latest measurement.
        :raises ValueError: If alpha is not in the range (0, 1]; NaN fails the
            comparison and is rejected as well.
        """
        if not (0 < alpha <= 1):
            raise ValueError("Alpha must be between 0 and 1.")
        self.alpha = alpha
        # None means "no measurement accepted yet".
        self._current_value = None

    def add(self, value: float):
        """
        Add a new measurement and update the smoothed value.
        :param value: The new measurement to add. None, NaN and infinite values
            are ignored and leave the smoothed value untouched.
        """
        import math  # function-scope import: the module has no import block to extend

        if value is None:
            return
        # A single NaN/inf would make every later update non-finite as well,
        # so such readings are dropped just like a missing (None) reading.
        if not math.isfinite(value):
            return

        if self._current_value is None:
            # First usable sample: nothing to blend with yet.
            self._current_value = value
        else:
            self._current_value = self.alpha * value + (1 - self.alpha) * self._current_value

    @property
    def value(self):
        """
        Return the current smoothed value.
        :return: The smoothed value rounded to one decimal place, or None if no
            values have been added yet.
        """
        if self._current_value is None:
            return None
        return round(self._current_value, 1)
# class LowPassFilter:
# def __init__(self, size = 10):
# self._values = deque(maxlen=size)
# def add(self, value):
# self._values.append(value)
# @property
# def value(self):
# return round(sum(self._values) / len(self._values), 1)
# class LowPassFilter:
# def __init__(self, cutoff=0.005, sampling_interval=20, order=2):
# """
# Initialize the low-pass filter with the given parameters.
# :param cutoff: Cutoff frequency in Hz (default 0.005 Hz)
# :param sampling_interval: Time between readings in seconds (default 20 seconds)
# :param order: Order of the Butterworth filter (default 2)
# """
# self._cutoff = cutoff
# self._fs = 1 / sampling_interval # Sampling frequency in Hz
# self._order = order
# self._values = []
# self._b, self._a = self._design_filter()
# def _design_filter(self):
# """Design the Butterworth filter coefficients."""
# nyquist = 0.5 * self._fs
# normal_cutoff = self._cutoff / nyquist
# b, a = signal.butter(self._order, normal_cutoff, btype='low', analog=False)
# return b, a
# def add(self, new_reading):
# """
# Add a new temperature reading and return the filtered value.
# :param new_reading: New temperature reading (float)
# :return: Filtered temperature value (float)
# """
# self._values.append(new_reading)
# filtered_value = signal.lfilter(self._b, self._a, self._values)[-1]
# return filtered_value
# @property
# def value(self):
# """Return the oldest stored raw reading (not a filtered value)."""
# if self._values is not None and self._values[0] is not None:
# return self._values[0]