import re
import yaml
import numpy as np
import pandas as pd
import warnings
class CodeTagger:
    r"""Tag pattern-matched sequences of time-indexed integers with key:value metadata.
In the intended use case
* the integer values are event-markers recorded on an event or
timing track of a digital data acquisition system.
* Each integer maps to a specific event or event-type, e.g., a
stimulus onset/offset, a response, a timing mark.
* A sequence of integers corresponds to a sequence of events or
event types.
    * The metadata (tabular rows x columns) add further information
      above and beyond the numeric value of the integer. Each pattern
      maps to a tuple of mixed data types: strings, integers, floats,
      suitable for decorating events and epochs of data with useful
      information such as experimental design factor levels (Easy,
      Hard) or continuous covariates (age of acquisition).
The original use case was to find and decorate ERPSS log event
codes with experimental information.
The mechanism here is general, abstracting away from the source of
the integer 1-D arrays and the intended purpose of the metadata.
.. note::
Starting with mkpy v0.2.1 codemaps allow but do not require
an `Index`.
Notes
-----
The UI for specifying a code tag map can be any of these file types
Excel
an .xlsx file and (optional) named worksheet readable by
        pandas.read_excel()
CodeTagger('myexpt/code_tag_table.xlsx')
CodeTagger('myexpt/code_tag_table.xlsx!for_evoked')
CodeTagger('myexpt/code_tag_table.xlsx!for_mixed_effects')
Tabbed text
a rows x columns tab-delimited text file readable by
        pandas.read_csv(..., sep="\t").
YAML
a yaml map readable by yaml.load(), mock-tabular format
described below.
* File formats
    Excel and Tabbed-text
        1. the data must be tabular in n rows and m columns (n, m >= 2)
        2. column labels must be in the first row
        3. the column labels must include 'regexp'
        4. there must be at least one tag column; there may be more

        +------------+--------------+------------------+
        | regexp     | col_label_1  | <col_label_m>*   |
        +============+==============+==================+
        | pattern_1  | code_tag_11  | <code_tag_1m>*   |
        +------------+--------------+------------------+
        | ...        | ...          | ...              |
        +------------+--------------+------------------+
        | pattern_n  | code_tag_n1  | <code_tag_nm>*   |
        +------------+--------------+------------------+
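
        For instance, a minimal tab-delimited codemap with one tag
        column might look like this and load as sketched below
        (hypothetical values, illustration only):

        .. code-block:: python

            import io
            import pandas as pd

            tsv = "regexp\tbin\n(#1)\t1\n(#2)\t2\n"
            code_map = pd.read_csv(io.StringIO(tsv), sep="\t")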
YAML files
The YAML can be any combination of inline (JSON-ic) and
YAML indentation that PyYAML yaml.load can handle.
1. must have one YAML document with two keys: ``columns`` and ``rows``.
2. the columns must start with `regexp`
3. the columns may continue ad lib.
4. each row must be a YAML sequence with the same number of items as there are columns
Example
.. code-block:: yaml
---
'columns':
[regexp, bin, probability, frequency, source]
'rows':
- ['(#1)', 1, hi, 880, oboe]
- ['(#2)', 2, hi, 440, oboe]
- ['(#3)', 3, lo, 880, oboe]
- ['(#4)', 4, lo, 440, oboe]
- ['(#5)', 5, hi, 880, tuba]
- ['(#6)', 6, hi, 440, tuba]
- ['(#7)', 7, lo, 880, tuba]
- ['(#8)', 8, lo, 440, tuba]
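
        A sketch of how such a YAML document becomes the tabular
        codemap (this mirrors what the YAML loader does internally;
        the file name is hypothetical):

        .. code-block:: python

            import yaml
            import pandas as pd

            with open("code_tag_table.yml") as f:
                mapper = yaml.safe_load(f)
            code_map = pd.DataFrame(mapper["rows"], columns=mapper["columns"])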
Row value data types
    regexp : regular expression `pattern* (#pattern) pattern*`
This is the code sequence search pattern. Log codes are separated
by a single whitespace for matching. The ``regexp`` has one or
more time-locking pattern capture groups of which one begins
with the time-marking anchor symbol, ``#``.
Flanking code patterns may be capture groups (...) and
non-capture groups (?:...)
All matched time-locking codes, the anchor code, and distance
between them are extracted from the mkpy.mkh5 datablocks and
merged with the code tags in the returned event_table for all
capture groups.
Additional columns: scalar (string, float, int, bool)
That's it. All the real work is done by 1) specifying regular
expressions that match useful (sequences of) integer event codes
and 2) specifying the optional column values that label the
matched codes in useful ways, e.g., by specifying factors and
levels of experimental design, or numeric values for regression
modeling or ...
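
    As an illustrative sketch (hypothetical codes, not the library
    API), this is the string matching that drives the mechanism:
    event codes are joined into a whitespace-delimited string and the
    compiled regexp is swept across it.

    .. code-block:: python

        import re

        code_str = " 1 1040 2"  # stringified event codes
        m = re.search(r"(1\b) (1040\b)", code_str)
        # m.group(1) == "1", m.group(2) == "1040"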
Notes
* The values in any given column should all be the same data type:
string, integer, boolean, float. This is not enforced, violate
at your own risk.
    * Missing data are allowed as values but discouraged because 1)
      they are handled differently by the pandas csv reader vs. the
      yaml and Excel readers, 2) the resulting NaNs and Nones coerce
      np.int and np.str dtype columns to np.object dtype and incur a
      performance penalty, 3) np.object dtypes are not readily
      serialized to hdf5 ... h5py gags and pytables pickles them, and
      4) they may lead to other unknown pathologies.
* For yaml files, if missing values are unavoidable, coding them
with the yaml value .NAN is recommended for all cases ... yes,
even string data. The yaml value null maps to None and behaves
differently in python/numpy/pandas. This is not enforced,
      violate at your own risk.
* Floating point precision. Reading code tag maps from yaml text files
and directly from Excel .xlsx files introduces the same rounding
errors for floating point numbers, e.g. 0.357 ->
0.35699999999999998. Reading text files introduces a *different*
      rounding error, e.g., 0.35700000000000004.
* There is no provision for <t:n-m> time interval constraints on
code patterns. Maybe someday.
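
    Example usage (a sketch; the file name and sheet name are
    hypothetical):

    .. code-block:: python

        tagger = CodeTagger("myexpt/code_tag_table.xlsx!for_evoked")
        tagger.code_map  # pandas.DataFrame: regexp column plus tag columns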
"""
    class MissingAnchor(Exception):
        def __init__(self, cause):
            msg = (
                "\nError: missing anchor mark\n"
                "Cause: {0}\n"
                "Fix: Mark exactly one target code pattern "
                "with a # like this: (#mycode)\n"
            ).format(cause)
            super().__init__(msg)
    class MultipleAnchors(Exception):
        def __init__(self, cause):
            msg = (
                "\nError: multiple anchor marks\n"
                "Cause: {0}\n"
                "Fix: Mark exactly one target code pattern "
                "with a # like this: (#mycode)\n"
            ).format(cause)
            super().__init__(msg)
    class BadCodePattern(Exception):
        def __init__(self, in_patt, cause=None):
            msg = (
                "\nError: Regular expression syntax error in code "
                "pattern: {0}".format(in_patt)
            )
            if cause is not None:
                msg += "\nCause: {0}".format(cause)
            super().__init__(msg)
def __init__(self, cmf):
"""initialize instance with a code tag map file."""
# TODO: handle different filetypes, don't let things fail silently
self.cmf = str(cmf) # for Path
loaders = {
"xlsx": self._load_xlsx_map,
"yaml": self._load_yaml_map,
"text": self._load_txt_map,
}
fails = []
code_map = None
for kind, loader in loaders.items():
try:
code_map = loader(self.cmf)
except Exception as fail:
fails.append((kind, fail))
continue
            # validate and break out on success
            if code_map is not None:
                self._check_mapper(code_map)
                self.code_map = code_map
                return
# uh oh ...
for fail in fails:
print(f"failed {fail[0]}: {fail[1]}")
raise IOError(f"failed to load {cmf} as an xlsx, YAML, or text code map")
def _check_mapper(self, mapper):
if not "regexp" in mapper.columns:
raise Exception(f"codemap {self.cmf} must include a regexp column")
if len(mapper.columns) < 2:
raise Exception(
f"codemap {self.cmf} must have regexp and "
"at least one additional code tag column."
)
for row, pattern in enumerate(mapper["regexp"]):
try:
re.compile(pattern)
except Exception as fail:
print(f"regexp row {row}")
raise fail
        if mapper.columns[0] == "Index":
            warnings.warn(
                "As of mkpy 0.2.1 codemaps no longer require an Index as the first column.",
                DeprecationWarning,
            )
def _load_xlsx_map(self, cmf):
"""wraps pandas.Dataframe.read_excel() to load a code tag table from .xlsx
Parameter
---------
cmf : str or Path
is path_to_file.xlsx[!named_sheet )path to an .xlsx file with optional.
Default selects first worksheet use .xlsx!sheet_name syntax to select a
named sheet.
Returns
-------
        mapper : pandas.DataFrame
Examples
--------
_load_xlsx_map('myexpt/code_tag_table.xlsx')
_load_xlsx_map('myexpt/code_tag_table.xlsx!for_evoked')
_load_xlsx_map('myexpt/code_tag_table.xlsx!for_mixed_effects')
"""
# use !named_sheet if there is one, else default to 0 == first
cmf_reob = re.match(r"(?P<xl_f>.+\.xls[xm])[\!]*(?P<sheet_name>.*)$", cmf)
xl_f = cmf_reob["xl_f"]
sheet_name = cmf_reob["sheet_name"]
if len(sheet_name) == 0:
sheet_name = 0
mapper = pd.read_excel(
xl_f,
sheet_name=sheet_name,
header=0,
engine="openpyxl", # , index_col="Index"
)
return mapper
    def _load_txt_map(self, cmf):
        """load tab-separated UTF-8 text file and return pandas DataFrame"""
        mapper = pd.read_table(
            cmf,
            delimiter="\t",
            header=0,
            encoding="utf-8",
        )
        return mapper
def _load_yaml_map(self, cmf):
"""load yaml mapper file and return pandas DataFrame"""
# slurp the code tags
with open(cmf, "r") as d:
mapper = yaml.load(d.read(), Loader=yaml.SafeLoader)
# modicum of format checking ...
if not isinstance(mapper, dict):
msg = (
"code tag map file is not a yaml map: "
+ "yaml.load({0}).__class__ == {1}".format(self.cmf, mapper.__class__)
)
raise ValueError(msg)
# nab column labels ... equivalent to header row in tabular code tag map
try:
col_labels = mapper["columns"]
ncols = len(col_labels)
except Exception:
print('code tag map must have "columns" entry')
raise
# nab rows
try:
rows = mapper["rows"]
nrows = len(rows)
except Exception:
print('code tag map must have "rows" entry')
raise
# modicum of value checking
for mapvals in rows:
# insist on non-empty column values
if not (isinstance(mapvals, list) and len(mapvals) == ncols):
msg = "{0}".format(mapvals)
msg += " map values must be a list of {0} items: {1}".format(
ncols, col_labels
)
raise ValueError(msg)
# check that the patterns will compile as a regexp
re.compile(mapvals[mapper["columns"].index("regexp")])
        # return as a pandas data frame
        mapper = pd.DataFrame(mapper["rows"], columns=mapper["columns"])
return mapper
def _pattern_to_str(self, pattern):
"""
normalize different input data types to a string rep for re matching
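
        Illustrative behavior (a sketch, assuming ``tagger`` is a
        CodeTagger instance):

        .. code-block:: python

            tagger._pattern_to_str(np.bytes_(b"(#1)"))  # -> "(#1)"
            tagger._pattern_to_str(-10)                 # -> "-10"
            tagger._pattern_to_str("(#1) (2)")          # -> "(#1) (2)"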
"""
# np.bytes_ has __abs__ so check it first ... yuck
if isinstance(pattern, np.bytes_):
# bytes
patt_str = pattern.decode("utf8")
elif hasattr(pattern, "__abs__"):
# numeric ... +/-
patt_str = pattern.__str__()
elif isinstance(pattern, str):
# strings
patt_str = pattern
else:
            msg = (
                "cannot convert {0} to string for pattern matching, "
                "must be integer, bytes, or string"
            ).format(pattern)
            raise ValueError(msg)
# try to be helpful about invisible characters
if re.search(r"\\t", patt_str):
msg = (
"tab character in {0} never match, use a single "
"white space to delimit event codes"
).format(patt_str)
raise ValueError(msg)
if re.search(r"\s{2,}", patt_str):
msg = (
"consecutive whitespaces in {0} never match, use a single "
"white space to delimit event codes"
).format(patt_str)
raise ValueError(msg)
if re.match(r"^ ", patt_str):
warnings.warn("leading whitespace in {0}".format(patt_str))
if re.match(r" $", patt_str):
warnings.warn("trailing whitespace in {0}".format(patt_str))
# check regular expression syntax
try:
            re.compile(patt_str)
except Exception as msg:
raise self.BadCodePattern(in_patt=pattern, cause=msg)
return patt_str
def _parse_patt(self, pattern):
"""locate position of the anchor code in search pattern plus basic r.e. validation
Parameters
----------
pattern : regular expression string
regular expression pattern with exactly one anchor capture group
of the form (#...), optionally flanked by other code patterns
Returns
-------
(anchor, capture_groups, code_patt) : tuple
anchor : tuple
(anchor_group_index, anchor_match_object)
capture_groups : list
a list of the capture groups in pattern
code_patt : regular expression string
regular expression pattern with the (one and only)
anchor marker # stripped
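
        Example (illustrative, following the rules above):

        .. code-block:: python

            anchor, groups, code_patt = self._parse_patt("(1) (#2) (?:3)")
            # anchor[0] == 1      the anchor is the second capture group
            # len(groups) == 2    capture groups (1) and (#2); (?:3) is non-capturing
            # code_patt == r"(1\b) (2\b) (?:3\b)"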
"""
in_patt = self._pattern_to_str(pattern) # coerce input to a sensible r.e.
        # define capture groups; suppressing greedy matching w/ ? is essential
capt_group_patt = r"\((?!\?\:).+?\)" # any ( ) except non-capturing (?: )
# anchor_patt = r'\(#[-]{0,1}\d+\)' # matches integer code literals only
anchor_patt = r"\(#.+\)" # allow anchor pattern (# ...) allows patterns
# look up the capture groups including anchors
capture_groups = [g for g in re.finditer(capt_group_patt, in_patt)]
# check exactly one anchor group
anchors = [
(i, g)
for i, g in enumerate(capture_groups)
if re.match(anchor_patt, g.group(0))
]
        if len(anchors) < 1:
            raise self.MissingAnchor(pattern)
        elif len(anchors) > 1:
            raise self.MultipleAnchors(pattern)
        else:
            anchor = anchors[0]
# strip the # anchor mark
# code_patt = re.sub(r'#', r'', in_patt)
# like so to prevent stripping comments (?# ...)
code_patt = re.sub(r"\(#", r"(", in_patt)
# right-bound the captured group, e.g., (#10) -> (#10\b) else (#10) matches
# and extracts 1024. No expressive loss b.c. (#1024) and (#10\d\d) also match 1024
# The \b matches boundary at next white space or end of string
code_patt = re.sub("\\)", "\\\\b)", code_patt)
# these are used for pattern matching and lookup in find_codes
return (anchor, capture_groups, code_patt)
def _find_evcodes(self, pattern, ticks, evcodes):
r"""Pattern match sequences of integer codes and extract timing information
This finds arbitrary subsequences of integers in a 1-D array
of integers (``evcodes``) and returns bundles of match and
index information.
Whereas individual integers are readily matched by numerical
identity comparison, matching arbitrary subsequences requires
a more general search algorithm.
Regular expression pattern matching over character strings
        affords just such generality, but it compares characters
        rather than numeric values. So the 1-D integer array is
        mapped to a 1-D character array (= string) representation
        drawn from the alphabet ' -0123456789' with ' ' as a
        delimiter. Since the nth delimiter corresponds to the 0-based
        index of the nth integer in the original array, the string
        positions of pattern matches on the string representation can
        be converted back to the indices of the corresponding values
        in the integer array.
        For the intended use case where integers correspond to event
        codes in a data stream, it is convenient to single out one
        code in the sequence as the "anchor", ``#...``, and to use
        the regular expression capture group mechanism ``( ... )`` to
        identify the portions of the sequence to extract and return:
        the anchor (#...) always, and optionally other codes in the
        matching sequence.
In conjunction with a specification of indexes into a data
stream (``ticks``), each match bundle provides all the
information necessary to look-up the location of the
subsequence of integers in the original data stream.
Approach: two steps
1. preprocess the search pattern to find the # anchor mark and
count the capture groups
2. sweep the pattern across a string-ified representation of
the integer event codes, extracting information bundles
for each captured group: the obligatory anchor code always,
and any other captured evcodes.
        The extracted information bundles are lists of (key, value)
        tuples that contain (primarily) the matched code from the
        `evcodes` vector, the matched tick from `ticks`, and the
        index at which these values are found, i.e., idx such that
        evcodes[idx] is the matching code and ticks[idx] is the tick
        at that match. Additional information specifies the relation
        between the match and the anchor pattern.
        Code Pattern Matching Definitions
        ---------------------------------
``digits``: char
The characters 0, 1, 2, 3, 4, 5, 6, 7, 8, 9. Each is
matched by r'[0-9]' or equivalent, e.g., r'\d'
``code`` : str
a sequence of one or more digits optionally preceded
by the - character. Matched by r'[-]{0,1}\d+'
``code pattern`` : regexp str
any regular expression that matches a code. Such as r'1'
to match code 1 or r'\d{2}' to match any two digit code or
r'\d{3}1' to match any four digit code ending in 1 or
r'\d{3}[02468]' to match any even four digit code.
``capturing code pattern`` : regexp str
any code pattern of the form r'(...)' that matches a code
or code sequence
``anchor pattern`` : regexp str
any capturing code pattern that captures a subset of the codes
captured by r'(#[-]{0,1}\d+)'.
``code sequence`` : str
            a sequence of codes, each preceded by a single whitespace.
r'( [-]{0,1}\d+)+'
``search pattern`` : regexp str
any regular expression that contains exactly one anchor
pattern and matches a code or code sequence
.. Note::
A ``search pattern`` may contain capturing code
patterns in addition to the anchor pattern
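
        Example (a hypothetical sketch of a call and the match it finds):

        .. code-block:: python

            ticks = [100, 110, 120, 130]
            evcodes = [1, 10, 1040, 1]
            # anchor on code 10 when followed by a four-digit code starting 10
            bundles = self._find_evcodes(r"(#10) (10\d\d)", ticks, evcodes)
            # one match: anchor code 10 at tick 110, capture 1040 at tick 120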
"""
rvals = [] # return this
# bail out if there's nothing to do
if len(evcodes) == 0:
warnings.warn("list of event codes is empty")
return rvals
# parse the pattern parameter into useful chunks.
# details in _parse_patt().__doc__
anchor, capture_groups, code_patt = self._parse_patt(pattern)
try:
patt_regx = re.compile(code_patt)
        except Exception:
msg = "cannot compile {0} as regular expression".format(code_patt)
raise TypeError(msg)
# stringify the code list for matching against the code pattern
sep = " " # the single whitespace delimiter, critical for pattern matching
code_str = ""
for e in evcodes:
code_str += "{0}{1}".format(sep, e)
# sweep the pattern regular expression across the code string
matches = [m for m in patt_regx.finditer(code_str)]
# rank of the sep delimiter == event code index
# end boundary of nth sep delimiter is right-boundary of the nth event code
delims = [m.end() for m in re.finditer(sep, code_str)]
# assert len(delims)==len(evcodes) # very very bad if not
if len(delims) != len(evcodes):
            msg = (
                "something has gone horribly wrong in _find_evcodes(), "
                "stop what you're doing immediately, find urbach "
                "and smack him up side the head."
            )
raise ValueError(msg)
        # scan the string delimiter values for pattern match span starts
for (didx, delim_at) in enumerate(delims):
# A search may find 0, 1, or 1+ pattern
# match(es). If a match is found there is at least one
# match group for the obligatory anchor and maybe more
# if the pattern contains additional capture groups.
# So for generality always iterate over m.groups()
for m in matches:
if delim_at == m.span()[0]:
m_group_info = []
# iterate thru the match groups in this m
mgi = 1 # individual match groups start at index 1
# copy indexes to process m's match groups
# w/out disturbing didx, delim_at
idx = didx
dlim = delim_at
anchor_idx, anchor_group_idx, anchor_tick = (None, None, None)
# this index points to the anchor capture group in m.groups()
anchor_group_idx = anchor[0] + 1
                    # for readability
                    anchor_delim = m.span(anchor_group_idx)[0]  # string offset of anchor
                    anchor_idx = delims.index(anchor_delim)  # index of anchor in code list
                    anchor_tick = ticks[anchor_idx]  # tick at the anchor index
# assert(int(m.group(anchor_group_idx)) == evcodes[anchor_idx])
# confirm stringified event code sequence w/ original array
if int(m.group(anchor_group_idx)) != evcodes[anchor_idx]:
msg = (
"uh oh, horrible bug #1 in the event code finder "
"... yell at urbach"
)
raise ValueError(msg)
while mgi <= m.lastindex and idx < len(evcodes):
if m.start(mgi) == dlim:
# scrape this match group info
info = None
# capture groups match one or more evcodes
# ... make a list, possibly singleton
enumevcodes = [
(i, c)
for i, c in enumerate(m.group(mgi).strip().split(" "))
]
# check the slicing and dicing ...
# the code (sequence) at this index must match the string pattern
# assert all([c == str(evcodes[idx+i]) for i,c in enumevcodes])
if any(
[c != str(evcodes[idx + i]) for i, c in enumevcodes]
):
                                msg = (
                                    "uh oh, horrible bug #2 in the event code finder "
                                    "... yell at urbach"
                                )
                                raise ValueError(msg)
# whew ...
for i, c in enumevcodes:
                                # each info is a list of (key, value) tuples, readily
                                # convertible to something useful ... OrderedDict, pandas.DataFrame
info = [
("match_group", mgi),
("idx", idx),
("dlim", dlim),
("anchor_str", m.group(anchor_group_idx)),
("match_str", m.group(mgi)),
("anchor_code", evcodes[anchor_idx]),
("match_code", evcodes[idx + i]), # evcodes[idx],
("anchor_tick", anchor_tick),
("match_tick", ticks[idx + i]), # ticks[idx]
(
"anchor_tick_delta",
int(ticks[idx + i]) - int(anchor_tick),
),
("is_anchor", mgi == anchor_group_idx),
]
m_group_info.append(info)
mgi += 1
idx += 1 # keep looking to the right
if idx == len(evcodes):
continue # nothing else to look for, move on
dlim = delims[idx] # update delimiter
                        # vestigial bounds check ...
if idx > len(evcodes):
msg = (
"uh oh, event code list overrun horrible bug #3 in the "
"event code finder ... yell at urbach"
)
raise ValueError(msg)
# accumulate the data
rvals.append(m_group_info)
# done scanning, go home
if len(rvals) > 0:
return rvals
else:
return None