import numpy as np
import pandas as pd
import patsy
import mne
from mne.epochs import EpochsArray
from collections import OrderedDict
from spudtr.epf import _epochs_QC
from spudtr import RESOURCES_DIR
import yaml
# RESOURCES_DIR
# default
EEG_LOCATIONS_F = RESOURCES_DIR / "mne_32chan_xyz_spherical.yml"
def _streams2mne_digmont(eeg_streams, eeg_locations_f):
"""Parameters
------------
eeg_streams : list of str
column names of the data streams
eeg_locations_f : path and file of mne_32chan_xyz_spherical.yml
Examples
--------
eg.1
montage = mneutils._streams2mne_digmont(eeg_streams, eeg_locations_f)
montage.plot(kind='topomap', show_names=True);
"""
with open(eeg_locations_f, "r") as stream:
mne_32chan = yaml.safe_load(stream)
mne_streams = mne_32chan["sensors"]
eeg_locs = list(mne_streams.keys())
missing_streams = set(eeg_streams) - set(eeg_locs)
if missing_streams:
raise ValueError(f"eeg_streams not found in cap: {missing_streams}")
ch_names = []
pos = []
for key in eeg_streams:
if key in mne_streams:
ch_names.append(key)
val = mne_streams[key]
pos.append(list(val.values()))
dig_ch_pos = OrderedDict(zip(ch_names, np.array(pos)))
fiducials = mne_32chan["fiducials"]
lpa = np.array(list(fiducials["lpa"].values()))
rpa = np.array(list(fiducials["rpa"].values()))
nasion = np.array(list(fiducials["nasion"].values()))
montage = mne.channels.make_dig_montage(
nasion=nasion, lpa=lpa, rpa=rpa, ch_pos=dig_ch_pos, coord_frame="head"
)
return montage
[docs]def categories2eventid(epochs_df, categories, epoch_id, time, time_stamp):
"""Build an MNE events array and event_id dict from one or more categorical variables.
This uses patsy formulas and dummy coded (full rank) design
matrixes to construct the MNE format event_id dictionary and
corresponding events array (events x 3) for tagging and binning
single-trial epochs for time-domain aggregation into
``mne.Evoked``, e.g., average event-related potentials (ERPs).
A single category is split into the category levels, a.k.a conditions, bins,
like so: ``~ 0 + a``.
Multiple categories fully crossed like so: ``~ 0 + a:b`` and ``~ 0 + a:b:c``
Parameters
----------
epochs_df : pandas.DataFrame
A spudtr format epochs data with ``epoch_id``, ``time`` columns.
categories : str or iterable of str
The column name(s) of the categorical variables.
epoch_id : str
The name of the column with the unique epoch ids, e.g.,
``epoch_id``, ``Epoch_idx``.
time : str
The name of the column with the regular epoch time stamps, e.g., ``time``,
``time_ms``, ``time_s``.
time_stamp : int The time stamp in the epoch to look up the
categorical variable values, e.g., ``0``
Returns
-------
mne_event_id : dict
An MNE Python event_id dictionary where each item is ``label:
event_code``. The ``label`` is the column name from the patsy
full-rank design matrix (incidence matrix) for the categories
(thank you NJS). The ``event_code`` is the 1-based column index
in the design matrix.
mne_events : np.array, shape=(number_of_epochs, 3) there is one
row for each epoch in ``epochs_df``. Each row is
``[epoch_id, 0, mne_event_code]``
where ``mne_event_code`` is the newly
constructed event code derived from the ``patsy`` design matrix
column
Examples
--------
Suppose at the specified time stamp the epochs_df categorical
columns ``a`` and ``b`` have have the following levels: ``a: a1,
a2``, ``b: b1, b2, b3``
>>> categories2eventid(epochs_df, categories="a", epoch_id, time, time_stamp)
event_ids = {
"a[a1]": 1,
"a[a2]": 2
}
>>> categories2eventid(epochs_df, categories="b", epoch_id, time, time_stamp)
event_ids = {
"b[b1]": 1,
"b[b2]": 2,
"b[b3]": 3
}
>>> categories2eventid(epochs_df, categories=["a", "b"], epoch_id, time, time_stamp)
event_ids = {
'a[a1]:b[b1]': 1,
'a[a2]:b[b1]': 2,
'a[a1]:b[b2]': 3,
'a[a2]:b[b2]': 4,
'a[a1]:b[b3]': 5,
'a[a2]:b[b3]': 6
}
"""
# modicum of guarding
if isinstance(categories, str):
categories = [categories]
# check spudtr epochs format
_ = _epochs_QC(epochs_df, categories, epoch_id=epoch_id, time=time)
if time_stamp not in epochs_df[time].unique():
raise ValueError(f"time_stamp {time_stamp} not found in epochs_df['{time}']")
# slice the epoch row at the specified time_stamp, e.g., time==0
# the category columns at this row are used to build the new
# event_id dictionary
events_df = epochs_df[epochs_df[time] == time_stamp].copy()
for cat in categories:
events_df[cat] = pd.Categorical(events_df[cat])
# ensure dm is a full rank indicator matrix n columns = product of
# factor levels w/ exactly one 1 in each row
formula = "~ 0 + " + ":".join(categories)
dm = patsy.dmatrix(formula, events_df)
assert all(np.equal(1, [len(a) for a in map(lambda x: np.where(x == 1)[0], dm)]))
dm_cols = dm.design_info.column_names
# convert indidcator design matrix to a 1-base vector that indexes
# which column of dm has the indicator 1 via binary summation
# e.g., dm = [[1, 0, 0], [0, 1, 0], [0, 0, 1]] -> [1, 2, 3]
dm_col_code = np.array(
[np.where(dm[i, :] == 1)[0] + 1 for i in range(len(dm))]
).squeeze()
assert dm_col_code.min() == 1
assert dm_col_code.max() == dm.shape[1]
# 1-base mne event code dict with column labels from patsy
mne_event_id1 = dict([(dm_col, i + 1) for i, dm_col in enumerate(dm_cols)])
# mne array: n-events x 3
mne_events = np.stack(
[events_df["epoch_id"].to_numpy(), np.zeros(len(events_df)), dm_col_code],
axis=1,
).astype("int")
# pdb.set_trace()
real_event_id = np.unique(mne_events[:, 2])
mne_event_id = {
key: value for key, value in mne_event_id1.items() if value in real_event_id
}
return mne_event_id, mne_events
[docs]class EpochsSpudtr(EpochsArray):
def __init__(
self,
input_fname,
eeg_streams,
eeg_locations_f,
categories,
time_stamp,
epoch_id=None,
time=None,
time_unit=None,
):
epochs_df = pd.read_feather(input_fname)
# check dataframe format
_epochs_QC(epochs_df, eeg_streams, epoch_id=epoch_id, time=time)
mne_event_ids, mne_events = categories2eventid(
epochs_df, categories, epoch_id, time, time_stamp
)
# no point to an event ids dict without the actual events
if mne_event_ids is not None and mne_events is None:
raise ValueError("mne_events must also be specified to use mne_event_ids")
# compute sfreq samples / second from the time-stamps. _epochs_QC should
# ensure regular sampling interval but check anyway ...
timestamps = epochs_df[time].unique()
sampling_interval = list(set((timestamps - np.roll(timestamps, 1))[1:]))
assert len(sampling_interval) == 1 # should be guaranteed by _epochs_QC
sfreq = 1.0 / (sampling_interval[0] * time_unit) # samples per second
montage = _streams2mne_digmont(eeg_streams, eeg_locations_f)
info = mne.create_info(montage.ch_names, sfreq=sfreq, ch_types="eeg")
info.set_montage(montage) # for mne >0.19
tmin = epochs_df[time].min() * time_unit
epochs_data = []
# import pdb; pdb.set_trace()
for epoch_i in epochs_df[epoch_id].unique():
epoch1 = epochs_df[montage.ch_names][
epochs_df.epoch_id == epoch_i
].to_numpy()
epochs_data.append(epoch1.T)
super().__init__(
epochs_data, info=info, tmin=tmin, events=mne_events, event_id=mne_event_ids
)
# API
[docs]def read_spudtr_epochs(
input_fname,
eeg_streams,
eeg_locations_f,
categories,
time_stamp,
epoch_id=None,
time=None,
time_unit=None,
):
"""Parameters
------------
convert spudtr format epochs data to MNE Epochs
input_fname : file name of spudtr format epochs data.
eeg_streams : list of str
column names of the data streams
eeg_locations_f : path and file of mne_32chan_xyz_spherical.yml
categories : str or iterable of str
The column name(s) of the categorical variables.
time_stamp : int The time stamp in the epoch to look up the
categorical variable values, e.g., ``0``
epoch_id : str
name of the epoch index
time : str
name of the time stamp index, e.g., "time_ms"
time_unit : float
time stamp unit in seconds, e.g., 0.001 for milliseconds, 1.0
for seconds
Returns
-------
epochs : mne.Epochs
"""
return EpochsSpudtr(
input_fname,
eeg_streams,
eeg_locations_f,
categories,
time_stamp,
epoch_id,
time,
time_unit,
)