import hashlib
import inspect
import os.path
import re
import warnings
import h5py
import json
import pprint
import yaml
import uuid
import pandas as pd
import copy
import logging
from pathlib import Path
# import dpath.util
# from . import dpath
from mkpy import dpath
import numpy as np
import matplotlib.pyplot as plt
from mkpy import mkio, pygarv, h5tools
from mkpy.codetagger import CodeTagger
from . import current_function, indent, log_exceptions
from mkpy import get_ver
__version__ = get_ver()
logging.info("Entering " + __name__)
# FIX ME: do something w/ custom exceptions
class BadChannelsError(Exception):  # from NJS, deprecated
pass
class DuplicateLocationLabelError(Exception):
pass
class MightyWeenieCals(UserWarning):
pass
class EpochSpansBoundary(UserWarning):
pass
class LogRawEventCodeMismatch(UserWarning):
pass
class DigRecordsNotSequential(UserWarning):
pass
class mkh5:
"""Import and prepare ERPSS single-trial data for cross-platform analysis.
This class provides the user API for converting compressed binary
EEG data files into readily accessible HDF5 files.
Parameters
----------
h5_fname : str
Path to a new or existing HDF5 file used as the database.
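    A minimal usage sketch; the file and group names here are hypothetical:
    .. code-block:: python
       from mkpy import mkh5
       mydat = mkh5.mkh5('sub01.h5')  # create or open the HDF5 database file
       mydat.create_mkdata('S01', 'sub01.crw', 'sub01.log', 'sub01.yhdr')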
"""
# BEGIN DEPRECATED dtypes
# _utc = 'u4' # stub for time stamp in microseconds
# _bin_num = 'u4' # 0-base numeric bin index from blf
# END DEPRECATED dtypes
# _chan_num = 'u4' # 0-base numeric channel index from dig header
# _mkAD = 'i2' # kutaslab AD samples are int16
# _art_flag = 'u2' # bit mask for bad channels 0 = good, 1 = bad
# numpy dtypes for the data coming from dig .crw/.raw
_evtick = "u4" # dig crw sample counter, mis-named clk_tick in log2asci
_evcode = "i2" # numeric event codes from dig marktrack *AND* log
_log_ccode = "u2" # numeric condition code from log
    _log_flag = "u2"  # numeric artifact flags from log
_mk_EEG = "f2" # kutaslab 12-bit AD or 16 bits after calibrating
_epoch_t = "i8" # positive and negative samples for interval data
_pygarv = "uint64" # 64-bit column to track up to 64 pygarv data tests
# white list of dig raw (*NOT LOG*) event codes for
# splitting the .raw/.crw into mkh5 dblocks
_dig_pause_marks = (-16384,)
# HDF5 slashpath to where epochs tables are stashed in the mkh5 file
EPOCH_TABLES_PATH = "_epoch_tables"
    class Mkh5Error(Exception):
"""general purposes mkh5 error"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
    class Mkh5CalError(Exception):
"""mkh5 calibration error"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
    class EpochsTableDataError(Exception):
"""raised for pd.Series data we can't or won't directly convert for HDF5
        These include mixed num-like and str-like values, and booleans with missing data
"""
def __init__(self, pd_data_type, series):
if series.name is None:
series.name = "un_named_series"
if series.hasnans:
self.msg = (
"\npandas.Series " + series.name + " {0} data type"
" with missing data/NaN not supported for "
" epochs tables"
).format(pd_data_type)
else:
self.msg = (
"\npandas.Series "
+ series.name
+ " {0} data type not supported for"
" epochs tables"
).format(pd_data_type)
print(self.msg)
# log data types ... ticks are uint64, everything else can be int16
# _log_dtype = np.dtype([
# ("log_evticks", _evtick),
# ("log_evcodes", _evcode),
# ("log_ccodes", _log_ccode),
# ("log_flags", _log_flag),
# ])
# structure to merge dig marktrack and log info
# FIX ME FOR EPOCHS
_event_dtype = np.dtype(
[
("evticks", _evtick),
("raw_evcodes", _evcode),
("log_evcodes", _evcode),
("log_ccodes", _log_ccode),
("log_flags", _log_flag),
]
)
_dblock_slicer_dtype = np.dtype(
[
("start_samps", _epoch_t),
("anchor_samps", _epoch_t),
("stop_samps", _epoch_t),
]
)
# define a datatype for the bin-event table ... essentially a decorated log
_bin_event_dtype = np.dtype(
[
("evticks", _evtick),
("raw_evcodes", "i2"),
("log_evcodes", "i2"),
("log_ccodes", "u2"),
("log_flags", "u2"),
("bin_nums", "u2"),
("bin_descs", "S64"), # icky ... hardcoded string lengths
]
)
# FIX ME ... non-event type epoch?
_epoch_dtype = np.dtype(
[
("anchor_ticks", _evtick),
("raw_evcodes", _evcode),
("epoch_starts", _evtick),
("epoch_stops", _evtick),
]
)
# for dumping instance info ...
# pp = pprint.PrettyPrinter(indent=4)
def __init__(self, h5name):
"""initialize and set mkh5 file name for this instance. If the file
doesn't exist, create it. If it exists, test read/writeability
Parameters:
h5name (string) file path to the mkh5 format hdf5 file.
"""
if isinstance(h5name, Path):
h5name = str(h5name)
# each mkh5 instance is tied to one and only one hdf5 format file
logging.info(indent(2, "h5name is " + h5name))
self.h5_fname = h5name
        # if the file doesn't exist, create an empty hdf5 file
if not os.path.isfile(self.h5_fname):
self.reset_all()
else:
# file exists, warn if not readable or read-writable
with h5py.File(self.h5_fname, "r") as h5:
source_ver = __version__.split(r".")[0] # major version
if "version" in h5.attrs.keys():
file_ver = h5.attrs["version"].split(r".")[0]
else:
file_ver = None
if file_ver is not None and source_ver != file_ver:
msg = "version mismatch: source=={0} file {1}=={2}".format(
source_ver, self.h5_fname, file_ver
)
warnings.warn(msg)
# try to open for read-write and let h5py deal with problems
try:
with h5py.File(self.h5_fname, "r+") as h5:
pass
            except OSError:
msg = "{0} is read only".format(self.h5_fname)
warnings.warn(msg)
# h5 introspection
def _headers_equal(self, header1, header2):
"""returns boolean if all header dict keys and values are =="""
raise NotImplemented("FIX ME WITH dpath.utils NOW")
k1 = sorted(header1.keys())
k2 = sorted(header2.keys())
# if k1==k2 can iterate on either set of keys
if k1 != k2 or any([header1[k] != header2[k] for k in k1]):
return False
else:
return True
# ------------------------------------------------------------
# Public artifact tagging
# ------------------------------------------------------------
    def garv_data_group(self, h5_data_group_path, skip_ccodes=[0]):
"""Run `pygarv` on all the dblocks under the `h5_data_group_path`.
Parameters
----------
h5_data_group_path : str
name of the h5 datagroup containing the dblocks to screen
skip_ccodes : list of uint, None ([0])
dblocks with log_ccodes in the list are not scanned. Default
is [0] to skip calibration blocks. Setting to None disables
the skipper and scans all dblocks in the data group
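        A hedged sketch, assuming dblocks were previously created under the
        hypothetical group 'S01':
        .. code-block:: python
           mydat = mkh5.mkh5('sub01.h5')
           mydat.garv_data_group('S01')  # default skips cal blocks, log_ccode 0
           mydat.garv_data_group('S01', skip_ccodes=None)  # scan every dblock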
"""
dblock_paths = h5tools.get_dblock_paths(self.h5_fname, h5_data_group_path)
for dbp in dblock_paths:
hdr, dblock = self.get_dblock(dbp)
            # ccodes can only change on dig start or pause so they are
            # homogeneous within a dblock unless there is some goofy log
            # poking going on ...
log_event_idxs = np.where(dblock["log_evcodes"] != 0)[0]
assert (
dblock["log_ccodes"][log_event_idxs].max()
== dblock["log_ccodes"][log_event_idxs].min()
)
log_ccode = dblock["log_ccodes"][log_event_idxs[0]]
if skip_ccodes is not None:
if log_ccode in skip_ccodes:
msg = "pygarv skipping {0} with " "log_ccode {1}".format(
dbp, log_ccode
)
print(msg)
continue
print("pygarving {0} log_ccode {1}".format(dbp, log_ccode))
with h5py.File(self.h5_fname, "r+") as h5:
h5[dbp]["pygarv"] = pygarv._garv_dblock(hdr, dblock)
# ------------------------------------------------------------
# Public event code tag mapping and epoching utilities
# ------------------------------------------------------------
    def get_event_table(self, code_map_f, header_map_f=None):
"""Reads the code tag and header extractor and returns an event lookup table
Parameters
----------
code_map_f : str
Excel, YAML, or tab-separated text, see mkh5 docs for
format details.
header_map_f : str
YAML header extractor file, keys match header keys, values specify
name of the event table column to put the header data
Returns
-------
event_table : pandas.DataFrame
See Note.
Note
----
1. This sweeps the code tag map across the data to generate a lookup
table for specific event (sequence patterns) where the rows specify:
* slashpath to the mkh5 dblock data set and sample index
for pattern-matching events.
* all the additional information for that pattern given in
the code tag map
The event table generated from mkh5 data and the code_map
specification is in lieu of .blf (for EEG epoching and
time-locking), .rts (for event-to-event timing), and .hdr
(for experimental design specification).
2. ``ccode`` Special column. If the code tag map has a column
named ``ccode`` the code finder finds events that match the
code sequence given by the regex pattern **AND** the
log_ccode == ccode. This emulates Kutas Lab `cdbl` event
lookup and to support, e.g., the condition code == 0 for
cals convention and blocked designs where the `ccode`
varies block to block. If the code map does not specify
           a ``ccode`` column, the `log_ccode` column is ignored for
           pattern matching.
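        A sketch of typical use; the codemap and extractor file names are
        hypothetical:
        .. code-block:: python
           mydat = mkh5.mkh5('sub01.h5')
           event_table = mydat.get_event_table('codemap.yml')
           # optionally pull header fields into event table columns
           event_table = mydat.get_event_table('codemap.yml', 'header_map.yml')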
"""
# instantiate the codemapper w/ its map and code finder
ctagger = CodeTagger(code_map_f)
if "ccode" in ctagger.code_map.columns:
msg = (
f"\nAs of mkpy 0.2.0 to match events with a codemap regexp pattern, the\n"
f"ccode column in {Path(code_map_f).name} must also match the log_ccode\n"
f"in the datablock. If this behavior is not desired, delete or rename\n"
f"the ccode column in the codemap."
)
warnings.warn(msg)
# set up to extract info from the header
hio = self.HeaderIO()
if header_map_f is not None:
hio.set_slicer(header_map_f)
# fetch all data that have at least one mkh5 datablock (dblock_0)
match_list = []
dgroup_paths = h5tools.get_data_group_paths(self.h5_fname)
with h5py.File(self.h5_fname, "r") as h5:
for dgp in dgroup_paths:
dblock_paths = h5tools.get_dblock_paths(self.h5_fname, dgp)
for dbp in dblock_paths:
assert dgp in dbp # group and data block must agree
hio.get(h5[dbp]) # need this for srate at least
# slice the header if there is an extractor
if hio._slicer is not None:
hdr_data = hio.get_slices()
else:
hdr_data = []
print("searching codes in: " + dbp)
event_idxs = (
h5[dbp]["log_evcodes"] != 0
) # samples w/ non-zero events
dblock_ticks = h5[dbp]["dblock_ticks"][event_idxs]
crw_ticks = h5[dbp]["crw_ticks"][event_idxs]
raw_evcodes = h5[dbp]["raw_evcodes"][event_idxs]
log_evcodes = h5[dbp]["log_evcodes"][event_idxs]
log_ccodes = h5[dbp]["log_ccodes"][event_idxs]
log_flags = h5[dbp]["log_flags"][event_idxs]
# iterate on keys which are the code patterns
for idx, cm in ctagger.code_map.iterrows():
# matches is a list of lists of dict, one dict for each group
code_pattern_matches = ctagger._find_evcodes(
cm["regexp"], dblock_ticks, log_evcodes
)
if code_pattern_matches is not None:
for m in code_pattern_matches:
for mm in m:
match_tick, anchor_tick, is_anchor = (
None,
None,
None,
)
for k, v in mm:
if k == "match_tick":
match_tick = v
if k == "anchor_tick":
anchor_tick = v
if k == "is_anchor":
is_anchor = v
assert all(
[
v is not None
for v in [
match_tick,
anchor_tick,
is_anchor,
]
]
)
if is_anchor:
assert anchor_tick == match_tick
else:
assert anchor_tick != match_tick
# ok, this is the tick of the pattern match
# and it must be unique
tick_idx = np.where(dblock_ticks == match_tick)[0]
assert len(tick_idx) == 1
sample_data = [
# ("Index", idx),
("data_group", dgp),
("dblock_path", dbp),
("dblock_tick_idx", tick_idx[0]),
("dblock_ticks", dblock_ticks[tick_idx][0]),
("crw_ticks", crw_ticks[tick_idx][0]),
("raw_evcodes", raw_evcodes[tick_idx][0]),
("log_evcodes", log_evcodes[tick_idx][0]),
("log_ccodes", log_ccodes[tick_idx][0]),
("log_flags", log_flags[tick_idx][0]),
(
"epoch_match_tick_delta",
0,
), # an event is a one sample epoch
("epoch_ticks", 1),
(
"dblock_srate",
hio.header["samplerate"],
), # for conversion to times
]
# extend sample data w/ the header information
# which may be None
sample_data = sample_data + hdr_data
# extend sample_data w/ the match info and code map row
sample_data = (
sample_data + mm + list(zip(cm.index, cm))
)
match_list.append((sample_data)) # list of tuples
# pprint.pprint(match_list)
# handle no matches ...
if len(match_list) > 0:
event_table = pd.DataFrame([dict(m) for m in match_list])
# codemap ccode triggers backwards compatibility
# with Kutas Lab ERPSS cdbl
if "ccode" in ctagger.code_map.columns:
event_table = event_table.query("ccode == log_ccodes")
# event_table.set_index("Index", inplace=True)
self._h5_check_events(self.h5_fname, event_table)
return event_table
else:
raise RuntimeError("uh oh ... no events found for {0}".format(code_map_f))
def _h5_check_events(self, h5_f, e_table):
"""check the match event in event or epoch table agrees with the
dblock data
Parameters
----------
h5_f : str
path to mkh5 format hdf5 file
e_table: (pandas.DataFrame, np.ndarray)
as returned by mkh5.get_event_table()
Returns
-------
None for success
Raises
------
RuntimeError on event_table[e] vs. dblock[e] mismatch or missing columns
The minimum mandatory column names for an event table are
* data_group: full slashpath to the h5py.Group covering a
sequence of dblocks, e.g.,
S001
Expt1/Session1/S047
* dblock_path: full slashpath from the hdf5 root to one of the
daughter dblock_N h5py.Datasets (without leading /), e.g.,
S001/dblock_0
Expt1/Session1/S047/dblock_12
* dblock_ticks: the A/D sample counter which is also the row
index of the dblock where the *matched* event appearing in
the event table occurred.
* match_code: the event code of the regexp pattern-matched
group for this event table row. There is one match code for
each capture group in the regular expression pattern, so
the match code need not be the anchor code
* All anchors are matches. Some matches may not be anchors
        * log_evcodes: the sequence of integer event codes occurring at
each dblock tick in the original crw/log file
"""
if isinstance(e_table, np.ndarray):
e_table = pd.DataFrame(e_table)
        min_cols = [
"data_group",
"dblock_path",
"dblock_ticks",
"log_evcodes",
"match_code",
]
for c in min_cols:
            if c not in e_table.columns:
                msg = 'mkh5 event table column "{0}"'.format(c)
                msg += " is missing, all these are mandatory: " + " ".join(min_cols)
raise RuntimeError(msg)
with h5py.File(h5_f, "r") as h5:
for index, e in e_table.iterrows():
# These should only fail if the datablocks or event table have
# been monkeyed with. Anyone who can do that can chase down
# the assertion exception.
data = h5[e["dblock_path"]][e["dblock_ticks"]]
assert e["data_group"] in e["dblock_path"]
# the log event code must be an anchor or a match
if e["match_code"] != e["anchor_code"]:
assert e["match_code"] == data["log_evcodes"]
else:
assert e["anchor_code"] == data["log_evcodes"]
check_cols = [col for col in e.index if col in data.dtype.names]
for col in check_cols:
assert data[col] == e[col]
def _check_epochs_table(self, epochs_table):
"""check a set epochs table for event codes, epoch length, and offset
Parameters
----------
        epochs_table : numpy structured array
            event table format with extra columns:
            epoch_ticks = fixed epoch duration in units of samples
            epoch_match_tick_delta = signed offset in samples from the matched event to the epoch start
Returns
-------
None
Raises
------
ValueError
if epoch event codes and data blocks column values do not match the datablocks or
epoch length and start offset values are not uniform across the epochs.
"""
epoch_required_columns = ["epoch_ticks", "epoch_match_tick_delta"]
# epochs tables are an extension of event tables, check the events first
self._h5_check_events(self.h5_fname, epochs_table)
# then epoch length and matched code offset (in samples)
for c in epoch_required_columns:
if c not in epochs_table.dtype.names:
msg = "epochs table missing required column {0}".format(c)
raise ValueError(msg)
vals = np.unique(epochs_table[c])
if len(vals) > 1:
msg = "epochs table column {0} values must be the same: {1}".format(
c, vals
)
raise ValueError(msg)
def _h5_check_epochs_table_name(self, h5_f, epochs_table_name):
"""look up and check a previously set epochs table
Parameters
----------
h5_f : str
name of an mkh5 format HDF5 file, e.g., self.h5_fname or other
epochs_table_name : str
name of an epochs table, must exist in h5_f
Returns
-------
None
Raises
------
ValueError
if something is wrong with the lookup or table itself
"""
eptbl = self.get_epochs_table(epochs_table_name, format="numpy")
self._check_epochs_table(eptbl)
    def set_epochs(self, epochs_table_name, event_table, tmin_ms, tmax_ms):
"""construct and store a named EEG epochs lookup-table in self['epcochs']
For storing in hdf5 the columns must be one of these:
string-like (unicode, bytes)
int-like (int, np.int, np.uint32, np.uint64)
float-like (float, np.float32, np.float64)
Parameters
----------
epochs_table_name : string
name of the epochs table to store
event_table : pandas.DataFrame
as returned by mkh5.get_event_table()
        tmin_ms : float
            epoch start in milliseconds relative to the event, e.g., -500
        tmax_ms : float
            epoch end in milliseconds relative to the event, e.g.,
            1500, strictly greater than tmin_ms
Returns
-------
None
updates h5_f/EPOCH_TABLES_PATH/ with the named epoch table h5py.Dataset
Notes
-----
The epochs table is a lightweight lookup table specific to
each mkh5 instance's hdf5 file,
h5['epochs'][epochs_table_name] = epochs_table
The epochs table is row-for-row the same as the time-locking event table
and just adds the time interval information for use when extracting the
time series EEG data segments.
For reproducibility, by design the epochs tables in an mkh5
file are write-protected. New tables may be added, but
existing tables cannot be overwritten or deleted. If you need
        to revise the epochs, rebuild the mkh5 file from crws/logs
with the ones you want.
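        A sketch of typical use, continuing from get_event_table(); the
        epochs table name is hypothetical:
        .. code-block:: python
           event_table = mydat.get_event_table('codemap.yml')
           # epochs from 500 ms before to 1000 ms after each matched event
           mydat.set_epochs('my_epochs', event_table, -500, 1000)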
"""
with h5py.File(self.h5_fname, mode="r") as h5:
if (
mkh5.EPOCH_TABLES_PATH in h5.keys()
and epochs_table_name in h5[mkh5.EPOCH_TABLES_PATH].keys()
):
msg = (
f"epochs name {epochs_table_name} is in use, "
f"pick another name or use reset_all() to "
f"completely wipe the mkh5 file: {self.h5_fname}"
)
raise RuntimeError(msg)
# event_table = self.get_event_table(code_map_f)
if event_table is None:
raise ValueError("uh oh, event_table is empty")
self._h5_check_events(self.h5_fname, event_table)
# ------------------------------------------------------------
# 1. sanitize the pandas.Dataframe columns
# ------------------------------------------------------------
print("Sanitizing event table data types for mkh5 epochs table ...")
# remap pandas 'O' dtype columns to hdf5 friendly np.arrays if
# possible. Run column-wise so exception messages are
# informative. Run on a copy so the nan handler can mod the
# series in place, else pd warning "setting value on copy"
tidy_table = pd.DataFrame()
for c in event_table.columns:
tidy_table[c] = self._pd_series_to_hdf5(event_table[c].copy())
event_table = tidy_table
# 2. define a numpy compound data type to hold the event_table
# info and region refs
# start with epoch_id index
epoch_dt_names = ["epoch_id"]
# epoch_dt_types = ["uint64"] # < 0.2.4 pandas.Index unfriendly
epoch_dt_types = ["int64"] # >= 0.2.4 pandas.Index friendly
# continue new dtype for event info columns, mapped to hdf5 compatible np.dtype
# event_table_types = [event_table[c].dtype for c in event_table.columns]
for i, c in enumerate(event_table.columns):
epoch_dt_names.append(c)
epoch_dt_types.append(event_table[c].dtype.__str__())
# events have sample ticks, epochs add timestamps for human readability
epoch_dt_names += ["match_time", "anchor_time", "anchor_time_delta"]
epoch_dt_types += ["int64"] * 3
# TPU discrete time interval (DITI) tags ahead for future
# diti_t_0 = sample = dlbock_tick index where interval time == 0
# diti_hop = +/- interval start relative to diti_t_0 in samples/ticks
# diti_len = interval length in samples
epoch_dt_names += ["diti_t_0", "diti_hop", "diti_len"]
# diti_t_0, diti_len should be uint but pandas.Index squawks
epoch_dt_types += ["int64"] * 3
# construct the new dtype and initialize epochs table array
epoch_dt = np.dtype(list(zip(epoch_dt_names, epoch_dt_types)))
epochs = np.zeros(shape=(len(event_table),), dtype=epoch_dt)
# set the epoch_id counting index and copy the tidied event table
epochs["epoch_id"] = [idx for idx in range(len(event_table))]
for c in event_table.columns:
epochs[c] = event_table[c]
# 3. time lock each epoch to the match tick, and set the
# interval from the function arguments
hio = self.HeaderIO()
# fetch sampling rate
srates = np.unique(epochs["dblock_srate"])
assert len(srates) == 1, "epochs['dblock_srate'] varies"
srate = srates[0]
# scalar epoch start offset and duration in samples
epoch_match_tick_delta = mkh5._ms2samp(tmin_ms, srate)
duration_samps = mkh5._ms2samp(tmax_ms - tmin_ms, srate)
# (variable) match_tick in place from the event table, set
# set the (constant) offset and duration columns
epochs["epoch_match_tick_delta"] = epoch_match_tick_delta
epochs["epoch_ticks"] = duration_samps
# update the timestamps
epochs["match_time"] = int(0)
epochs["anchor_time_delta"] = np.array(
[
int(mkh5._samp2ms(atd, srate))
for atd in epochs[
"anchor_tick_delta"
] # epochs["match_tick"] - e["anchor_tick"]
]
)
# anchor_time == anchor_time_delta here at match_time = 0, tho
# varies by time in epochs data
epochs["anchor_time"] = epochs["anchor_time_delta"]
# new 0.2.4, discrete time interval (DITI) time lock, shift, length
# columns. Redundant for now, included for future DITI tagging
epochs["diti_t_0"] = epochs["match_tick"]
epochs["diti_hop"] = epoch_match_tick_delta
epochs["diti_len"] = duration_samps
# ------------------------------------------------------------
# error check
        # whitelist in-bounds epochs FIX ME ... no need to iterate
is_in_bounds = np.zeros(len(epochs)).astype(bool)
start_samps = epochs["match_tick"] + epoch_match_tick_delta
with h5py.File(self.h5_fname, "r+") as h5:
# for i,e in event_table.iterrows():
for i, e in enumerate(epochs):
# check event table sampling rate agrees w/ dblock
dbp = e["dblock_path"]
hio.get(h5[dbp])
if srate != hio.header["samplerate"]:
msg = (
"{0}['samplerate']: {1} does not match "
"event table[{2}]['dblock_samplerate': "
"{3}"
).format(dbp, hio.header["samplerate"], i, srate)
raise ValueError(msg)
# bounds check with messages per epoch
if duration_samps <= 0:
msg = (
"epoch interval {0} {1} is less than one sample at "
"{3} ... increase the interval"
).format(tmin_ms, tmax_ms, srate)
raise ValueError(msg)
# move on after bounds check
if start_samps[i] < 0:
warnings.warn(
"data error: pre-stimulus interval is out of bounds left ... "
+ "skipping epoch {0}".format(e)
)
continue
elif start_samps[i] + duration_samps > len(h5[dbp]):
warnings.warn(
"data error: post-stimulus interval is out of bounds right ... "
+ "skipping epoch {0}".format(e)
)
continue
else:
# if in bounds update epoch match offset
is_in_bounds[i] = True
# drop out of bounds epochs and check epochs are consistent
epochs = epochs[is_in_bounds]
# check the epochs for consistency
self._check_epochs_table(epochs)
# 4. add epoch table in the mkh5 file under /EPOCH_TABLES_PATH/epochs_table_name
with h5py.File(self.h5_fname, "r+") as h5:
epochs_path = f"{mkh5.EPOCH_TABLES_PATH}/{epochs_table_name}"
ep = h5.create_dataset(epochs_path, data=epochs)
attrs = {"tmin_ms": tmin_ms, "tmax_ms": tmax_ms}
for k, v in attrs.items():
ep.attrs[k] = v
return None # ok
    def export_event_table(self, event_table, event_table_f, format="feather"):
"""fetch the specified event table and save it in the specified format"""
known_formats = ["feather", "txt"] # txt is tab-separated
if format not in known_formats:
msg = "event_table export format must be 'feather' or 'txt'"
raise ValueError(msg)
# event_table = self.get_event_table(code_map_f)
if event_table is None:
            msg = "uh oh ... event_table is None ... nothing to export"
raise ValueError(msg)
if format == "feather":
event_table.reset_index(inplace=True)
event_table.to_feather(event_table_f)
elif format == "txt":
event_table.to_csv(event_table_f, sep="\t")
else:
raise RuntimeError()
    def get_epochs_table_names(self):
"""returns a list, possibly empty of previously named epochs tables"""
epochs_names = []
try:
with h5py.File(self.h5_fname, "r") as h5:
epochs_names = [t for t in h5[mkh5.EPOCH_TABLES_PATH].keys()]
except Exception:
pass
return epochs_names
def _pd_series_to_hdf5(self, series):
"""normalize pandas.Series +/- missing or nans to array for hdf5 serialization
        Parameters
        ----------
series : pandas.Series
Returns
-------
        arry_hdf5 : np.array, shape (len(series),), dtype=column scalar dtype
Raises
------
TypeError if series is not pandas.Series
ValueError if series is empty
EpochsTableDataError if series data doesn't convert to hdf5
Supported data types
* a single, homogenous scalar data type drawn from these
float-like: float, np.float32, np.float64, etc.
int-like: int, np.int32, np.int64, etc.
uint-like: np.uint32, np.uint64, etc.
boolean-like: bool, np.bool
string-like: str, bytes, unicode
* missing data/NaNs are supported **except for boolean-like**
NaN, None conversions as follows:
Series type | from | to hdf5
------------ | -------------- | ------------
float-like | np.NaN, None | np.nan
int-like | pd.NaN, None | np.nan, int coerced to float_
uint-like | pd.NaN, None | np.nan, int coerced to float_
string-like | pd.NaN, None | b'NaN'
boolean-like | pd.NaN, None | not allowed
        Known dtypes according to pandas 0.21.0 returned by infer_dtype:
empty (returned when all are None, undocumented in pandas)
string, unicode, bytes, floating, integer,
mixed-integer, mixed-integer-float, decimal,
complex, categorical, boolean, datetime64,
datetime, date, timedelta64, timedelta, time,
period, mixed
Approach: for mkh5 supported dtypes the pd.Series dtype 'O' has 2 cases:
- not hasnans
- values are str_like -> to bytes -> np.array
- values are mixed types -> illegal, die
- hasnans: two cases
- the non-missing values are mixed types: illegal, die
          - the non-missing values are homogeneous: handle NaNs by type as above
- float_like -> missing/None are already np.nan -> np.array float
- int_like -> replace nans w/ max int of dtype -> np.array float
- uint_like -> replace nans w/ max uint of dtype -> np.array float
- str_like -> replace nans w/ 'NaN' -> to bytes -> np.array bytes
- bool_like -> NaN/missing illegal, die
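        A sketch of the intended conversions with illustrative values:
        .. code-block:: python
           import pandas as pd
           pd.Series(['cat', None])  # -> np.array of bytes with a NaN string filler
           pd.Series([1, None, 3])   # -> float np.array, missing -> np.nan
           pd.Series([True, None])   # -> raises EpochsTableDataError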
"""
if not isinstance(series, pd.Series):
msg = "wrong type {0}: must be pandas.Series".format(type(series))
raise TypeError(msg)
if not len(series) > 0:
msg = "empty series"
raise ValueError(msg)
pd_num_like = ["floating", "integer", "decimal", "complex"]
pd_bytes_like = [
"string",
"unicode",
"bytes",
# 'categorical', # ??
]
pd_bool_like = ["boolean"]
pd_type = pd.api.types.infer_dtype(
series, skipna=False
) # mixed if missing values present
# pd_data_type = pd.api.types.infer_dtype(series.dropna()) # mixed if mixed data
pd_data_type = pd.api.types.infer_dtype(
series, skipna=True
) # mixed if mixed data
series_types = pd.unique([type(i) for i in series.values])
data_types = pd.unique([type(i) for i in series.dropna().values])
        # homogeneous data w/ no missing values
        if series.dtype != "O":
try:
arry = np.array(series)
except Exception as fail:
print("column ", series.name)
raise fail
assert arry.dtype != "O"
return arry
else:
# white-list the allowed conversions, all else fails
            # any combination of str-like values +/- missing data -> bytes +/- b'NaN'
if all([pd.api.types.is_string_dtype(dt) for dt in data_types]):
if series.hasnans:
series.fillna(".NAN", inplace=True)
                # try each value to diagnose the problem
for v in series.values:
try:
np.array([v]).astype(np.string_)
except Exception as fail:
msg = ("\nvalue: {0}\n" "column: {1}").format(v, series.name)
print(msg)
raise fail
# now try whole series
try:
arry = np.array(series.values.astype(np.string_))
except Exception as fail:
print("column ", series.name)
raise fail
assert arry.dtype != "O"
return arry
# handle num-like +/- missing data
elif pd_data_type in pd_num_like:
try:
arry = np.array(series)
except Exception as fail:
print("column ", series.name)
raise fail
assert arry.dtype != "O"
return arry
else:
# fail this blocks mixed numerics, boolean+NaN
raise mkh5.EpochsTableDataError(pd_data_type, series)
    def get_epochs_table(self, epochs_name, format="pandas"):
"""look up a previously set epochs table by name
Parameters
----------
epochs_name : str
name of a previously defined epochs table as set with an
mkh5.set_epochs(event_table)
format : str {'pandas', 'numpy'}
pandas.Dataframe or numpy.ndarray
Returns
-------
        epochs_table : pandas.Dataframe or numpy.ndarray
            bytestrings from the hdf5 are converted to unicode in the
            epochs table returned
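        A short sketch; the epochs table name is hypothetical:
        .. code-block:: python
           epochs_df = mydat.get_epochs_table('my_epochs')  # pandas.DataFrame
           epochs_arr = mydat.get_epochs_table('my_epochs', format='numpy')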
"""
if format not in ["pandas", "numpy"]:
msg = "format must be 'pandas' or 'numpy'"
raise ValueError(msg)
epochs_table = None
with h5py.File(self.h5_fname, "r") as h5:
epochs_path = f"{mkh5.EPOCH_TABLES_PATH}/{epochs_name}"
epochs_table = h5[epochs_path][...]
if epochs_table is None:
msg = "epochs table not found: {0}".format(epochs_name)
raise RuntimeError(msg)
# clean up hdf5 bytestrings
# FIX ME for NANs?
dts = []
for n in epochs_table.dtype.names:
if epochs_table.dtype[n].kind == "S":
dts.append((n, "U" + str(epochs_table.dtype[n].itemsize)))
else:
dts.append((n, epochs_table.dtype[n]))
dts = np.dtype(dts)
eptbl = np.empty(shape=epochs_table.shape, dtype=dts)
for idx in range(epochs_table.shape[0]):
for c in dts.names:
value = copy.deepcopy(epochs_table[c][idx])
if hasattr(value, "decode"):
eptbl[c][idx] = value.decode("utf8")
else:
eptbl[c][idx] = value
# run consistency check
self._check_epochs_table(eptbl)
if format == "pandas":
eptbl = pd.DataFrame(eptbl)
# eptbl.set_index("Index", inplace=True)
return eptbl
def _h5_get_epochs(self, epochs_name, columns=None):
"""merge datablock segments (event codes, EEG) with code tags and timestamps.
Each row (1, n) in the epochs table is broadcast to an (m, n)
array for the samples in the specified epochs interval and
timestamps calculated for the anchor and match events.
        Epoch intervals are extracted relative to the *matched* event
        in the code tag, see Yields for details.
Parameters
----------
epochs_name : string
name of epochs table Dataset in h5['epochs']
        columns : list of strings, default None
            column names to extract; None extracts all columns
Yields
------
epoch : numpy structured array shape = (m, n + 2) where
* m == `epoch_table['epoch_ticks']`, the length of the epoch in samples
* n == the number of columns in `epoch_table`
* the starting sample of the epoch is calculated relative to the
*matched* event via `epoch_table['epoch_match_tick_delta']`
* `epoch[:, column]` == `epoch_table[column]` for `column`
in `epoch_table.dtype.names`. This broadcasts the
experimental event code tags to all samples in the epoch.
        epoch['match_time']: int64
            epoch['match_time'] == 0 for the *matched* event
        epoch['anchor_time']: int64
            epoch['anchor_time'] == 0 for the *anchor* event
Note
----
Iterating over this generator will fetch all the epochs given
in epochs_name
.. TO DO: implement hdf5 region refs
"""
with h5py.File(self.h5_fname, "r") as h5:
epochs_path = f"{mkh5.EPOCH_TABLES_PATH}/{epochs_name}"
epoch_view = h5[epochs_path]
epoch_cols = epoch_view.dtype.names
# guard against irregular epochs
assert len(np.unique(epoch_view["epoch_ticks"])) == 1
assert len(np.unique(epoch_view["epoch_match_tick_delta"])) == 1
for e in epoch_view:
nsamp = e["epoch_ticks"]
# fill nsamp rows x event info columns
event_info = np.stack([e for n in range(nsamp)], axis=0)
# set for epoch slice
start, stop = None, None
start_samp = e["match_tick"] + e["epoch_match_tick_delta"]
stop_samp = start_samp + nsamp
epoch_streams = h5[e["dblock_path"]][start_samp:stop_samp]
# upconvert EEG columns float16 to float32 b.c. 2 byte floats
# fight w/ feather (unsupported datatype), MATLAB
# (cannot co-mingle w/ int64)
f4_streams_dtype = []
for name in epoch_streams.dtype.names:
if epoch_streams.dtype[name] == "float16":
f4_streams_dtype.append((name, "float32"))
else:
f4_streams_dtype.append((name, epoch_streams.dtype[name]))
f4_streams_dtype = np.dtype(f4_streams_dtype)
epoch_streams = np.array(epoch_streams, dtype=f4_streams_dtype)
# merge epoch table, match_time, and datablock stream table column names
all_cols = list(event_info.dtype.names)
for c in epoch_streams.dtype.names:
if c not in all_cols:
all_cols.append(c)
# use all available columns (default) or specified subset
if columns is None:
epoch_dt_names = all_cols
else:
for c in columns:
                        if c not in all_cols:
msg = "column {0} not found in epoch table or data block: ".format(
c
)
msg += " ".join(all_cols)
raise RuntimeError(msg)
epoch_dt_names = columns
# ------------------------------------------------------------
# ATTENTION: THIS SECTION IS HIGHLY PROCEDURAL AND BRITTLE
# ------------------------------------------------------------
# build data types
epoch_dt_types = []
for n in epoch_dt_names:
if n in epoch_streams.dtype.names:
epoch_dt_types.append(epoch_streams.dtype[n])
elif n in e.dtype.names:
epoch_dt_types.append(event_info.dtype[n])
else:
raise ValueError(
f"column {n} not found in dblock or epoch table"
)
                # define dtype and initialize
epoch_dt = np.dtype(list(zip(epoch_dt_names, epoch_dt_types)))
# epoch = np.ndarray(shape=(nsamp,), dtype=np.dtype(epoch_dt))
epoch = np.zeros(shape=(nsamp,), dtype=np.dtype(epoch_dt))
# take the stream names first to protect the time
# varying columns, then propagate the new info from
# the epoch event.
srate = e["dblock_srate"]
assert e["match_time"] == 0
for n in epoch_dt_names:
# these are already time-varying
if n in epoch_streams.dtype.names:
epoch[n] = epoch_streams[n]
# generate match, anchor time stamps and deltas
elif n == "match_time":
epoch[n] = [
int(mkh5._samp2ms(x - e["match_tick"], srate))
for x in range(start_samp, stop_samp)
]
elif n == "anchor_time":
epoch[n] = [
int(mkh5._samp2ms(x - e["anchor_tick"], srate))
for x in range(start_samp, stop_samp)
]
# broadcast the constants across the times
elif n == "anchor_time_delta":
epoch[n] = int(mkh5._samp2ms(e["anchor_tick_delta"], srate))
# broadcast event info
elif n in event_info.dtype.names:
epoch[n] = event_info[n]
else:
raise ValueError(
"uh oh ... unknown column {0} in epoch data extraction".format(
n
)
)
# ------------------------------------------------------------
yield (epoch)
    def get_epochs(self, epochs_name, format="numpy", columns=None):
"""fetch single trial epochs in tabluar form
Parameters
----------
epochs_name : str
name of previously set epochs table
format : str {'numpy', 'pandas'}
columns : list of str or None {'None'}
the subset of column names to extract
Returns
-------
epochs : numpy.ndarray or pandas.DataFrame
epochs.shape == (i x m, n + 2) where
i = the number of epochs, indexed uniquely by epoch_table['epoch_id']
m = epoch length in samples
n = the number of columns in the `epochs_name` epochs table
See `_h5_get_epochs()` for details.
attrs : dict
stub
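        A sketch of fetching previously set epochs; the names are hypothetical:
        .. code-block:: python
           epochs, attrs = mydat.get_epochs('my_epochs', format='pandas')
           # or a subset of columns as a numpy structured array
           epochs, attrs = mydat.get_epochs(
               'my_epochs', format='numpy', columns=['epoch_id', 'match_time', 'MiPa']
           )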
"""
if format not in ["numpy", "pandas"]:
msg = f"format='numpy' or format='pandas' not {format}"
raise ValueError(msg)
epochs = np.array(
[e for e in self._h5_get_epochs(epochs_name, columns=columns)]
).flatten()
if format == "numpy":
pass
elif format == "pandas":
epochs = pd.DataFrame(epochs)
# cleanup bytestrings
            for col in epochs.columns:
                try:
                    # decode as utf8 or shrug and move on
                    epochs.loc[:, col] = epochs.loc[:, col].str.decode("utf8")
                except Exception:
                    pass
# fetch the attrs for this epoch dataset
with h5py.File(self.h5_fname, "r") as h5:
attrs = dict()
for k, v in h5[mkh5.EPOCH_TABLES_PATH][epochs_name].attrs.items():
attrs[k] = v
return epochs, attrs
    def export_epochs(self, epochs_name, epochs_f, file_format="h5", columns=None):
"""write previously set epochs to data in the specified file format
Recommended epoch export formats for cross-platform data interchange
Parameters
----------
epochs_name : string
must name one of the datasets in this h5['epochs']
epochs_f : string
file path and name of the data file
file_format : string, {'h5', 'pdh5', 'feather', 'txt'}
.. warning ::
File formats other than h5 overwrite any file with the same
name without warning.
Note
----
* h5 format:
* the epochs are saved in the HDF5 file root as a dataset
named `epochs_name`. Fails if such a dataset already
exists.
* 2-D rows x columns epochs data are stored as a single 1-D
column vector (rows) of an HDF5 compound data type
(columns). This HDF5 dataset is easily read and unpacked
with any HDF5 reader that supports HDF5 compound data
types.
* pdh5 format: 2-D rows x columns epochs data are written to
disk with `pandas.to_hdf` writer (via pytables). These
epochs data are easily read into a `pandas.DataFrame` with
`pandas.read_hdf(epochs_f, key=epochs_name)` and are also
readable, less easily, by other HDF5 readers.
* feather, txt formats: 2-D rows x columns epochs data are
written to disk with `pandas.to_feather` (via pyarrow) and
as tab-separated text with `pandas.to_csv(..., sep='\t')`.
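        A sketch of exporting previously set epochs; file names are hypothetical:
        .. code-block:: python
           mydat.export_epochs('my_epochs', 'my_epochs.h5')  # file_format='h5' default
           mydat.export_epochs('my_epochs', 'my_epochs.fthr', file_format='feather')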
"""
# in case of Path
epochs_name = str(epochs_name)
epochs_f = str(epochs_f)
known_formats = ["h5", "pdh5", "feather", "txt"]
if file_format not in known_formats:
msg = f"unknown file_format='{file_format}': must be one of {' '.join(known_formats)}"
raise ValueError(msg)
if file_format == "h5":
(epochs, attrs) = self.get_epochs(
epochs_name, format="numpy", columns=columns
)
with h5py.File(epochs_f, "w") as h5:
epochs_dataset = h5.create_dataset(epochs_name, data=epochs)
for k, v in attrs.items():
epochs_dataset.attrs[k] = v
else:
# non-hdf5 formats
(epochs, attrs) = self.get_epochs(
epochs_name, format="pandas", columns=columns
)
# dump with pandas
if file_format == "pdh5":
epochs.to_hdf(epochs_f, key=epochs_name, format="fixed", mode="w")
elif file_format == "feather":
epochs.to_feather(epochs_f)
elif file_format == "txt":
# don't write row count index
epochs.to_csv(epochs_f, sep="\t", index=False)
else:
msg("unknown file epoch export file format: {0}".format(file_format))
raise TypeError(msg)
return None
# ------------------------------------------------------------
    # PRIVATE (-ish) CRUD. These all wrap the h5py.File()
    # in a context manager so users don't have to.
# ------------------------------------------------------------
    # data setting ops
# def _h5_update_eeg_data(self, h5f, group_name, attr, data, yhdr, *args, **kwargs):
def _h5_update_eeg_data(self, h5f, group_name, header, data, *args, **kwargs):
"""database-like CRUD to push .crw/.log data into the mkh5 format
Parameters
-----------
h5f : str
filepath to existing writable h5, else h5py Error
group_name : str
h5 file Group path name
header : mkh5.HeaderIO.header
built from .crw header + YAML info
data : numpy record array
rows = samples
columns = ticks, events, log, individual channel data
        *args, **kwargs
            passed to h5py, e.g., compression, chunks
        Warning: When appending data to existing data, headers are not
        checked for consistency.
Use case 1: Create/New. This is the basic step to convert
.crw/.log to the mkh5 data interchange format.
Use case 2: Update/Append. This behavior combines .crw/.log
files to add cals, recover from restarts, combine sessions and
such.
Implementation:
* mkio returns log info and the eeg strip chart incrementally
indexed by A/D sample, ie., crw_ticks = 0 ... number of
samples. This EEG data array is split on negative crw event
code rows (pauses, data errors) into mkh5 datablocks of
continuously sampled eeg.
* The mkh5 datablock is the minimal unit, consisting of
* dblock_N (h5py.Dataset) a single numpy.ndarray log + eeg
stripchart
* dblock_N.attr[json_header] (h5py.Attribute) the log + eeg
header information encoded as a JSON string.
* The processing to Create and Append .crw/.log as mkh5 datablocks is
identical
- count the number of existing datablocks in h5_path = N
- create sequentially numbered datablocks at h5_path starting with N
For Create/New, N == 0
For Append N > 0
            group/dblock_N, group/dblock_N+1, ... group/dblock_N+m
mkh5 datablock format: dblock_N
Each continuous record is a new, sequentially
numbered recording h5py.Group under input group_name.
Under each such recording, each column in data is a new dataset.
group_name/dblock_0/crw_ticks
group_name/dblock_0/raw_evcodes
...
group_name/dblock_0/lle
group_name/dblock_0/lhz
...
group_name/dblock_0/MiPa
...
group_name/dblock_0/the_last_channel
"""
# ------------------------------------------------------------
        # 1. collect the boundaries of the continuous stretches of .crw
# data at
#
# - beginning
# - negative event codes = pauses, data errors
# - end
# ------------------------------------------------------------
negative_raw_code_idxs = (data["raw_evcodes"] < 0).nonzero()[0]
# white list pause mark codes for the dblock boundaries
pause_idxs = [
i
for i in negative_raw_code_idxs
if data["raw_evcodes"][i] in mkh5._dig_pause_marks
]
# negative log codes are common, negative raw codes other than
# pause marks are not
if len(negative_raw_code_idxs) > len(pause_idxs):
msg = "\n".join(
[
"raw tick {0}: code {1}".format(i, data["raw_evcodes"][i])
for i in negative_raw_code_idxs
if data["raw_evcodes"][i] not in mkh5._dig_pause_marks
]
)
msg += (
"\n{0} {1} unusual negative codes in raw marktrack ... "
"make sure you know why."
).format(h5f, group_name)
warnings.warn(msg)
data_boundaries = [
-1
] # start with a boundary one sample before the first, i.e., at -1
data_boundaries.extend(pause_idxs) # add the boundaries
# force a boundary at the last sample in case the file was closed without pausing
datalen = len(data["raw_evcodes"])
if data_boundaries[-1] != datalen - 1:
data_boundaries = np.append(data_boundaries, datalen - 1)
warnings.warn(
"no pause mark at the last data sample ... make sure this is OK"
)
# (start, stop) tuples of data block row indices
data_intervals = [
(data_boundaries[i] + 1, data_boundaries[i + 1])
for i in range(len(data_boundaries) - 1)
]
# build the h5 datasets and set their header attr
with h5py.File(h5f, "r+") as h5:
# find and count datablocks already in the group
dblock_ids = [k for k in h5[group_name].keys() if "dblock" in k]
nextblock = len(dblock_ids) # for the dblock_id counter
for i, (start, stop) in enumerate(data_intervals):
                # split the data on the interval tuples
dblock_id, dblock = None, None
# ------------------------------------------------------------
# write the dig data chunk as dblock_N
# ------------------------------------------------------------
dblock_id = "dblock_{0}".format(nextblock + i)
dblock = h5[group_name].create_dataset(
dblock_id, data=data[...][start : stop + 1], **kwargs
)
# set this dblock ticks sequence
dblock["dblock_ticks"] = range((stop + 1) - start)
# set this dblock header
header.set(dblock)
# FIX ME: sanity check the data blocks samples total to data samples
return None
# ------------------------------------------------------------
# *** PUBLIC *** CRUD
# ------------------------------------------------------------
    def reset_all(self):
"""completely wipe out the mkh5 file and reset to empty without mercy"""
with h5py.File(self.h5_fname, "w") as h5:
# version new h5 files
h5.attrs["version"] = __version__
# create a new data set in specified group
    def create_mkdata(
self, h5_path, eeg_f, log_f, yhdr_f, *args, with_log_events="aligned", **kwargs
):
"""Convert Kutas lab ERPSS `.crw` and `.log` to the
`mkh5` hdf5 format.
This merges dig `.crw`, `.log`, and user-specified `.yml` data into a tidy
HDF5 dataset of continuous EEG recording + jsonic header.
.. note::
Log events are automatically truncated if log event codes
occur after the end of the EEG data. This is rare but can
happen when dig crashes or drops the last event code.
Parameters
----------
h5_path : str
The full slashpath location in the `.h5` file where the
new data blocks will be stored in the hdf5 file. Must be
the full slashpath from the root without the leading
slash.
eeg_f : str
file path to the `.crw` file.
log_f : str or None
file path to corresponding `.log` file, if any.
yhdr_f : str
file path to the YAML header file.
with_log_events : {"aligned", "from_eeg", "none", "as_is"}, optional
how to handle log file event codes (`log_evcodes`)
relative to the eeg event codes (`raw_evcodes`) from the
eeg recording.
`aligned` (default)
ensures eeg and log event code
timestamps are 1-1 but allows discrepant, e.g., logpoked,
event codes with a warning. Requires a log file. This
default is the mkpy.mkh5 <= 0.2.2 behavior.
`from_eeg`
propagates the eeg event codes (dig mark track) to the `log_evcodes`
column. Requires `log_f` is `None`.
`none`
sets `log_evcodes`, `log_ccode`, `log_flags` all to 0. Requires `log_f` is `None`.
`as_is`
loads whatever codes are in the log file without
checking against the eeg data. Requires a log
file. Silently allows eeg and log event code
misalignment. Exceedingly dangerous but useful for
disaster recovery.
*args : strings, optional
passed in to `h5py.create_dataset()`
        **kwargs : key=values, optional
passed in to `h5py.create_dataset()`, e.g.,
`compression="gzip"`.
Notes
        -----
The EEG and event code data streams are snipped apart into
uninterrupted "datablocks" at pause marks. Each data block has
its own header containing information from the `.crw` file
merged with the additional information from the YAML header
file `yhdr_f`.
Uncompressed ERPSS `.raw` files are also legal but there is no
good reason to have them around. If the raw won't compress
because it is defective it won't convert to `mkh5` either.
There are no known useful `**kwargs`. HDF5 chunking fails when the
size of datablock is smaller than the chunk and
compression makes files a little smaller and a lot slower
to read/write.
Nathaniel Smith did all the hard work of low level ERPSS file
IO.
        Examples
        --------
.. todo::
Give examples or link to snippets or examples
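        A minimal sketch; the file and group names are hypothetical:
        .. code-block:: python
           mydat = mkh5.mkh5('sub01.h5')
           mydat.reset_all()  # wipe the file if rebuilding from scratch
           mydat.create_mkdata('S01', 'sub01.crw', 'sub01.log', 'sub01.yhdr')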
"""
# clean up Path
eeg_f = str(eeg_f)
if log_f is not None:
log_f = str(log_f)
yhdr_f = str(yhdr_f)
(attr, data) = self._read_raw_log(eeg_f, log_f, with_log_events=with_log_events)
hio = mkh5.HeaderIO()
hio.new(attr, yhdr_f) # merge the .crw and yhdr into the new header
# create the group and write the header
with h5py.File(self.h5_fname, "r+") as h5:
try:
# create with data ... data.dtype is automatic
group = h5.create_group(h5_path)
except ValueError as fail:
print("mkh5 path {0}".format(h5_path))
raise fail
# write out the data into hdf5 datablocks+attributes
self._h5_update_eeg_data(self.h5_fname, h5_path, hio, data, *args, **kwargs)
# FIX ME
# self._check_data()
return None
# ------------------------------------------------------------
# add eeg data to a group under the same header
# ------------------------------------------------------------
    def append_mkdata(
self, h5_path, eeg_f, log_f, yhdr_f, *args, with_log_events="aligned", **kwargs
):
"""Append .crw, .log, .yhdr to an existing h5_path
Extend an existing sequence of datablocks `h5_path/dblock_0`,
... `h5_path/dblock_N`, with the continuation,
`h5_path/dblock_N+1`, ...
        The intended application is to combine `.crw`, `.log` files
        together that could or should be grouped, e.g., to add
        separately recorded cals, recover from dig crashes, or to pool an
        individual's data recorded in different sessions.
Parameters
----------
h5_path : str
The full slashpath location in the `.h5` file where the
new data blocks will be stored in the hdf5 file. Must be
the full slashpath from the root without the leading
slash.
eeg_f : str
            file path to the `.crw` file.
log_f : str or None
file path to corresponding `.log` file.
        yhdr_f : string
            path to the YAML header file.
        with_log_events : str
            how to handle the log event codes, see `mkh5.create_mkdata()` for details
Raises
-------
Warning
If the new crw headers do not match the existing
group attributes
See Also
---------
:meth:`~mkpy.mkh5.mkh5.create_mkdata`
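        A sketch of appending separately recorded cals; file names are hypothetical:
        .. code-block:: python
           mydat.create_mkdata('S01', 'sub01.crw', 'sub01.log', 'sub01.yhdr')
           mydat.append_mkdata('S01', 'sub01_cals.crw', 'sub01_cals.log', 'sub01.yhdr')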
"""
# clean up Path
eeg_f = str(eeg_f)
if log_f is not None:
log_f = str(log_f)
yhdr_f = str(yhdr_f)
# slurp crw/log
(crw_hdr, crw_data) = self._read_raw_log(
eeg_f, log_f, with_log_events=with_log_events
)
# build the new header
new_hio = mkh5.HeaderIO()
new_hio.new(crw_hdr, yhdr_f)
self._h5_update_eeg_data(
self.h5_fname, h5_path, new_hio, crw_data, *args, **kwargs
)
    def delete_mkdata(self, sub_id):
"""delete a top-level group from the mkh5 h5 file without warning, see Notes about wasted space.
Parameters
sub_id (string) path to h5 group in the instance's h5file
Notes:
Use sparingly or not at all. hdf5 has no garbage collection,
deleting groups leaves holes in the file unless the entire
file tree is copied to a fresh file
        FIX ME: the usual hack around the missing hdf5 garbage collection
        is to rewrite the gappy file to a fresh file ... this could be
        built in here.
"""
with h5py.File(self.h5_fname, "r+") as h5:
# with data ... data.dtype is automatic
del h5[sub_id]
    def get_dblock(self, h5_path, header=True, dblock=True):
"""return a copy of header dict and numpy ndarray from the mkh5
datablock at h5_path
Parameters
----------
h5_path : string
full HDF5 slashpath to a datablock in this mkh5 instance
header : bool {True}, optional
return the header
dblock : bool {True}, optional
return the dblock dataset
Returns
-------
hdr, data
Raises
------
ValueError if header and dblock are both False
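        A short sketch; the datablock path is hypothetical:
        .. code-block:: python
           hdr, data = mydat.get_dblock('S01/dblock_0')
           hdr = mydat.get_dblock('S01/dblock_0', dblock=False)  # header only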
"""
for key, val in [("header", header), ("dblock", dblock)]:
if not isinstance(val, bool):
ValueError(f"{key} must be True or False")
if header:
with h5py.File(self.h5_fname, "r") as h5:
hio = self.HeaderIO()
hio.get(h5[h5_path])
hdr = hio.header
if dblock:
with h5py.File(self.h5_fname, "r") as h5:
data = copy.deepcopy(h5[h5_path][...])
if header and dblock:
return hdr, data
elif header:
return hdr
elif dblock:
return data
else:
raise ValueError("header and dblock cannot both be False")
def _h5_get_dblock_slice(self, h5_f, h5_path, db_slice=None):
"""return a copy of header dict and numpy ndarray slice from the mkh5
datablock from h5 file at h5_path.
Parameters
----------
h5_f : string
path to mkh5 file
h5_path : string
full slashpath to a datablock in this mkh5 instance
db_slice : slice (default = None)
rows slice of the dblock to return, None returns entire dblock
Returns
-------
(hdr, dblock_slice) : dict, np.ndarray
entire dblock header dict for and the slice of dblock data
"""
with h5py.File(h5_f, "r") as h5:
# db_len = len(h5[h5_path].value)
db_len = len(h5[h5_path][...])
if db_slice is None:
db_slice = slice(0, db_len)
else:
if db_slice.start < 0:
msg = "data block slice start < 0"
raise IndexError(msg)
if db_slice.stop > db_len:
msg = "data block slice stop > data block length"
raise IndexError(msg)
hio = self.HeaderIO()
hio.get(h5[h5_path])
header = hio.header
dblock_slice = copy.deepcopy(h5[h5_path][db_slice])
return (header, dblock_slice)
# ------------------------------------------------------------
# PUBLIC utilities
# ------------------------------------------------------------
    def headinfo(self, pattern=".+"):
"""print header information matching the pattern regular expression to STDOUT
Parameters
----------
pattern: regexp
regular expression to look for in the slashpaths to
datablocks and header information in this mkh5 format
file. Default '.+' matches all header info.
Assume we have previously constructed an mkh5 file `expts.h5`
containing data for two experiments and multiple subjects in
each and decorated with yaml header information about
electrode locations.
        We may want to query and display more or less header
        information. Usually less, since there is a lot.
Select the relevant information with regular expression
pattern matching:
.. code-block:: python
> expts = mkh5.mkh5('expts.h5') # initialize the mkh5 object
> expts.headinfo('Expt1/S001') # fetch all header info for Expt1/S001, all datablocks
> expts.headinfo('Expt1/.*MiPa') # returns everything in any Expt1 header involving MiPa
        > expts.headinfo('Expt1/S001/apparatus/space/origin') # origin of sensor space for Expt1/S001
> expts.headinfo('Expt1/S001/apparatus/sensors/MiPa/x') # x-coordinate of electrode MiPa
"""
info = self._get_head(pattern)
if info is not None:
for k, v in info:
print("{0}: {1}".format(k, v))
else:
warnings.warn("headinfo pattern {0} not found".format(pattern))
    def sethead(self, slash_vals, **kwargs):
"""update mkh5 dblock header information via ``dpath.utils`` style
slash path, value notation.
The recommended method for adding information to mkh5 headers
is via the YAML header file loaded when converting .crw/.log
to mkh5
        myh5file.create_mkdata('S01', 'my.crw', 'my.log', 'my.yhdr')
        Use sethead() at your own risk. Mucking with headers by hand
        is dangerous without a clear understanding of the mkh5 dataset
        and header attribute format and dpath.util behavior.
Parameters
----------
slash_vals : (str, value) 2-ple or list of them
str is a slash path to an mkh5 dblock and on into the header
value is JSON-ifiable scalar, dict, or sequence
.. code-block:: python
mydat = mkh5.mkh5('myfile.h5')
        mydat.sethead(('S01/dblock_0/long_subid', 'S0001_A'))
# probably a bad idea to set this only for first datablock
        mydat.sethead(('S01/dblock_0/npsych/mood_score', 4))
# use a list to set for all dblocks
spvs = [('S01/dblock_0/npsych/mood_score', 4),
('S01/dblock_1/npsych/mood_score', 4),
('S01/dblock_2/npsych/mood_score', 4),
('S01/dblock_3/npsych/mood_score', 4),
('S01/dblock_4/npsych/mood_score', 4),
('S01/dblock_5/npsych/mood_score', 4), ]
"""
# organize the input list of slash_vals by mkh5 datablock
by_dblock = dict()
for path, value in slash_vals:
m = re.match(r"(.+/dblock_\d+)/(.+)", path)
if m:
assert len(m.groups()) == 2
h5_dblock_path = m.groups()[0]
h5_header_path = m.groups()[1]
if h5_dblock_path not in by_dblock:
by_dblock[h5_dblock_path] = [] # first time thru
by_dblock[h5_dblock_path].append((h5_header_path, value))
# iterate by the dblocks found and update
with h5py.File(self.h5_fname, "r+") as h5:
for h5_dblock_path, hdr_slash_vals in by_dblock.items():
self._h5_update_header(h5[h5_dblock_path], hdr_slash_vals, **kwargs)
    def gethead(self, pattern):
"""get header values as a list of (slashpath, value) 2-ples suitable for passing to edhead"""
return self._get_head(pattern)
# ------------------------------------------------------------
# introspection: headinfo and info
# ------------------------------------------------------------
def _get_head(self, pattern):
"""fetch all mkh5 dblock header info matching slashpath regexp pattern
Parameters:
pattern (regexp string) pattern to look for in the mkh5 headers
Return
------
matches : list
list of 2-ples (slashpath, value)
Example
mydat._get_head('S0/dblock_0/streams/(crw_ticks|MiPa)')
[
(S01/dblock_0/streams/crw_ticks/jdx, 1),
(S01/dblock_0/streams/crw_ticks/stream, 'new_crw_ticks'),
(S01/dblock_0/streams/crw_ticks/name, 'crw_ticks'),
(S01/dblock_0/streams/crw_ticks/dt, '<u4'),
(S01/dblock_0/streams/MiPa/jdx, 22),
(S01/dblock_0/streams/MiPa/stream, 'eeg0016'),
(S01/dblock_0/streams/MiPa/name, 'MiPa'),
(S01/dblock_0/streams/MiPa/dt, '<f2')
]
"""
        re.compile(pattern)  # validate the regexp early, raises re.error if malformed
h5_paths = h5tools.get_data_group_paths(self.h5_fname)
db_slashpaths = []
for path in h5_paths:
db_slashpaths.append(h5tools.get_dblock_paths(self.h5_fname, path))
matches = []
with h5py.File(self.h5_fname, "r") as h5:
for db_slashpath in db_slashpaths:
for dbs in db_slashpath:
hio = self.HeaderIO() # abundance of caution we are starting fresh
hio.get(h5[dbs])
hdr_paths = dpath.path.paths(hio.header, dirs=False, leaves=False)
for hdr_path in hdr_paths:
slash_path = "/".join([str(p[0]) for p in hdr_path])
full_path = dbs + "/" + slash_path
m = re.search(pattern, full_path)
if m:
matches.append(
(full_path, dpath.path.get(hio.header, hdr_path))
)
del hio
if len(matches) == 0:
return None
else:
return matches
    def info(self):
"""return h5dump-ish overview of h5_path's groups/datasets/attributes and data
Parameter:
h5_path (string) h5 path to a group or dataset
Returns info (string)
"""
# headinfo = self.headinfo(**kwargs)
headinfo = self._get_head(".+")
h5_paths = np.unique(
[re.match(r".*dblock_\d+", x).group() for x in [y[0] for y in headinfo]]
)
info = ""
with h5py.File(self.h5_fname, "r") as h5:
for n in h5_paths:
info += "------------------------------------------------------------\n"
info += "{0} {1}\n".format(n, h5[n].__doc__.strip())
info += "------------------------------------------------------------\n"
if isinstance(h5[n], h5py.Dataset):
info += "datablock attributes:\n"
hdr_slash_vals = self._get_head(n)
db_headinfo = "\n".join(
["{0}: {1}".format(k, v) for k, v in hdr_slash_vals]
)
info += pprint.pformat(db_headinfo, indent=2, width=80)
info += "Data: {0}\n".format(h5[n][...].shape)
for col in h5[n].dtype.names:
info += " {0} {1}".format(col, h5[n].dtype[col].name)
info += " {0} .. {1}".format(h5[n][col][0:5], h5[n][col][-5:])
mqm = np.percentile(h5[n][col], [0, 25, 50, 75, 100])
info += " min-q-max: {0}\n".format(mqm)
elif isinstance(h5[n], h5py.Group):
pass
else:
raise ValueError("unknown h5py type: {0}".format(type(h5[n])))
info += "\n"
return info
    def calibrate_mkdata(
self,
id_name,
cal_size=None,
polarity=None,
lo_cursor=None,
hi_cursor=30,
n_points=None,
cal_ccode=None,
use_cals=None,
use_file=None,
):
"""fetch and apply normerp style calibration to raw/crw dataset.
This locates two cursors, one on either side of a an
event-triggered calibration square wave step and measures
average values in the interval +/- n_points around the cursors
separately at each EEG data stream. The magnitude of the
difference is the measure of the step. The calibration scaling
factor for that stream is the average of the (trimmed)
calibration pulses.
Parameters
----------
id_name : str
h5 group name that is parent to mkpy format dblocks, id_name/dblocks
cal_size : float
magnitude of calibration square wave step in microvolts, e.g., 10
polarity : (1, -1)
ignored, and should be deprecated. In ERPSS this inverts
all waveforms ... has nothing to do with calibration really.
lo_cursor : float
offset of the low cursor from the calibration event in
milliseconds, typically negative, e.g., -50
hi_cursor : float
offset of the high cursor from the calibration event in
milliseconds, typically positive, e.g., 50
n_points : uint
number of points on either side of each cursor to measure,
interval = 2*n_points + 1
cal_ccode : uint (default = 0)
search for cal pulses only in dblocks where the ccode
column == cal_ccode. The standing Kutas lab convention is
cal_ccode == 0.
use_cals : str (None defaults to `id_name`)
slashpath to an alternate h5 group containing dblocks with the cal pulses
use_file : str (None defaults to `self.h5_fname`)
slashpath to an alternate mkpy format h5 data file.
1. Calibration pulses are often recorded into the same `.crw`
file or a separate file following data acquisition and then
mkh5.append_mkdata()-ed to a group. In both cases, the cal pulses
appear in dblocks sister to the EEG data they are used to
calibrate.
Consequently the default calibration behavior is to search
the dblocks daughter to self.h5_fname/id_name for the cal
pulses.
In rare cases, cal pulses are missing entirely and must be
poached from another data group in the same hdf5 file or a
data group in a different hdf5 file.
Setting `use_cals` overrides the default group name.
Setting `use_file` overrides the default self.h5_fname.
2. The normerp way is to use the *ABSOLUTE VALUE* of the cal
step regardless of polarity to adjust the amplitude of the
+/- A/D recordings ... leaving the sign unchanged.
3. The polarity flag -1 is used *ONLY* to switch the sign of
the EEG and has nothing to do with the A/D scaling factor
"""
# FIX ME: FINISH INPUT ERROR CHECK
# FIX ME: this should throw future warning and be deprecated
if polarity == 1:
pass
elif polarity == -1:
warnings.warn(
"polarity == -1 ... this is not necessary for negative cals and probably wrong. "
+ "Use it only to invert the polarity of the EEG recording for some bizarre reason like "
+ "amps configured backwards with A1 (+) and Cz (-) instead of the usual Cz (+) and A1 (-)"
)
elif polarity is not None:
raise ValueError(
"polarity flag must be 1 to leave EEG polarity unchanged or -1 to switch the sign"
)
# where to look for cals?
if use_cals is None:
use_cals = id_name # default is daughter dblocks
if use_file is None:
use_file = self.h5_fname # default is same mkpy file
# neither can be None at this point; warn when poaching non-default cals
if use_cals != id_name or use_file != self.h5_fname:
msg = "Calibrating {0} using cals from {1}/{2}"
msg = msg.format(id_name, use_file, use_cals)
warnings.warn(msg)
# fetch a cal_info bundle from the use_cals group
# ... cal_pulses are for inspection/plotting
cal_info, cal_pulses = self._h5_get_calinfo(
use_file,
use_cals,
cal_size=cal_size,
lo_cursor=lo_cursor,
hi_cursor=hi_cursor,
n_points=n_points,
cal_ccode=cal_ccode,
)
# FIX ME .. .implement min_cal_count ???
with h5py.File(self.h5_fname, "r+") as h5:
# datablock refs
# n_dblocks = len([k for k in h5[id_name].keys() if 'dblock' in k])
# dblocks = [h5[id_name + '/' + 'dblock_'+ str(i)] for i in range(n_dblocks)]
dblock_paths = h5tools.get_dblock_paths(self.h5_fname, id_name)
for dblock_path in dblock_paths:
dblock = h5[dblock_path]
hio = self.HeaderIO()
hio.get(dblock)
is_calibrated = any(
[
"cal" in key
for stream in hio.header["streams"]
for key in hio.header["streams"][stream].keys()
]
)
if is_calibrated:
msg = (
"\n CAUTION skipping "
+ dblock_path
+ " ... appears to be already calibrated"
)
warnings.warn(msg)
continue
print(
"Calibrating block {0} of {1}: {2} ".format(
dblock.name, len(dblock_paths), dblock.shape
)
)
# get the names of columns with streams that string match 'dig_chan_' from attr metadata
hio = self.HeaderIO()
hio.get(dblock)
attrs = hio.header
strms = hio.header["streams"]
# fail if chan names don't match
chan_names = sorted(
k for k, col in strms.items() if "dig_chan_" in col["source"]
)
if not cal_info.keys() == set(chan_names):
calchans = sorted(cal_info.keys())
print(
"Calibration datablock {0}: {1}".format(use_cals, calchans)
)
print(
"This data block {0}: {1}".format(dblock.name, chan_names)
)
raise ValueError(
"channel names in calibration data do not match data columns"
)
# walk the cal info and apply it to this data ...
# dt_uV = np.dtype(np.dtype([('uV',mkh5._mk_EEG)]))
cal_slash_vals = [] # list of new attributes
for chan, info in cal_info.items():
# print(' {0}'.format(chan), end='')
# dblock[chan] = dblock[chan]*(float(cal_size)/float(info['scale_by']))
# The normerp way ... scale by positive magnitude of cal step regardless
# of its sign.
scale_by = None
scale_by = float(info["scale_by"])
if scale_by < 0:
msg = (
"FYI found negative cal pulses {0} {1} ... "
"calibration will be OK".format(dblock_path, chan)
)
warnings.warn(msg)
scale_by = np.abs(scale_by)
# access by column *NAME*
dblock[chan] = dblock[chan] * (float(cal_size) / scale_by)
# This is pointless when the data are loaded into python
if polarity == -1:
dblock[chan] = -1.0 * dblock[chan]
# record this in the channel metadata
# chan_jdx = chan_jdxs[chan_names.index(chan)] # list version
# self._h5_set_dblock_attrs(dblock, streams=[(chan_jdx, 'calibrated', True),
# (chan_jdx, 'cals', info)])
# chan_jdx = strms[chan]['jdx'] # dict version
cal_slash_vals += [("streams/" + chan + "/calibrated", True)]
cal_slash_vals += [("streams/" + chan + "/cals", info)]
self._h5_update_header(dblock, cal_slash_vals)
# self._h5_update_header(dblock, [('streams/' + chan + '/calibrated', True),
# ('streams/' + chan + '/cals', info)]
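# Usage sketch (illustrative; the file/group names are placeholders and
# the cursor/point values echo the example settings noted in a plotcals
# comment below, not a prescription for any study):
#
#   mydat = mkh5("sub01.h5")
#   mydat.calibrate_mkdata(
#       "S01",            # data group with cal pulses in sister dblocks
#       cal_size=10,      # 10 uV calibration square wave step
#       lo_cursor=-50,    # ms, pre-step cursor
#       hi_cursor=50,     # ms, post-step cursor
#       n_points=3,       # average 2*3 + 1 samples around each cursor
#       cal_ccode=0,      # Kutas lab convention for cal blocks
#   )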
def _attr_to_slashpath(self, k, v, p, l):
"""walk dictionary building a list of dpath friendly slashpaths, e.g., dblock.attr dicts"""
p = p + "/" + k # last key
if not isinstance(v, dict):
# l.append('{0}: {1}'.format(p, v))
l.append((p, v))
# print(leaf)
else:
for k1, v1 in v.items():
self._attr_to_slashpath(k1, v1, p, l)
return l
@property
def data_groups(self):
return h5tools.get_data_group_paths(self.h5_fname)
@property
def dblock_paths(self):
"""an iterable list of HDF5 paths to all the data blocks in the mkh5 file"""
dblock_paths = []
h5_paths = self.data_groups
for h5_path in h5_paths:
dblock_paths += h5tools.get_dblock_paths(self.h5_fname, h5_path)
return dblock_paths
@property
def data_blocks(self):
"""deprecated use mkh5.dblock_paths for a list of HDF5 paths to all the data blocks"""
msg = (
"mkh5.data_blocks is deprecated and will be removed in a future "
"release, use mkh5.dblock_paths instead"
)
warnings.warn(msg, FutureWarning)
return self.dblock_paths
@property
def epochs_names(self):
return self.get_epochs_table_names()
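# Property sketch (illustrative, names are placeholders): for an mkh5
# file with one subject loaded under "S01":
#
#   mydat = mkh5("expts.h5")
#   mydat.data_groups    # e.g., ['S01']
#   mydat.dblock_paths   # e.g., ['S01/dblock_0', 'S01/dblock_1', ...]
#   mydat.epochs_names   # names of any epochs tables previously set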
#
def _load_eeg(self, eeg_f):
"""kutaslab .crw or .raw data loader, also populates self.dig_header
Similar to MATLAB erpio2
Parameters
----------
eeg_f : str
path to .crw or .raw data file
Returns
-------
channel_names, raw_evcodes, record_counts, eeg, dig_header : tuple
channel_names : list of str
labels of EEG data channels in order from the .crw/.raw data header
raw_evcodes : 1-D array of int
event codes from mark track in each record
record_counts : int
number of 256-sample A/D data records as read from eeg_f
eeg : np.ndarray, shape = (record_counts * 256, 1 + len(channel_names))
array of A/D values knit together from the data records of
256 samples each, for the event mark track and channels
dig_header : dict
key: values from the .crw/.raw header + some extra metadata
* Pausing makes kutaslab "continuous" EEG data gappy. However,
since the dig clock ticks stop during a pause, ticks are
sample counts (at a fixed rate), not real-time clock ticks.
"""
# slurp low level NJS data
with open(eeg_f, "rb") as fr:
(
channel_names,
raw_evcodes,
record_counts,
eeg,
dig_header,
) = mkio.read_raw(fr, dtype="int16")
return channel_names, raw_evcodes, record_counts, eeg, dig_header
def _check_mkh5(self):
"""error check structure and contents of self.eeg"""
raise NotImplementedError("FIX ME: this code predates the mkh5 data format")
# ------------------------------------------------------------
# eeg
# ------------------------------------------------------------
if self.eeg is None:
# fugedaboutit
raise (ValueError("no eeg data"))
# FIX ME check the data has the right shape
# check for data errors in crw/raw
max_chunk = max(self.dig_chunks["dig_chunks"])
if not all(
self.dig_chunks["dig_chunks"] == np.repeat(range(max_chunk + 1), 256)
):
errmsg = (
"{0} eeg dig records are not sequential, file may be corrupted: {1}"
)
errmsg = errmsg.format(self.dig_header["eegfile"], self.dig_chunks)
warnings.warn(errmsg, DigRecordsNotSequential)
# ------------------------------------------------------------
# dig_header
# ------------------------------------------------------------
important_keys = [
"subdesc",
"samplerate",
"expdesc",
"nchans",
"magic",
"recordsize",
"odelay",
"recordduration",
"dig_file_md5",
"dig_file",
]
# FIX ME: check the important keys have sensible values
# fail on missing important keys and values
for k in important_keys:
if k not in self.dig_header.keys():
raise (ValueError("dig_header key {0} not found".format(k)))
if self._get_dig_header(k) is None or self._get_dig_header(k) == "":
raise (ValueError("dig_header {0} value not found".format(k)))
# ------------------------------------------------------------
# location information if present, must be well behaved
# ------------------------------------------------------------
# if 'yhdr' in vars(self):
# # fail on duplicate electrode labels
# ll = [x.label for x in self.yhdr['sensors']]
# if not all(np.sort(ll) == np.sort(np.unique(ll))):
# raise(DuplicateLocationLabelError)
# FIX ME: fail on missing location info any channel
def _h5_update_header(self, h5_dblock, slash_vals, **kwargs):
"""
add/overwrite dblock header info via dpath slashpath accessors
Parameters
----------
h5_dblock : reference to an h5py dataset in an open, writeable mkh5 file
slash_vals : a 2-ple (slash_path, value) or list of them, each settable by dpath.util.set
kwargs : keyword arguments passed on to dpath.util.set
Ex. _h5_update_header(h5_dblock, ('samplerate', 250))
Ex. _h5_update_header(h5_dblock, [('streams/lle/calibrated', True),
('streams/lhz/calibrated', True)])
"""
# promote singleton tuple to a list
if not isinstance(slash_vals, list):
slash_vals = [slash_vals]
hio = mkh5.HeaderIO()
hio.get(h5_dblock)
hio._update_from_slashpaths(slash_vals)
hio.set(h5_dblock) # push header back into the dblock
# ------------------------------------------------------------
# dimension conversions samples <--> ms
# ------------------------------------------------------------
@staticmethod
def _ms2samp(ms, srate):
"""convert (non-negative) ms intervals to samples"""
period = 1000.0 / srate
samp = np.int64(int(ms / period))
return samp
@staticmethod
def _samp2ms(samp, srate):
"""convert n samples to ms at sample rate"""
period = 1000.0 / srate # in ms
ms = np.float32(samp * period)
return ms
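# Worked example of the conversion arithmetic: at srate = 250 the
# sample period is 1000.0 / 250 = 4.0 ms, so
#
#   mkh5._ms2samp(100, 250)   # int(100 / 4.0) == 25 samples
#   mkh5._samp2ms(25, 250)    # 25 * 4.0 == 100.0 ms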
@staticmethod
def _get_dblock_slices_at(
anchors, n_before, n_duration, min_samp=None, max_samp=None
):
"""returns an array of slice tuples (start_sample, anchor, stop_sample)
Parameters
----------
anchors : nd.array, shape (n, )
non-negative sample counters, e.g., 0, 27, 30004
n_before: uint
the number of samples before the anchor (positive integer)
n_duration : uint
epoch slice length in samples (positive integer)
min_samp, max_samp : int (optional)
bounds on allowable start/stop samples, defaulting to the
full range of mkh5._epoch_t
Returns
-------
epochs : numpy.ndarray, dtype=mkh5._dblock_slicer_dtype
each dblock_slicer is a tuple of sample indices
(start_idx, anchor_idx, stop_idx)
This just does the sample arithmetic with minimal bounds checking
"""
start_ticks = anchors - n_before # *subtract* positive numbers
stop_ticks = start_ticks + n_duration - 1
epochs = np.array(
[i for i in zip(start_ticks, anchors, stop_ticks)],
dtype=mkh5._dblock_slicer_dtype,
)
# check bounds
if min_samp is None:
min_samp = np.iinfo(mkh5._epoch_t).min
if any(epochs["start_samps"] < min_samp):
msg = (
"epoch starting sample is less than min_samp {0} "
"... bad anchors or presampling?"
).format(min_samp)
raise ValueError(msg)
if max_samp is None:
max_samp = np.iinfo(mkh5._epoch_t).max
if any(epochs["stop_samps"] > max_samp):
msg = (
"epoch stopping sample is greater than max_samp {0} "
"... bad anchors or duration?"
).format(max_samp)
raise ValueError(msg)
return epochs
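# Worked example of the slice arithmetic: with n_before=2 and
# n_duration=5 each anchor a yields (a - 2, a, a - 2 + 5 - 1):
#
#   mkh5._get_dblock_slices_at(np.array([100, 200]), 2, 5)
#   # -> [(98, 100, 102), (198, 200, 202)] as _dblock_slicer_dtype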
def _get_dblock_slicer_from_eventstream(self, event_stream, presamp, duration):
"""returns np.array of _dblock_slicer_dtype for non-zero events in event_stream
Parameters
----------
event_stream : 1-D array
mostly 0's, where positive values are taken to be event codes
presamp : uint64
number of samples before the anchor event (positive number)
duration : uint64
number of samples in the slice
Returns
-------
(slices, slice_fails) : 2-ple of arrays of _dblock_slicer_dtype
the in-bounds slices and the out-of-bounds slices that were dropped
Designed for use with dblock_N['log_evcodes'] data streams
though it will work with arbitrary 1-D "event_streams".
This allows programmatic construction of event streams.
"""
# find positive event codes in the event_stream
event_ticks = (event_stream > 0).nonzero()[0]
slices = mkh5._get_dblock_slices_at(event_ticks, presamp, duration)
inbounds = (0 <= slices["start_samps"]) & (
slices["stop_samps"] < len(event_stream)
)
slice_fails = slices[~inbounds]  # slice out bad
slices = slices[inbounds]  # and drop them
return (slices, slice_fails)
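# Sketch of the bounds filtering with a toy stream (positive values
# are event codes): with presamp=1, duration=3 the code 7 at tick 0
# would start at sample -1 and lands in slice_fails, while the code 11
# at tick 3 fits and is kept:
#
#   evs = np.array([7, 0, 0, 11, 0, 0])
#   slices, slice_fails = mydat._get_dblock_slicer_from_eventstream(evs, 1, 3)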
# ------------------------------------------------------------
# Model: data handling
# ------------------------------------------------------------
def _read_raw_log(self, eeg_f, log_f, with_log_events="aligned"):
"""NJS crw/log slurpers plus TPU decorations and log wrangling.
Parameters
----------
eeg_f : str
path to ERPSS .crw or .raw file
log_f : str or None
path to ERPSS .log file, if any.
with_log_events : str ("aligned", "from_eeg", "none", "as_is")
`aligned` (default) ensures eeg and log event code timestamps
are 1-1 but allows differing code values, e.g., logpoked
event codes, with a warning. Requires a log file. This is
mkpy.mkh5 <= 0.2.2 behavior.
`from_eeg` propagates the eeg event codes (dig mark track)
to the log_evcodes column. Not allowed with a log filename.
`none` sets `log_evcodes`, `ccode`, `log_flags` all to 0.
Not allowed with a log filename.
`as_is` loads whatever codes are in the log file without
checking against the eeg data. Requires a log file. Allows
silent eeg and log event code misalignment. Exceedingly
dangerous but useful for disaster recovery.
Returns
-------
log and raw data merged into a 2-D numpy structured array
Notes
-----
`with_log_events` added in 0.2.3 to allow loading `.crw`
files with badly mismatching and/or missing `.log` files. The
change is backwards compatible; the default option
`with_log_events='aligned'` is the same behavior as for
`mkpy.mkh5` <= 0.2.2
Handle logs with care. The crw is the recorded data log of
eeg data and digital events (marktrack). The log file
contains events only. Event discrepancies between the crw
and log can also arise from data logging errors in the crw
or log or both but these are rare. Discrepancies can also be
introduced deliberately by logpoking the log to revise event
codes. Also binary log files can be (mis)constructed from
text files but there is no known instance of this in the
past 20 years. Discrepancies involving the digital sample
(tick) on which the event occurs are pathological, something
in the A/D recording failed. Discrepancies where sample
ticks align but event code values differ typically indicate
logpoking but there is no way to determine programmatically
because the logpoked values can be anything for any reason.
EEG data underruns. This can happen when dig fails and doesn't
flush a data buffer. Rare but it happens. The eeg + log data
array columns have to be the same length, the choice is to pad
the EEG data or drop the trailing log codes. Neither is good,
the log events may have useful info ... stim and response
codes. But these are available in other ways. For EEG data
analysis, it is worse to pretend EEG data was recorded when it
wasn't.
Spurious event code 0 in the log. The crw marktrack 0 value is
reserved for "no event" so a event code 0 in the log is a
contradiction in terms. It is a fixable data error if the eeg
marktrack at the corresponding tick is zero. If not, it is an
eeg v. log code mismatch and left as such. There is one known
case of the former and none of the latter.
Each dig record contains 256 sweeps == samples == ticks. Within a
record, each sweep is a clock tick by definition. The records in
sequence comprise all the sweeps in the recording.
.. TO DO: some day patch mkio to yield rather than copy eeg
"""
# modicum of guarding
log_options = ["aligned", "as_is", "from_eeg", "none"]
if with_log_events not in log_options:
msg = f"with_log_events must be one of these: {' '.join(log_options)}"
raise ValueError(msg)
if log_f is None and with_log_events in ["aligned", "as_is"]:
msg = f"with_log_events={with_log_events} requires a log file: log_f"
raise ValueError(msg)
if log_f is not None and with_log_events in ["from_eeg", "none"]:
msg = f"to use with_log_events={with_log_events}, set log_f=None"
raise ValueError(msg)
# ------------------------------------------------------------
# eeg data are read "as is"
# ------------------------------------------------------------
with open(eeg_f, "rb") as fr:
(
channel_names,
raw_evcodes,
record_counts,
eeg,
dig_header,
) = mkio.read_raw(fr, dtype="int16")
assert len(raw_evcodes) == eeg.shape[0], "bug, please report"
n_ticks = len(raw_evcodes)
raw_event_ticks = np.where(raw_evcodes != 0)[0]
raw_events = raw_evcodes[raw_event_ticks]
n_raw_events = len(raw_events)
# ------------------------------------------------------------
# log data may get tidied
# log_data[evcode, crw_tick, ccode, log_flag]
# ------------------------------------------------------------
log_data = None
if log_f is not None:
with open(log_f, "rb") as fid:
log_data = np.array([row for row in mkio.read_log(fid)])
# log event code ticks are not allowed to exceed EEG data ticks
# under any circumstances. This can happen when dig crashes
is_trailer = log_data[:, 1] > raw_event_ticks[-1]
if any(is_trailer):
oob_events = log_data[is_trailer]
msg = (
f"{eeg_f} eeg data underrun {log_f}, "
f"dropping trailing log events {oob_events}"
)
warnings.warn(msg, LogRawEventCodeMismatch)
log_data = log_data[~is_trailer]
if with_log_events == "aligned":
# aligned (default) is mkpy.mkh5 <= 0.2.2 legacy behavior
assert log_data is not None, "bug, please report"
# log zero codes are spurious iff the eeg marktrack at that tick is also 0
is_spurious_zero = (log_data[:, 0] == 0) & (
raw_evcodes[log_data[:, 1]] == 0
)
if any(is_spurious_zero):
msg = f"dropping spurious 0 event code(s) in {log_f}: {log_data[is_spurious_zero]}"
warnings.warn(msg)
assert all(
log_data[is_spurious_zero][:, 0] == 0
), "this is a bug, please report"
log_data = log_data[~is_spurious_zero]
# fail if eeg and log event code ticks differ
if not np.array_equal(log_data[:, 1], raw_event_ticks):
align_msg = "eeg and log event code ticks do not align"
raise RuntimeError(align_msg)
# ticks are aligned if we get here, but warn of different values, e.g., logpoked
mismatch_events = log_data[log_data[:, 0] != raw_events]
if len(mismatch_events) > 0:
mismatch_msg = (
"These log event codes differ from the EEG codes, "
f"make sure you know why\n{mismatch_events}"
)
warnings.warn(mismatch_msg, LogRawEventCodeMismatch)
elif with_log_events == "as_is":
assert log_data is not None, "bug, please report"
warnings.warn(
f"not checking for event code mismatches in {eeg_f} and {log_f}"
)
elif with_log_events == "from_eeg":
assert log_data is None, "bug, please report"
warnings.warn(
f"setting all log_evcodes to match EEG event codes in {eeg_f}"
)
log_data = np.array(
[(raw_evcodes[tick], tick, 0, 0) for tick in raw_event_ticks]
)
elif with_log_events == "none":
assert log_data is None, "bug, please report"
warnings.warn(f"setting all log_evcodes, log_ccodes, log_flags to 0")
log_data = np.zeros(
(len(raw_events), 4), dtype="i4"
) # evcode, tick, ccode, log_flag
else:
raise ValueError(f"bad parameter value: with_log_events={with_log_events}")
# ------------------------------------------------------------
# construct output structured array
# ------------------------------------------------------------
# tick and log info stream dtypes
dt_names = [
"dblock_ticks",
"crw_ticks",
"raw_evcodes",
"log_evcodes",
"log_ccodes",
"log_flags",
"pygarv",
]
dt_formats = [
mkh5._evtick,
mkh5._evtick,
mkh5._evcode,
mkh5._evcode,
mkh5._log_ccode,
mkh5._log_flag,
mkh5._pygarv,
]
dt_titles = ["t_{0}".format(n) for n in dt_names]
# eeg stream dtypes
dt_names.extend([c.decode("utf8") for c in channel_names])
dt_formats.extend(np.repeat(mkh5._mk_EEG, len(channel_names)))
dt_titles.extend(
["dig_chan_{0:04d}".format(n[0]) for n in enumerate(channel_names)]
)
# mkh5 dblock dtype
dt_data = np.dtype(
{"names": dt_names, "formats": dt_formats, "titles": dt_titles}
)
# load eeg streams and build the crw_tick index
data = np.zeros((len(raw_evcodes),), dtype=dt_data)
data["crw_ticks"] = np.arange(n_ticks)
# data['crw_ticks'] = np.arange(len(raw_evcodes))
data["raw_evcodes"] = raw_evcodes
# load the .log streams
for (code, tick, condition, flag) in log_data:
data["log_evcodes"][tick] = code
data["log_ccodes"][tick] = condition
data["log_flags"][tick] = flag
# load eeg stream data
for c, ch_name in enumerate(channel_names):
data[ch_name.decode("utf8")] = np.array(eeg[:, c], dtype=mkh5._mk_EEG)
# capture the new numpy metadata for variable columns
# as a sequence to preserve column order
# dblock_cols = [] # list version
dblock_cols = dict() # dict version
for col_jdx, col_desc in enumerate(dt_data.descr):
col_dict = None
col_dict = {
"jdx": col_jdx,
"source": col_desc[0][0],
"name": col_desc[0][1],
"dt": col_desc[1],
# "calibrated": False, # still raw A/D
# "cals": dict() #
}
# dblock_cols.append(col_dict) # list version
dblock_cols.update({col_desc[0][1]: col_dict}) # dict version
# FIX ME ... check we don't clobber dig_head info??
attr = {"streams": dblock_cols}
# patch np dtypes for jsonification ... ugh
for k, v in dig_header.items():
if isinstance(v, np.string_):
attr[k] = str(v.decode("utf8"))
elif isinstance(v, np.uint16) or isinstance(v, np.int16):
attr[k] = int(v)
else:
attr[k] = v
# decorate constant columns with more useful info
stat_result = Path(eeg_f).stat()
eeg_file_stat = dict(
[(st, getattr(stat_result, st)) for st in dir(stat_result) if "st_" in st]
)
attr["eeg_file"] = eeg_f
attr["eeg_file_stat"] = eeg_file_stat
attr["log_file"] = log_f if log_f is not None else "None"
attr["uuid"] = str(uuid.uuid4())
for k, fname in {"eeg_file_md5": eeg_f, "log_file_md5": log_f}.items():
if fname is not None:
with open(fname, "rb") as f:
attr[k] = hashlib.md5(f.read()).hexdigest()
else:
attr[k] = "None"
# New version returns the header and stuff as a dict
# jsonification occurs in dblock CRUD
return (attr, data)
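# Sketch of the with_log_events modes (internal API; file names are
# placeholders):
#
#   attrs, data = mydat._read_raw_log("s01.crw", "s01.log")            # aligned, the default
#   attrs, data = mydat._read_raw_log("s01.crw", "s01.log", "as_is")   # no eeg vs. log checks
#   attrs, data = mydat._read_raw_log("s01.crw", None, "from_eeg")     # log codes copied from eeg
#   attrs, data = mydat._read_raw_log("s01.crw", None, "none")         # log columns zeroed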
@staticmethod
def _h5_get_slices_from_datablock(dblock, slicer):
"""minimal mkh5 datablock epochs slicer
Parameters
----------
dblock : h5py.Dataset
an open, readable mkh5 datablock, dblock_N
slicer : numpy.ndarray, dtype=mkh5._dblock_slicer_dtype
i.e., tuples (start_samps, anchor_samps, stop_samps)
Returns
-------
a copy of the data
"""
epochs_data = []
for e in slicer:
# dblocks are sample rows down x data columns across
# slicing here is a *row* slice, exactly what we want
epochs_data.append(dblock[e["start_samps"] : e["stop_samps"]])
# stack the arrays so access by name, e.g., MiPa returns a
# subarray with sample rows down x epoch columns across
return np.stack(epochs_data, axis=1)
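# Shape sketch: for a slicer of n epochs the stacked return indexes by
# column name, samples down x epochs across:
#
#   stack = mkh5._h5_get_slices_from_datablock(dblock, slicer)
#   stack["MiPa"]   # (samples, n) subarray, assuming a MiPa channel exists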
def _h5_get_calinfo(
self,
h5f,
group_name,
n_points=None,
cal_size=None,
lo_cursor=None,
hi_cursor=None,
cal_ccode=None,
):
"""scan mkh5 datablocks under h5_path for cals and return channel-wise AD scaling info
Parameters
----------
h5f : str
hdf5 file with conforming mkh5 data_group/dblock_N structure
group_name : str
mkh5 data_group to search for cal events and pulses
n_points : uint
number of samples to average on either side of each cursor
cal_size : float, units of microvolts
size of calibration pulse
lo_cursor : float, units of milliseconds
center of the pre-pulse window to average in computing the cal pulse amplitude
hi_cursor : float, units of milliseconds
center of the post-pulse window to average in computing the cal pulse amplitude
cal_ccode : uint
log condition code designated for calibration pulses, typically 0 by convention
Returns
-------
(cal_factors, cal_set) 2-ple: cal_factors is a dict of per-channel
'scale_by' factors and summary stats on the cals, cal_set is the
stack of cal pulse epochs for inspection/plotting
Raises
------
ValueError if missing kwargs
IOError if no kutaslab cals found in any of the group_name dblocks
Return values are coerced to float because json can't serialize float16.
Unopinionated about what h5 group to search and works on any dblock with cals
so it can be called on cals recorded with the EEG or in separate files.
Differences from normerp:
- the pp_uV resolution argument is not used
- there is no polarity parameter which has little to no point for data analysis
- cal_cc (for condition code) is used instead of cal_bin
since the single trial eeg and log files have no bins
"""
# ------------------------------------------------------------
# input check ...
# ------------------------------------------------------------
must_have = [n_points, cal_size, lo_cursor, hi_cursor, cal_ccode]
if None in must_have:
raise ValueError(
"missing keyword arguments ... check for "
+ "n_points, cal_size, lo_cursor "
+ "hi_cursor, cal_ccode"
)
# ------------------------------------------------------------
# scan the group_name datablocks for cals ...
# . cal epoch stacks are snippets of samples surrounding a pulse
# . there can be 1+ if cals are found in different datablocks
# ------------------------------------------------------------
with h5py.File(h5f, "r") as h5:
# walk the group and gather up datablocks
group = h5[group_name]
dblock_sets = [
g
for g in group.keys()
if "dblock" in g and isinstance(group[g], h5py.Dataset)
]
if len(dblock_sets) == 0:
raise mkh5.Mkh5Error(
"datablocks not found in {0}".format(group_name)
+ " check the id and/or load data"
)
# holds the calibration epoch stacks across datablocks, normally len == 1
cal_stacks = []
cal_dblock_ids = [] # for reporting only
srate = nchans = None
hio = self.HeaderIO()
for gn in dblock_sets:
g, attrs, consts, strms = None, None, None, None
g = group[gn]
# json -> dict
# attrs = json.loads(g.attrs['json_attrs']) # deprecated
# old_strms = attrs['streams'] # deprecated
hio.get(g) # fetch the dblock header for access
strms = hio.header["streams"]
# assert old_strms == strms
# scan for calibration events using log_evcodes in case
# bad cals have been manually logpoked
# if any((g['log_ccodes'] == cal_ccode) & (g['log_evcodes'] < 0)):
neg_cal_events = g["log_evcodes"][
np.where((g["log_ccodes"] == cal_ccode) & (g["log_evcodes"] < 0))
]
if any(neg_cal_events):
msg = (
"negative event code(s) found for cal condition code "
+ str(cal_ccode)
+ " "
)
msg += " ".join([str(x) for x in neg_cal_events])
warnings.warn(msg)
cal_event_ptrs = (
(g["log_evcodes"] > 0) & (g["log_ccodes"] == cal_ccode)
).nonzero()[0]
# winner winner chicken dinner ... but horribly procedural
if len(cal_event_ptrs) > 0:
# we have a cal block
print("Found cals in {0}".format(g.name))
cal_dblock_ids.append("{0}{1}".format(h5f, g.name)) # log it
# FIX ME: this is rude, we already knew this at dblock_0
# but this way it reports where the cals were found
# is_calibrated = any(['cals' in col.keys() for col in strms if 'dig_chan_' in col['source'] ])
# is_calibrated = any([v for k,v in strms.items() if k == 'calibrated'])
is_calibrated = any(
[
"cals" in col.keys()
for k, col in strms.items()
if "dig_chan_" in col["source"]
]
)
if is_calibrated:
msg = (
group_name
+ "/"
+ gn
+ " cannot access calibration pulses, they have already been scaled to uV"
)
raise RuntimeError(msg)
# check sample rate and channels against the previous block (if any)
if not (srate is None) and hio.header["samplerate"] != srate:
raise ValueError(
"srate in block {0}: {1}".format(
gn, hio.header["samplerate"]
)
+ "does not match previous data block: {0}".format(srate)
)
if not (nchans is None) and hio.header["nchans"] != nchans:
raise ValueError(
"number of channels in block {0}: {1}".format(
gn, hio.header["nchans"]
)
+ "does not match previous data block: {0}".format(nchans)
)
# set params this datablock ...
srate = hio.header["samplerate"]
nchans = hio.header["nchans"]
# humans use ms ...
lcs = mkh5._ms2samp(
lo_cursor, srate
) # in negative samples b.c. of normerp incantations
hcs = mkh5._ms2samp(hi_cursor, srate)
# computers use samples ...
presamp = (
-1.0 * lcs
) + n_points # presampling is a magnitude (positive) for epoch calc
duration = hcs + n_points + presamp + 1
# returns an nd.array, access by data column name
# return subarray: duration samples x epochs
# (cal_slicer, fails) = self._get_dblock_slicer_from_eventstream(g['raw_evcodes'], presamp, duration)
(cal_slicer, fails) = self._get_dblock_slicer_from_eventstream(
g["log_evcodes"], presamp, duration
)
if len(fails) > 0:
warnings.warn(
"dropping {0} cal epochs out of data bounds".format(
len(fails)
)
)
cal_stacks.append(mkh5._h5_get_slices_from_datablock(g, cal_slicer))
# typically cals in one dblock tho no harm in more if they are good
if len(cal_stacks) < 1:
raise (
RuntimeError(
"cal events not found in file {0} ".format(h5f)
+ "for {0} dblocks and log_ccode {1}".format(
group_name, cal_ccode
)
)
)
if len(cal_stacks) > 1:
warnings.warn(
"found {0} datablocks with cal events and using all of them".format(
len(cal_stacks)
)
+ "if this is unexpected check the crw/log and cal files"
)
# combine cal events from the dblocks into one stack,
# samples down, events across, each element in sub array
# is a row of dblock
cal_set = np.hstack(cal_stacks)
# pull the datablock column info down from the attrs/header
# colmdat = json.loads(g.attrs['streams']) # deprecated
# chan_names = [c['name'] for c in strms if 'dig_chan_' in c['source']] # list version
chan_names = [
c["name"] for k, c in strms.items() if "dig_chan_" in c["source"]
] # dict version
cal_factors = dict()
for c, n in enumerate(chan_names):
# ------------------------------------------------------------
# auto trim the data FIX ME ... make the trim criterion configurable someday
# ------------------------------------------------------------
q75, median, q25, iqr = None, None, None, None
delta_min, delta_max = None, None
deltas, good = None, None
# deltas are cal pulse step size (n, ) for n channels
# float16 version
# deltas = (cal_set[n][-(1+n_points*2):,:] - \
# cal_set[n][:(1+n_points*2),:]).mean(axis=0)
# float64 version ... scaling 12-bit AD x 1e5 is safe for float64
deltas = (
cal_set[n][-(1 + n_points * 2) :, :]
- cal_set[n][: (1 + n_points * 2), :]
)
deltas_e5 = 1e5 * np.array(deltas, dtype="<f8")
deltas = 1e-5 * deltas_e5.mean(axis=0)
# trim the deltas
q75, median, q25 = np.percentile(deltas, [75, 50, 25])
iqr = q75 - q25
delta_min = median - (1.5 * iqr)
delta_max = median + (1.5 * iqr)
good = deltas[(delta_min < deltas) & (deltas < delta_max)]
trimmed_idxs = np.where((deltas <= delta_min) | (deltas >= delta_max))[
0
]
assert len(deltas) == len(good) + len(trimmed_idxs)
if len(good) == 0:
msg = (
"uh oh ... {0} channel {1}:{2} has no cal pulses "
"after trimming at median +/- 1.5IQR"
).format(self.h5_fname, c, n)
raise ValueError(msg)
if len(good) < 0.5 * len(deltas):
msg = (
"{0} channel {1}:{2} ... less than half the cal pulses remain "
"after trimming at median +/- 1.5IQR. "
"Do you know why?"
).format(self.h5_fname, c, n)
warnings.warn(msg)
# numpy may leave some as float16 if it can ... makes
# json serializer unhappy
cal_args = dict(
{
"n_points": int(n_points),
"cal_size": float(cal_size),
"lo_cursor": float(lo_cursor),
"hi_cursor": float(hi_cursor),
"cal_ccode": int(cal_ccode),
}
)
cal_factors.update(
{
n: {
"cal_srate": float(srate),
"cal_dblock": cal_dblock_ids,
"cal_args": cal_args,
"scale_by": float(good.mean()),
"var": float(good.var()),
"median": float(median),
"iqr": float(iqr),
"n_cals": int(len(good)),
"n_trimmed": len(trimmed_idxs),
"trimmed_idxs": [int(idx) for idx in trimmed_idxs],
}
}
)
return (cal_factors, cal_set)
# ------------------------------------------------------------
# end fetch calibration information
# ------------------------------------------------------------
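# Worked sketch of the median +/- 1.5 * IQR trimming used above, with
# made-up pulse step sizes and one outlier:
#
#   deltas = np.array([9.8, 10.1, 10.0, 9.9, 42.0])
#   q75, median, q25 = np.percentile(deltas, [75, 50, 25])   # 10.1, 10.0, 9.9
#   iqr = q75 - q25                                          # 0.2
#   good = deltas[(median - 1.5 * iqr < deltas) & (deltas < median + 1.5 * iqr)]
#   # good == [9.8, 10.1, 10.0, 9.9], scale_by = good.mean() -> about 9.95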
# ------------------------------------------------------------
# View
# ------------------------------------------------------------
[docs] def plotcals(self, *args, **kwargs):
"""visualize cal pulses and scaling factors used to convert to
microvolts
"""
calinfo, cal_stack = None, None
print("Plotting cals")
calinfo, cal_stack = self._h5_get_calinfo(*args, **kwargs)
if calinfo is None or len(calinfo.keys()) == 0:
raise mkh5.Mkh5CalError(
"no cals found in " + " ".join([str(arg) for arg in args])
)
# FIX ME ... legacy vars
h5f = args[0]
group_name = args[1]
# ------------------------------------------------------------
# Cal snippets at each channel
# ------------------------------------------------------------
ch_names = [x for x in calinfo.keys()]
nchans = len(ch_names)
n_col = 4
f, ax = plt.subplots(
np.ceil(nchans / float(n_col)).astype("int64"),
n_col,
figsize=(12, 8),
facecolor="k",
)
calcolors = ["y", "m"]
# 'hi_cursor': 50.0, 'cal_size': 10.0,
# 'cal_ccode': 0, 'lo_cursor': -50.0, 'n_points': 3}
# for c in range(nchans):
for c, ch in enumerate(ch_names):
n_cals = cal_stack[ch].shape[1] # should be the same across channels
cinf = calinfo[ch]
cal_args = cinf["cal_args"]
# srate = cinf["cal_srate"]
scale_by = cinf["scale_by"]
n_points = cal_args["n_points"] # because of odd normerp variable names
trimmed_idxs = cinf["trimmed_idxs"]
good_idxs = list(set(range(n_cals)).difference(trimmed_idxs))
assert n_cals == len(good_idxs) + len(trimmed_idxs)
lo_cursor = n_points
hi_cursor = len(cal_stack[ch]) - (n_points + 1)
lo_span = range(lo_cursor - n_points, lo_cursor + n_points + 1)
hi_span = range(hi_cursor - n_points, hi_cursor + n_points + 1)
# relative to event @ sample 0
cal_samps = range(
lo_cursor - n_points, lo_cursor - n_points + len(cal_stack[ch])
)
a = ax[int(c / n_col), c % n_col]
a.set_facecolor("k")
a.set_title(
f"scale={scale_by:0.2f}, trimmed {len(trimmed_idxs)}",
fontsize="small",
color="lightgray",
)
a.set_ylabel(ch, color="lightgray", rotation=0, horizontalalignment="left")
# box the points averaged
a.axvspan(lo_span[0], lo_span[-1], color="c", alpha=0.5) # ymax=0.5,
a.axvspan(hi_span[0], hi_span[-1], color="c", alpha=0.5) # ymax=0.5,
# mark the cursors
a.axvline(lo_cursor, color="r")
a.axvline(hi_cursor, color="r")
# lpc = a.plot(cal_samps, cal_stack[ch], ".-", color=calcolors[c % 2])
a.plot(cal_samps, cal_stack[ch][:, good_idxs], ".-", color=calcolors[c % 2])
if len(trimmed_idxs) > 0:
a.plot(
cal_samps, cal_stack[ch][:, trimmed_idxs], ".-", color="red", lw=0.5
)
a.plot(
cal_samps, cal_stack[ch][:, good_idxs].mean(axis=1), color="cyan", lw=2
)
f.set_facecolor("black")
st = (
f"calibration pulses (recorded N={n_cals}) from: "
f"{' '.join([str(arg) for arg in args])}\n"
f"{' '.join([k + '=' + str(v) for k, v in kwargs.items()])}"
)
f.suptitle(st, color="lightgray")
f.subplots_adjust(hspace=0.75)
# plt.show() # uncomment to preview for debugging
return (f, ax)
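# Usage sketch (illustrative, mirrors the calibrate_mkdata sketch
# above; file and group names are placeholders):
#
#   fig, axs = mydat.plotcals(
#       "sub01.h5", "S01",
#       cal_size=10, lo_cursor=-50, hi_cursor=50, n_points=3, cal_ccode=0,
#   )
#   plt.show()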
# TPU
[docs]class LocDat:
"""map Kutas lab spherical coordinates and Brainsight
.elp data files to 3-D cartesian XYZ
Coordinates
LocDat native positions are in Cartesian 3-space
Origin is center of head
Orientation is RAS: X+ = Right, Y+ = Anterior, Z+ = Superior
Cartesian coordinates come in as triples: x, y, z
Polar coordinates come in as triples: radius, phi, theta
Kutaslab
Kutaslab topo coordinates are spherical and come in as radius, phi,
theta triples (see topofiles for theta, phi) and get mapped to x, y, z
* origin is between the ears (co-planar with 10-20 temporal line)
* vectors from the origin at angles (degrees)
* theta = 0 points toward right ear, along interaural line,
90 points to forehead along midline
* phi = 0 points to the vertex, 90 points to the temporal line
"""
def __init__(self, type, label, coord, pos, distance_units=None, angle_units=None):
"""initialize LocDat
Parameters
----------
type : str (electrode, fiducial, scalp, ...)
label : str (lle, MiPa, Fz, Nasion, scalp37, ...)
coord : str (cartesian | polar)
pos : array of float, [x, y, z] or [radius, phi, theta],
see details
distance_units : str (cm | inch)
angle_units : str (degrees | radians)
"""
# check arguments ... sort of
assert len(pos) == 3
assert distance_units
# capture positional arguments
self.type = type
self.label = label
self.coord = coord
self.distance_units = distance_units
self.angle_units = angle_units
# capture cartesian option
if coord == "cartesian":
self.x = self.y = self.z = None
self.x = pos[0]
self.y = pos[1]
self.z = pos[2]
if coord == "polar":
# insist on sensible angle_units for polar coordinates
assert angle_units == "degrees" or angle_units == "radians"
self.radius = self.theta = self.phi = None
self.radius = pos[0]
self.phi = pos[1]
self.theta = pos[2]
if self.angle_units == "degrees":
toradians = (2.0 * np.pi) / 360.0 # conversion factor
self.theta_in_radians = self.theta * toradians
self.phi_in_radians = self.phi * toradians
else:
self.theta_in_radians = self.theta
self.phi_in_radians = self.phi
# p projects onto x-y given phi,
# so p == 1.0 at phi == 90 and < 1 otherwise
p = self.radius * np.sin(self.phi_in_radians)
self.x = p * np.cos(self.theta_in_radians)
self.y = p * np.sin(self.theta_in_radians)
self.z = self.radius * np.cos(self.phi_in_radians)
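# Worked example of the polar -> cartesian mapping (the label "RPA" is
# hypothetical): radius=1.0, phi=90 (temporal line), theta=0 (toward
# the right ear) gives p = sin(90 deg) = 1.0, so x = cos(0) = 1.0,
# y = sin(0) = 0.0, z = cos(90 deg) ~ 0.0:
#
#   right_ear = LocDat(
#       "fiducial", "RPA", "polar", [1.0, 90.0, 0.0],
#       distance_units="cm", angle_units="degrees",
#   )
#   # right_ear.x, right_ear.y, right_ear.z -> approximately (1.0, 0.0, 0.0)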