diff --git a/eegnb/devices/__init__.py b/eegnb/devices/__init__.py index e69de29bb..955522374 100644 --- a/eegnb/devices/__init__.py +++ b/eegnb/devices/__init__.py @@ -0,0 +1,5 @@ +from .base import EEGDevice +from .muse import MuseDevice +from ._brainflow import BrainflowDevice + +all_devices = MuseDevice.devices + BrainflowDevice.devices diff --git a/eegnb/devices/_brainflow.py b/eegnb/devices/_brainflow.py new file mode 100644 index 000000000..fe3665e9a --- /dev/null +++ b/eegnb/devices/_brainflow.py @@ -0,0 +1,221 @@ +import logging +from time import sleep +from multiprocessing import Process +from typing import List, Tuple + +import numpy as np +import pandas as pd + +from brainflow import BoardShim, BoardIds, BrainFlowInputParams +from .base import EEGDevice, _check_samples + + +logger = logging.getLogger(__name__) + + +class BrainflowDevice(EEGDevice): + # list of brainflow devices + devices: List[str] = [ + "ganglion", + "ganglion_wifi", + "cyton", + "cyton_wifi", + "cyton_daisy", + "cyton_daisy_wifi", + "brainbit", + "unicorn", + "synthetic", + "brainbit", + "notion1", + "notion2", + ] + + def __init__( + self, + device_name: str, + serial_num=None, + serial_port=None, + mac_addr=None, + other=None, + ip_addr=None, + ): + EEGDevice.__init__(self, device_name) + self.serial_num = serial_num + self.serial_port = serial_port + self.mac_address = mac_addr + self.other = other + self.ip_addr = ip_addr + self.markers: List[Tuple[List[int], float]] = [] + self._init_brainflow() + + def start(self, filename: str = None, duration=None) -> None: + self.save_fn = filename + + def record(): + sleep(duration) + self._stop_brainflow() + + self.board.start_stream() + if duration: + logger.info( + "Starting background recording process, will save to file: %s" + % self.save_fn + ) + self.recording = Process(target=lambda: record()) + self.recording.start() + + def stop(self) -> None: + self._stop_brainflow() + + def push_sample(self, marker: List[int], timestamp: float): + last_timestamp = self.board.get_current_board_data(1)[-1][0] + self.markers.append((marker, last_timestamp)) + + def check(self, max_uv_abs=200) -> List[str]: + data = self.board.get_board_data() # will clear board buffer + # print(data) + channel_names = BoardShim.get_eeg_names(self.brainflow_id) + # FIXME: _check_samples expects different (Muse) inputs + checked = _check_samples(data.T, channel_names, max_uv_abs=max_uv_abs) # type: ignore + bads = [ch for ch, ok in checked.items() if not ok] + return bads + + def _init_brainflow(self) -> None: + """ + This function initializes the brainflow backend based on the input device name. It calls + a utility function to determine the appropriate USB port to use based on the current operating system. + Additionally, the system allows for passing a serial number in the case that they want to use either + the BrainBit or the Unicorn EEG devices from the brainflow family. + + Parameters: + serial_num (str or int): serial number for either the BrainBit or Unicorn devices. + """ + from eegnb.devices.utils import get_openbci_usb + + # Initialize brainflow parameters + self.brainflow_params = BrainFlowInputParams() + + device_name_to_id = { + "ganglion": BoardIds.GANGLION_BOARD.value, + "ganglion_wifi": BoardIds.GANGLION_WIFI_BOARD.value, + "cyton": BoardIds.CYTON_BOARD.value, + "cyton_wifi": BoardIds.CYTON_WIFI_BOARD.value, + "cyton_daisy": BoardIds.CYTON_DAISY_BOARD.value, + "cyton_daisy_wifi": BoardIds.CYTON_DAISY_WIFI_BOARD.value, + "brainbit": BoardIds.BRAINBIT_BOARD.value, + "unicorn": BoardIds.UNICORN_BOARD.value, + "callibri_eeg": BoardIds.CALLIBRI_EEG_BOARD.value, + "notion1": BoardIds.NOTION_1_BOARD.value, + "notion2": BoardIds.NOTION_2_BOARD.value, + "synthetic": BoardIds.SYNTHETIC_BOARD.value, + } + + # validate mapping + assert all(name in device_name_to_id for name in self.devices) + + self.brainflow_id = device_name_to_id[self.device_name] + + if self.device_name == "ganglion": + if self.serial_port is None: + self.brainflow_params.serial_port = get_openbci_usb() + # set mac address parameter in case + if self.mac_address is None: + logger.info( + "No MAC address provided, attempting to connect without one" + ) + else: + self.brainflow_params.mac_address = self.mac_address + + elif self.device_name in ["ganglion_wifi", "cyton_wifi", "cyton_daisy_wifi"]: + if self.ip_addr is not None: + self.brainflow_params.ip_address = self.ip_addr + + elif self.device_name in ["cyton", "cyton_daisy"]: + if self.serial_port is None: + self.brainflow_params.serial_port = get_openbci_usb() + + elif self.device_name == "callibri_eeg": + if self.other: + self.brainflow_params.other_info = str(self.other) + + # some devices allow for an optional serial number parameter for better connection + if self.serial_num: + self.brainflow_params.serial_number = str(self.serial_num) + + if self.serial_port: + self.brainflow_params.serial_port = str(self.serial_port) + + # Initialize board_shim + self.sfreq = BoardShim.get_sampling_rate(self.brainflow_id) + self.board = BoardShim(self.brainflow_id, self.brainflow_params) + self.board.prepare_session() + + def get_data(self) -> pd.DataFrame: + from eegnb.devices.utils import create_stim_array + + data = self.board.get_board_data() # will clear board buffer + + # transform data for saving + data = data.T # transpose data + print(data) + + # get the channel names for EEG data + if self.brainflow_id == BoardIds.GANGLION_BOARD.value: + # if a ganglion is used, use recommended default EEG channel names + ch_names = ["fp1", "fp2", "tp7", "tp8"] + else: + # otherwise select eeg channel names via brainflow API + ch_names = BoardShim.get_eeg_names(self.brainflow_id) + + # pull EEG channel data via brainflow API + eeg_data = data[:, BoardShim.get_eeg_channels(self.brainflow_id)] + timestamps = data[:, BoardShim.get_timestamp_channel(self.brainflow_id)] + + # Create a column for the stimuli to append to the EEG data + stim_array = create_stim_array(timestamps, self.markers) + timestamps = timestamps[ + ..., None + ] # Add an additional dimension so that shapes match + total_data = np.append(timestamps, eeg_data, 1) + total_data = np.append( + total_data, stim_array, 1 + ) # Append the stim array to data. + + # Subtract five seconds of settling time from beginning + # total_data = total_data[5 * self.sfreq :] + df = pd.DataFrame(total_data, columns=["timestamps"] + ch_names + ["stim"]) + return df + + def _save(self) -> None: + """Saves the data to a CSV file.""" + assert self.save_fn + df = self.get_data() + df.to_csv(self.save_fn, index=False) + + def _stop_brainflow(self) -> None: + """This functions kills the brainflow backend and saves the data to a CSV file.""" + # Collect session data and kill session + if self.save_fn: + self._save() + self.board.stop_stream() + self.board.release_session() + + +def test_check(): + device = BrainflowDevice(device_name="synthetic") + with device: + sleep(2) # is 2s really needed? + bads = device.check(max_uv_abs=300) + # Seems to blink between the two... + assert bads == ["F6", "F8"] or bads == ["F4", "F6", "F8"] + # print(bads) + # assert not bads + + +def test_get_data(): + device = BrainflowDevice(device_name="synthetic") + with device: + sleep(2) + df = device.get_data() + print(df) + assert not df.empty diff --git a/eegnb/devices/base.py b/eegnb/devices/base.py new file mode 100644 index 000000000..f8ee8e775 --- /dev/null +++ b/eegnb/devices/base.py @@ -0,0 +1,93 @@ +""" +Abstraction for the various supported EEG devices. +""" + +import logging +from typing import List, Dict +from abc import ABCMeta, abstractmethod + +import numpy as np + + +logger = logging.getLogger(__name__) + + +def _check_samples( + buffer: np.ndarray, channels: List[str], max_uv_abs=200 +) -> Dict[str, bool]: + # TODO: Better signal quality check + chmax = dict(zip(channels, np.max(np.abs(buffer), axis=0))) + return {ch: maxval < max_uv_abs for ch, maxval in chmax.items()} + + +def test_check_samples(): + buffer = np.array([[9.0, 11.0, -5, -13]]) + assert {"TP9": True, "AF7": False, "AF8": True, "TP10": False} == _check_samples( + buffer, channels=["TP9", "AF7", "AF8", "TP10"], max_uv_abs=10 + ) + + +class EEGDevice(metaclass=ABCMeta): + def __init__(self, device: str) -> None: + """ + The initialization function takes the name of the EEG device and initializes the appropriate backend. + + Parameters: + device (str): name of eeg device used for reading data. + """ + self.device_name = device + + @classmethod + def create(cls, device_name: str, *args, **kwargs) -> "EEGDevice": + from .muse import MuseDevice + from ._brainflow import BrainflowDevice + + if device_name in BrainflowDevice.devices: + return BrainflowDevice(device_name) + elif device_name in MuseDevice.devices: + return MuseDevice(device_name) + else: + raise ValueError(f"Invalid device name: {device_name}") + + def __enter__(self): + self.start() + + def __exit__(self, *args): + self.stop() + + @abstractmethod + def start(self, filename: str = None, duration=None): + """ + Starts the EEG device based on the defined backend. + + Parameters: + filename (str): name of the file to save the sessions data to. + """ + raise NotImplementedError + + @abstractmethod + def stop(self): + raise NotImplementedError + + @abstractmethod + def push_sample(self, marker: List[int], timestamp: float): + """ + Push a marker and its timestamp to store alongside the EEG data. + + Parameters: + marker (int): marker number for the stimuli being presented. + timestamp (float): timestamp of stimulus onset from time.time() function. + """ + raise NotImplementedError + + def get_samples(self): + raise NotImplementedError + + @abstractmethod + def check(self): + raise NotImplementedError + + +def test_create(): + device = EEGDevice.create("synthetic") + assert device diff --git a/eegnb/devices/eeg.py b/eegnb/devices/eeg.py deleted file mode 100644 index 28b34ace4..000000000 --- a/eegnb/devices/eeg.py +++ /dev/null @@ -1,297 +0,0 @@ -""" Abstraction for the various supported EEG devices. - - 1. Determine which backend to use for the board. - 2. - -""" - -import os, sys - -import time -from time import sleep -from multiprocessing import Process - -import numpy as np -import pandas as pd - -from brainflow import BoardShim, BoardIds, BrainFlowInputParams -from muselsl import stream, list_muses, record -from pylsl import StreamInfo, StreamOutlet - -from eegnb.devices.utils import get_openbci_usb, create_stim_array - -# list of brainflow devices -brainflow_devices = [ - "ganglion", - "ganglion_wifi", - "cyton", - "cyton_wifi", - "cyton_daisy", - "cyton_daisy_wifi", - "brainbit", - "unicorn", - "synthetic", - "brainbit", - "notion1", - "notion2", - "freeeeg32", -] - - -class EEG: - def __init__( - self, - device=None, - serial_port=None, - serial_num=None, - mac_addr=None, - other=None, - ip_addr=None, - ): - """The initialization function takes the name of the EEG device and determines whether or not - the device belongs to the Muse or Brainflow families and initializes the appropriate backend. - - Parameters: - device (str): name of eeg device used for reading data. - """ - # determine if board uses brainflow or muselsl backend - self.device_name = device - self.serial_num = serial_num - self.serial_port = serial_port - self.mac_address = mac_addr - self.ip_addr = ip_addr - self.other = other - self.backend = self._get_backend(self.device_name) - self.initialize_backend() - - def initialize_backend(self): - if self.backend == "brainflow": - self._init_brainflow() - elif self.backend == "muselsl": - self._init_muselsl() - - def _get_backend(self, device_name): - if device_name in brainflow_devices: - return "brainflow" - elif device_name in ["muse2016", "muse2", "museS"]: - return "muselsl" - - ##################### - # MUSE functions # - ##################### - def _init_muselsl(self): - # Currently there's nothing we need to do here. However keeping the - # option open to add things with this init method. - pass - - def _start_muse(self, duration): - - if sys.platform in ["linux", "linux2", "darwin"]: - # Look for muses - self.muses = list_muses() - # self.muse = muses[0] - - # Start streaming process - self.stream_process = Process( - target=stream, args=(self.muses[0]["address"],) - ) - self.stream_process.start() - - # Create markers stream outlet - self.muse_StreamInfo = StreamInfo( - "Markers", "Markers", 1, 0, "int32", "myuidw43536" - ) - self.muse_StreamOutlet = StreamOutlet(self.muse_StreamInfo) - - # Start a background process that will stream data from the first available Muse - print("starting background recording process") - print("will save to file: %s" % self.save_fn) - self.recording = Process(target=record, args=(duration, self.save_fn)) - self.recording.start() - - time.sleep(5) - - self.push_sample([99], timestamp=time.time()) - - def _stop_muse(self): - - pass - - def _muse_push_sample(self, marker, timestamp): - self.muse_StreamOutlet.push_sample(marker, timestamp) - - ########################## - # BrainFlow functions # - ########################## - def _init_brainflow(self): - """This function initializes the brainflow backend based on the input device name. It calls - a utility function to determine the appropriate USB port to use based on the current operating system. - Additionally, the system allows for passing a serial number in the case that they want to use either - the BraintBit or the Unicorn EEG devices from the brainflow family. - - Parameters: - serial_num (str or int): serial number for either the BrainBit or Unicorn devices. - """ - # Initialize brainflow parameters - self.brainflow_params = BrainFlowInputParams() - - if self.device_name == "ganglion": - self.brainflow_id = BoardIds.GANGLION_BOARD.value - if self.serial_port == None: - self.brainflow_params.serial_port = get_openbci_usb() - # set mac address parameter in case - if self.mac_address is None: - print("No MAC address provided, attempting to connect without one") - else: - self.brainflow_params.mac_address = self.mac_address - - elif self.device_name == "ganglion_wifi": - self.brainflow_id = BoardIds.GANGLION_WIFI_BOARD.value - if self.ip_addr is not None: - self.brainflow_params.ip_address = self.ip_addr - self.brainflow_params.ip_port = 6677 - - elif self.device_name == "cyton": - self.brainflow_id = BoardIds.CYTON_BOARD.value - if self.serial_port is None: - self.brainflow_params.serial_port = get_openbci_usb() - - elif self.device_name == "cyton_wifi": - self.brainflow_id = BoardIds.CYTON_WIFI_BOARD.value - if self.ip_addr is not None: - self.brainflow_params.ip_address = self.ip_addr - self.brainflow_params.ip_port = 6677 - - elif self.device_name == "cyton_daisy": - self.brainflow_id = BoardIds.CYTON_DAISY_BOARD.value - if self.serial_port is None: - self.brainflow_params.serial_port = get_openbci_usb() - - elif self.device_name == "cyton_daisy_wifi": - self.brainflow_id = BoardIds.CYTON_DAISY_WIFI_BOARD.value - if self.ip_addr is not None: - self.brainflow_params.ip_address = self.ip_addr - - elif self.device_name == "brainbit": - self.brainflow_id = BoardIds.BRAINBIT_BOARD.value - - elif self.device_name == "unicorn": - self.brainflow_id = BoardIds.UNICORN_BOARD.value - - elif self.device_name == "callibri_eeg": - self.brainflow_id = BoardIds.CALLIBRI_EEG_BOARD.value - if self.other: - self.brainflow_params.other_info = str(self.other) - - elif self.device_name == "notion1": - self.brainflow_id = BoardIds.NOTION_1_BOARD.value - - elif self.device_name == "notion2": - self.brainflow_id = BoardIds.NOTION_2_BOARD.value - - elif self.device_name == "freeeeg32": - self.brainflow_id = BoardIds.FREEEEG32_BOARD.value - if self.serial_port is None: - self.brainflow_params.serial_port = get_openbci_usb() - - elif self.device_name == "synthetic": - self.brainflow_id = BoardIds.SYNTHETIC_BOARD.value - - # some devices allow for an optional serial number parameter for better connection - if self.serial_num: - serial_num = str(self.serial_num) - self.brainflow_params.serial_number = serial_num - - if self.serial_port: - serial_port = str(self.serial_port) - self.brainflow_params.serial_port = serial_port - - # Initialize board_shim - self.sfreq = BoardShim.get_sampling_rate(self.brainflow_id) - self.board = BoardShim(self.brainflow_id, self.brainflow_params) - self.board.prepare_session() - - def _start_brainflow(self): - self.board.start_stream() - # wait for signal to settle - sleep(5) - - def _stop_brainflow(self): - """This functions kills the brainflow backend and saves the data to a CSV file.""" - - # Collect session data and kill session - data = self.board.get_board_data() # will clear board buffer - self.board.stop_stream() - self.board.release_session() - - # transform data for saving - data = data.T # transpose data - - # get the channel names for EEG data - if ( - self.brainflow_id == BoardIds.GANGLION_BOARD.value - or self.brainflow_id == BoardIds.GANGLION_WIFI_BOARD.value - ): - # if a ganglion is used, use recommended default EEG channel names - ch_names = ["fp1", "fp2", "tp7", "tp8"] - elif (self.brainflow_id == BoardIds.FREEEEG32_BOARD.value): - ch_names = [f'eeg_{i}' for i in range(0,32)] - else: - # otherwise select eeg channel names via brainflow API - ch_names = BoardShim.get_eeg_names(self.brainflow_id) - - # pull EEG channel data via brainflow API - eeg_data = data[:, BoardShim.get_eeg_channels(self.brainflow_id)] - timestamps = data[:, BoardShim.get_timestamp_channel(self.brainflow_id)] - - # Create a column for the stimuli to append to the EEG data - stim_array = create_stim_array(timestamps, self.markers) - timestamps = timestamps[ - ..., None - ] # Add an additional dimension so that shapes match - total_data = np.append(timestamps, eeg_data, 1) - total_data = np.append( - total_data, stim_array, 1 - ) # Append the stim array to data. - - # Subtract five seconds of settling time from beginning - total_data = total_data[5 * self.sfreq :] - data_df = pd.DataFrame(total_data, columns=["timestamps"] + ch_names + ["stim"]) - data_df.to_csv(self.save_fn, index=False) - - def _brainflow_push_sample(self, marker): - last_timestamp = self.board.get_current_board_data(1)[-1][0] - self.markers.append([marker, last_timestamp]) - - def start(self, fn, duration=None): - """Starts the EEG device based on the defined backend. - - Parameters: - fn (str): name of the file to save the sessions data to. - """ - if fn: - self.save_fn = fn - - if self.backend == "brainflow": # Start brainflow backend - self._start_brainflow() - self.markers = [] - elif self.backend == "muselsl": - self._start_muse(duration) - - def push_sample(self, marker, timestamp): - """Universal method for pushing a marker and its timestamp to store alongside the EEG data. - - Parameters: - marker (int): marker number for the stimuli being presented. - timestamp (float): timestamp of stimulus onset from time.time() function. - """ - if self.backend == "brainflow": - self._brainflow_push_sample(marker=marker) - elif self.backend == "muselsl": - self._muse_push_sample(marker=marker, timestamp=timestamp) - - def stop(self): - if self.backend == "brainflow": - self._stop_brainflow() - elif self.backend == "muselsl": - pass diff --git a/eegnb/devices/muse.py b/eegnb/devices/muse.py new file mode 100644 index 000000000..294e04abc --- /dev/null +++ b/eegnb/devices/muse.py @@ -0,0 +1,133 @@ +import sys +import logging +from time import time, sleep +from multiprocessing import Process +from typing import List, Optional + +import numpy as np +import muselsl +import pylsl + +from .base import EEGDevice, _check_samples + +logger = logging.getLogger(__name__) + +BACKEND = "bleak" +CHANNELS_MUSE = ["TP9", "AF7", "AF8", "TP10"] + + +def stream(address, sources): + muselsl.stream( + address, + backend=BACKEND, + ppg_enabled="PPG" in sources, + acc_enabled="ACC" in sources, + gyro_enabled="GYRO" in sources, + ) + + +def record(duration, filename, data_source="EEG"): + muselsl.record(duration=duration, filename=filename, data_source=data_source) + + +class MuseDevice(EEGDevice): + # list of muse devices + devices = [ + "muse2016", + "muse2", + "museS", + ] + + def __init__(self, device_name: str): + EEGDevice.__init__(self, device_name) + self.stream_process: Optional[Process] = None + + @property + def started(self) -> bool: + if self.stream_process: + return self.stream_process.exitcode is None + return False + + def start(self, filename: str = None, duration=None): + """ + Starts the EEG device. + + Parameters: + filename (str): name of the file to save the sessions data to. + """ + sources = ["EEG"] # + ["PPG", "ACC", "GYRO"] + if not duration: + duration = 300 + + # Not sure why we only do this on *nix + # Makes it seem like streaming is only supported on *nix? + if not self.started and sys.platform in ["linux", "linux2", "darwin"]: + # Look for muses + muses = muselsl.list_muses(backend=BACKEND) + # FIXME: fix upstream + muses = [m for m in muses if m["name"].startswith("Muse")] + if not muses: + raise Exception("No Muses found") + + # self.muse = muses[0] + + # Start streaming process + # daemon=False to ensure orderly shutdown/disconnection + stream_process = Process( + target=stream, args=(muses[0]["address"], sources), daemon=False + ) + stream_process.start() + self.stream_process = stream_process + + # Create markers stream outlet + self.marker_outlet = pylsl.StreamOutlet( + pylsl.StreamInfo("Markers", "Markers", 1, 0, "int32", "myuidw43536") + ) + + self.record(sources, duration, filename) + + # FIXME: What's the purpose of this? (Push sample indicating recording start?) + self.push_sample([99], timestamp=time()) + + def record(self, sources: List[str], duration, filename): + # Start a background process that will stream data from the first available Muse + for source in sources: + logger.info("Starting background recording process") + rec_process = Process( + target=record, args=(duration, filename, source), daemon=True + ) + rec_process.start() + + def stop(self): + pass + + def push_sample(self, marker: List[int], timestamp: float): + self.marker_outlet.push_sample(marker, timestamp) + + def _read_buffer(self) -> np.ndarray: + from eegwatch.lslutils import _get_inlets + + inlets = _get_inlets(verbose=False) + + for i in range(5): + for inlet in inlets: + inlet.pull(timeout=0.5) # type: ignore + inlets = [inlet for inlet in inlets if inlet.buffer.any()] # type: ignore + if inlets: + break + else: + logger.info("No inlets with data, trying again in a second...") + sleep(1) + + if not inlets: + raise Exception("No inlets found") + + inlet = inlets[0] + return inlet.buffer # type: ignore + + def check(self) -> List[str]: + checked = _check_samples( + self._read_buffer(), channels=["TP9", "AF7", "AF8", "TP10"] + ) + bads = [ch for ch, ok in checked.items() if not ok] + return bads