-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbci_attentiontracker_new_neurofeedback.py
106 lines (85 loc) · 4.27 KB
/
bci_attentiontracker_new_neurofeedback.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# -*- coding: utf-8 -*-
"""
Estimate Relaxation from Band Powers
This example shows how to buffer, epoch, and transform EEG data from a single
electrode into values for each of the classic frequencies (e.g. alpha, beta, theta)
Furthermore, it shows how ratios of the band powers can be used to estimate
mental state for neurofeedback.
"""
import numpy as np # Module that simplifies computations on matrices
import matplotlib.pyplot as plt # Module used for plotting
from pylsl import StreamInlet, resolve_byprop # Module to receive EEG data
import utils # Our own utility functions
import pyautogui
from pynput.keyboard import Key, Controller
from datetime import datetime
from win11toast import toast
import time
# Handy little enum to make code more readable
class Band:
Delta = 0
Theta = 1
Alpha = 2
Beta = 3
""" EXPERIMENTAL PARAMETERS """
# Modify these to change aspects of the signal processing
BUFFER_LENGTH = 5 # EEG data buffer length in seconds
EPOCH_LENGTH = 1 # Epoch length used for FFT in seconds
OVERLAP_LENGTH = 0 # Amount of overlap between two consecutive epochs in seconds
SHIFT_LENGTH = EPOCH_LENGTH - OVERLAP_LENGTH # How much to 'shift' between consecutive epochs
INDEX_CHANNEL = [0] # Index of the channel(s) to be used
if __name__ == "__main__":
""" 1. CONNECT TO EEG STREAM """
print('Looking for an EEG stream...')
streams = resolve_byprop('type', 'EEG', timeout=2)
if len(streams) == 0:
raise RuntimeError('Can\'t find EEG stream.')
print("Start acquiring data")
inlet = StreamInlet(streams[0], max_chunklen=12)
eeg_time_correction = inlet.time_correction()
info = inlet.info()
fs = int(info.nominal_srate())
""" 2. INITIALIZE BUFFERS """
eeg_buffer = np.zeros((int(fs * BUFFER_LENGTH), 1))
filter_state = None
n_win_test = int(np.floor((BUFFER_LENGTH - EPOCH_LENGTH) / SHIFT_LENGTH + 1))
band_buffer = np.zeros((n_win_test, 4))
""" 3. GET DATA """
print('Press Ctrl-C in the console to break the while loop.')
t = datetime.now()
focus_start_time = None # To track when focus starts
focus_time = 0 # Track how long focus has lasted in seconds
try:
keyboard = Controller()
while True:
""" 3.1 ACQUIRE DATA """
eeg_data, timestamp = inlet.pull_chunk(
timeout=1, max_samples=int(SHIFT_LENGTH * fs))
if len(eeg_data) == 0:
continue # Skip this iteration if no data was received
ch_data = np.array(eeg_data)[:, INDEX_CHANNEL]
# Update EEG buffer with the new data
eeg_buffer, filter_state = utils.update_buffer(
eeg_buffer, ch_data, notch=True, filter_state=filter_state)
""" 3.2 COMPUTE BAND POWERS """
data_epoch = utils.get_last_data(eeg_buffer, EPOCH_LENGTH * fs)
band_powers = utils.compute_band_powers(data_epoch, fs)
band_buffer, _ = utils.update_buffer(band_buffer, np.asarray([band_powers]))
smooth_band_powers = np.mean(band_buffer, axis=0)
#Assigning the variable to the theta/beta
theta_beta_ratio = smooth_band_powers[Band.Theta] / smooth_band_powers[Band.Beta]
#Printing it's value
print('Theta/Beta Ratio: ', theta_beta_ratio)
#Track focus time when theta_beta_ratio < 0.95
if theta_beta_ratio < 0.95: # Change this number based on your theta/beta ratio when you're concentrated vs. distracted
if focus_start_time is None:
focus_start_time = time.time() # Start tracking time when focus begins
focus_time = time.time() - focus_start_time # Calculate time spent focused
if theta_beta_ratio > 0.95 and (time.time() - t.timestamp()) > 10:
print("----------- CONCENTRATION CHANGE -----------")
toast(f"You're distracted. You've been focused for {focus_time:.2f} seconds. This is a friendly reminder to FOCUS.")
focus_start_time = None # Reset focus time after notification
focus_time = 0 # Reset focus time after concentration change
t = datetime.now() # Reset time after distraction
except KeyboardInterrupt:
print('Closing!')