#!/usr/bin/env python3
"""pygarv is the backend for marking artifacts in mkh5 data with tests defined in a YAML file
Successful runs of tests and their results are stored in
PyGarv.tr_docs a list of tr_doc dicts, one dict per h5 datablock.
Parameters
----------
tr_doc['tests'] : list
each item is a dict
Examples
* tr_doc['tests']
.. code-block:: python
[ {'dblock_path_idx': 0,
'dblock_path': 'calstest/dblock_0',
'name': 'pygarv',
'tests': [ [{'test': 'ppa'},
{'tag': 'amplitude exursions'},
{'stream': 'MiCe'},
{'threshold': 0.0},
{'interval': 0.0} ],
[{'test': 'ppadif'},
{'tag': 'amplitude exursions'},
{'stream': 'MiCe'},
{'threshold': 0.1},
{'interval': 0.1},
{'stream2': 'MiPa'} ] ]},
{'dblock_path_idx': 1,
'dblock_path': 'calstest/dblock_1',
'name': 'pygarv',
'tests': None},
]
tr_doc['fails'] : list
len(tr_doc['fails']) == len(tr_doc['tests'])
where tr_doc['fails'][idx] is
a list of (start, stop) intervals in dblock_tick indexes where tr_doc['test'] failed.
tr_doc['pygarv']
* The tests are specified as a YAML file .yarf.
.. code-block:: yaml
---
dblock_path: some_path
dblock_path_idx: uint
name: pygarv
tests:
- - test_spec
- test_spec
...
- test_spec
* Each `test_spec` is a YAML map with a mandatory `name` and `tag`
parameter and optional other parameters as needed for specific tests
test: str
tag: str
where
`test` names a pygarv test function, e.g., `mxflat`, `ppadif`
`tag` is a user-defined descriptive tag, e.g., *blocking*, *heog*, *fancy test*
"""
__version__ = "0.0.0"
from collections import OrderedDict
import re
import pdb
import logging
import numpy as np
import pprint as pp
from matplotlib import pyplot as plt
import copy
import yaml
from yamllint import linter
from yamllint.config import YamlLintConfig
from mkpy import mkh5
import h5py
import sys
import warnings
# import dpath.util
from . import dpath
from mkpy import mkh5viewer
class PyYarf(object):
    """YAML test file I/O for PyGarv artifact test parameters

    Parameters
    ----------
    yarf_f : str, optional
        file path to well-formed YAML with PyYarf test specification structure

    Attributes
    ----------
    yarf_docs : list
        each item is a yarf_doc dict that yamlizes in-out without modification

        .. code-block:: python

            {'name': 'pygarv' (str),
             'dblock_path_idx': n (uint),
             'dblock_path': path_to_a_mkh5_dblock (str),
             'tests': [test_spec, ... test_spec] (list)}

    Notes
    -----
    PyYarf YAML format:

    * exactly one yaml document per mkh5 dblock_path
    * each doc is a map with keys `name`, `dblock_path_idx`, `dblock_path`, `tests`
    * the value of `name` must be `pygarv` (str)
    * the value of `dblock_path` in the ith yaml doc must == mkh5.data_blocks[i] (str)
    * the value of `tests` must be a list of test specifications (see PyGarvTest docs)
    """

    __version__ = "0.0.1"

    _yarf_config = YamlLintConfig("extends: default")

    _yarf_doc_template = dict(
        dblock_path_idx=None, dblock_path="", name="pygarv", tests=[]
    )

    def __init__(self, yarf_f=None):
        # FIX: bind the attribute unconditionally so later reads of
        # self.yarf_f cannot AttributeError when no file was given
        self.yarf_f = yarf_f

    # yarf file CRUD ... yaml I/O
    def read_from_yaml(self, yarf_f):
        """return yarf_doc list populated with validated test specs from a .yarf file

        Parameters
        ----------
        yarf_f : str
            path to a YAML .yarf file

        Returns
        -------
        yarf_docs : list of dict
            one checked yarf_doc per YAML document in the file

        Raises
        ------
        Exception
            if yamllint flags the stream (see lint_yarf)
        KeyError, ValueError
            if a document is malformed (see check_yarf_doc)
        """
        with open(yarf_f, "r") as yf:
            yaml_stream = yf.read()
        self.lint_yarf(yaml_stream)  # raises Exception on bad YAML
        # thank you PyYaml
        yarf_iter = yaml.load_all(yaml_stream, Loader=yaml.SafeLoader)
        yarf_docs = []
        for yarf_doc in yarf_iter:
            self.check_yarf_doc(yarf_doc)
            yarf_docs.append(yarf_doc)
        return yarf_docs  # so far so good ...

    def read_from_mkh5(self, mkh5_f):
        """scan mkh5 dblock headers for hdr['pygarv'] artifact test info

        Parameters
        ----------
        mkh5_f : str
            path to an mkh5 HDF5 file

        Returns
        -------
        yarf_docs : list of dict
            one PyYarf format dict per datablock; datablocks without
            header info get a fresh empty template doc

        Raises
        ------
        ValueError
            if some but not all datablock headers carry pygarv info
        """
        yarf_docs = list()
        has_yarf = list()  # for error checking
        h5 = mkh5.mkh5(mkh5_f)
        for dbpath in h5.data_blocks:
            hdr, _ = h5.get_dblock(dbpath)
            if "pygarv" in hdr.keys():
                yarf_doc = hdr["pygarv"]
                self.check_yarf_doc(yarf_doc)
                has_yarf.append(True)
            else:
                # build an empty one. FIX: deepcopy so the template's
                # mutable 'tests' list is not shared across docs
                yarf_doc = copy.deepcopy(self._yarf_doc_template)
                yarf_doc["dblock_path"] = dbpath
                has_yarf.append(False)
            yarf_docs.append(yarf_doc)
        # none is OK, all is OK, some but not all is probably pathological
        if any(has_yarf) and not all(has_yarf):
            missing = [h5.data_blocks[i] for i, d in enumerate(has_yarf) if d is False]
            msg = (
                "uh oh ... missing pygarv info in headers of {0} " "dblocks {1}"
            ).format(mkh5_f, missing)
            raise ValueError(msg)
        return yarf_docs

    def to_yaml(self, yarf_docs):
        """return yarf_docs YAML-ized as string suitable for serialization

        The stream is lint-checked before being returned.
        """
        yaml_stream = (
            "# generated by PyYarf v. {0}, " "edit at your own risk\n"
        ).format(self.__version__)
        for yarf_doc in yarf_docs:
            yaml_stream += "---\n"  # doc delimiter
            yaml_stream += yaml.dump(
                yarf_doc, explicit_start=False, default_flow_style=False
            )
        self.lint_yarf(yaml_stream)
        return yaml_stream

    # YAML lint a string
    def lint_yarf(self, yarf_stream):
        """run yamllint on yarf_stream, if errors die informatively"""
        errors = [e for e in linter.run(yarf_stream, PyYarf._yarf_config)]
        if errors != []:
            msg = "\n\n*** {0} ***\n\n".format(self)
            for e in errors:
                msg += "{0}\n".format(e)
            raise Exception(msg)

    def check_yarf_doc(self, yarf_doc):
        """validate one yarf_doc: template keys, name, and test spec form

        Raises
        ------
        KeyError
            if the doc's keys differ from the template keys
        ValueError
            if the name is wrong or a test spec is malformed
        """
        # does each YAML doc have all and only the right keys?
        if set(yarf_doc.keys()) != set(PyYarf._yarf_doc_template.keys()):
            msg = (
                "yarf doc {0} must have "
                "exactly these keys: {1}"
                "".format(yarf_doc, PyYarf._yarf_doc_template.keys())
            )
            raise KeyError(msg)
        # name?
        if yarf_doc["name"] != PyYarf._yarf_doc_template["name"]:
            # FIX: the original .format() was missing the second argument
            # and raised IndexError instead of the intended ValueError
            msg = ("yarf doc 'name': {0} " "must be {1}").format(
                yarf_doc["name"], PyYarf._yarf_doc_template["name"]
            )
            raise ValueError(msg)
        # if there are tests are they a list?
        if yarf_doc["tests"] is not None:
            if not isinstance(yarf_doc["tests"], list):
                msg = ("yarf doc 'tests': {0} " "must be a list of tests").format(
                    yarf_doc["tests"]
                )
                raise ValueError(msg)
            for i, t in enumerate(yarf_doc["tests"]):
                if not isinstance(t, list):
                    msg = "{0} must be a list".format(t)
                    raise ValueError(msg)
                for param_spec in t:
                    # each spec item must be a single {key: value} map
                    if not isinstance(param_spec, dict) or len(param_spec.keys()) != 1:
                        msg = (
                            '.yarf {0}: test parameter "{1}" is not '
                            "a {{key:value}} pair"
                            ""
                        ).format(yarf_doc["dblock_path"], param_spec)
                        raise ValueError(msg)
[docs]class PyGarvTest(OrderedDict):
"""Decorator class for the PyGarv tests.
This enforces an extensible standard form on PyGarv test specs and
execution.
The class derives from OrderedDict so it returns .keys() .values()
.items() in fixed original parameter order. This is useful for
populating test UI elements and reading writing YAML sequences
without scrambling the key:value pairs the way a dict() might.
Parameters
----------
param_specs : [(key,type), ...]
key : str
parameter label
type : Python type
required Python data type for values of the key
('test',str),
('tag', str),
('stream', str),
* Default test parameters (in sequence order)
test : str
corresponds to the self._test() function that runs it
tag : str
user specified descriptive tag for the test ... anything sensible
stream : str
name or regex pattern for primary dblock data stream(s) to run the test on
* Optional test specific `parameter:type` pairs are defined in the
decorator arguments
Raises
------
ValueError
If the type of a test parameter differs from that in ``param_specs``
* ``PyGarvTest`` overrides ``OrderedDict.__setitem__()`` with
additional type checking on the value of test['key'] = value
* The class variable ``param_specs`` specifies mandatory
``PyGarvTest`` parameters and types.
* Optional decorator arguments can extend the mandatory parameters
and types and will be automatically passed to the decorated test
function.
* all PyGarvTest instances have _default_params with key, type
* optional decorater args extend PyGarvTest instances with additional params
* public CRUD API is standardized
* To preserve test spec order for display and yamlized round
trips, test specs are stored internally as OrderedDicts and the
setter/getter API wants and returns lists of dict, i.e.,
..code-block:: python
[{'test':'ppa'}, ...{'interval':1500.0}]
Methods
-------
run(hdr, dblock, **kwargs)
Parameters
----------
hdr : dict
metadata consulted in running the tests, e.g., sampling rate
dblock : np.ndarray (named dtypes)
columns of data, typically accessed by dtype.name
Returns
-------
results : np.ndarray, dtype=bool, length = len(dblock)
sample-wise data rejection mask, 1=bad, 0=good
Usage
-----
"""
_max_path_len = 4096 # no particular reason, roughly max linux path length
# default parameter types, all PyGarvTests
__default_param_specs = [("test", str), ("tag", str), ("stream", str)]
def __init__(self, test, **kwargs):
"""test is passed in by decorator, kwargs are optional param=type specs"""
# handle the types
self._param_types = dict()
# default params
for p, t in PyGarvTest.__default_param_specs:
self._param_types[p] = t
# add any extra params, types from decorator
for p, t in kwargs.items():
self._param_types[p] = t
# set as an attribute to cross-check case self['test']
self.test = test
# self['test'] = test, other key values = None
self.reset()
# Override setting to include validation
def __setitem__(self, key, value):
"""type check all item settings"""
if value is not None:
# check type all keys
if not isinstance(value, self._param_types[key]):
msg = ("{0}: {1} value required type {2}" "").format(
key, value, self._param_types[key]
)
raise TypeError(msg)
# prevent string overrun
if isinstance(value, str) and len(value) > PyGarvTest._max_path_len:
msg = ("string length exceeds {0} for {1}: {2} ... {3}" "").format(
PyGarvTest._max_path_len, key, value[0:10], value[-10:]
)
raise RuntimeError(msg)
# check stream patterns can be compiled
if "stream" in key:
try:
re.compile(value)
except Exception as err:
msg = "bad regexp pattern: {0}".format(value)
msg += " ... {0}".format(" ".join([arg for arg in err.args]))
raise ValueError(msg)
# cross-check
if key == "test" and value != self.test:
msg = (
"failed on test={0} ... cannot change read-only value" ""
).format(value)
raise KeyError(msg)
OrderedDict.__setitem__(self, key, value)
# ------------------------------------------------------------
# decorator magic here wraps the decorated function, e.g.
#
# @PyGarvTest(ppa)
# ppa(header, dblock, **kwargs)
#
# with kwargs from self.keys(), self.values() and
# exposes the callable self.run() to do the work
#
# Similar to duck typing flexibility but with built in keys and
# value type-checking per-test by the decorator ... simple but
# flexible.
# ------------------------------------------------------------
def __call__(self, f, *args, **kwargs):
"""f(header, dblock, ...)"""
def run(*args, **kwargs):
# convert self odict k,v to dict for kwargs
test_params = dict([(k, v) for k, v in self.items()])
return f(*args, **test_params)
self.run = run
self.run.__doc__ = f.__doc__
# now PyGarv.ppa.run(hdr, dblock, ...) will execute with current params
return self
# --------------------------------------------------
# public-ish setter/getters
# --------------------------------------------------
[docs] def set_specs(self, test_params):
"""test_params is {key:value, ... } for test keys,values"""
for (k, v) in test_params.items():
self[k] = v
[docs] def reset(self):
# init the values
for k in self._param_types.keys():
if k == "test":
self[k] = self.test
else:
self[k] = None
[docs] def get_specs(self):
return dict([(k, v) for k, v in self.items()])
[docs] def param_type(self, param):
"""type of param"""
return self._param_types[param]
@property
def params(self):
"""names of the parameters this test as a list"""
return [p for p in self._param_types.keys()]
@property
def types(self):
"""data types of the values for the parameters as a list"""
return [t for t in self._param_types.values()]
@property
def param_types(self):
return self._param_types
@property
def specs(self):
return self.get_specs()
@property
def specs_as_yaml(self):
"""returns current specs as yaml string"""
yaml_specs = yaml.dump(
self.specs, explicit_start=True, default_flow_style=False
)
return yaml_specs
[docs]class PyGarv(object):
r"""container to hold an inventory of functions for computing sample-wise
artifact masks.
When invoked at the command line, pygarv needs an mkh5 file to work with
There are two cases:
- has not been previously garved with _update_mkh5()
- no pygarv test info in header
- pygarv data streams all zeros
- data has been previously garved with _update_mkh5()
- pygarv test info appears in header
- test results are unknown, possibly None
- pygarv data stream state is unknown
On init the mkh5 file is scanned for previous runs, if found the
pygarv data buffers (volatile) are synced with the info from the h5
file.
For each data block:
- self.tr_docs are set to match the header['pygarv'] dict
- self.yarf_fails are set according to dblock['pygarv'], self.tr_docs
- the value of pygarv = run_test(db_idx) (what-if run) is
checked against the dblock data, discrepancies throw a warning
PyGarv now has persistent and volatile rejection data in
alignment, suitable for viewing/editing in mkh5viewer
PyGarvTest
The PyGarvTest decorator handles all the default parameter
name and type bookkeeping for specific tests
To add a test to the catalog ...
1. implement a function that takes two args (hdr, dblock,
\*\*kwargs) and returns a boolean artifact mask of length dblock
data samples where 0 = good, 1 = bad.
The hdr (dict), and dblock (np.ndarray) are, e.g., as
returned by hdr, dblock = mkh5.get_dblock(path_to_datablock)
but can be any dict and dblock that expose the variables needed to
compute the artifact mask.
2. decorate it with @PyGarvTest(test_name, [key=dtype, key=dtype])
where test_name is the test name and the list of key_i=dtype_i
optionally gives extra parameters named key_1, ... key_n
with data type dtype.
"""
def __init__(self, mkh5_f, yarf_f=None):
    """continuous artifact rejection manager for the datablocks in mkh5_f

    Parameters
    ----------
    mkh5_f : str
        path to an mkh5 HDF5 file
    yarf_f : str, optional
        path to a YAML .yarf test specification file

    Notes
    -----
    - self.mkh5 : read-only record of tests in dblock['pygarv'] and
      hdr['pygarv']; persistent.
    - self.dblock_paths : list of str, all the mkh5 datablock slashpaths
      as returned by mkh5.data_blocks
    - self.tr_docs : list of dict. "tr_" abbreviates "test results";
      exactly one tr_doc per mkh5 datablock, the master pygarv data
      structure. It contains all the yarf_doc test info *plus* run
      results as a pygarv vector (1-D array of 64-bit uints) and a list
      of fails. tr_docs[i] always corresponds to dblock_paths[i].

      tr_doc format on init::

          {'name': 'pygarv',
           'dblock_path': path,
           'dblock_path_idx': idx,
           'tests': [],
           'fails': [],
           'pygarv': np.zeros(shape=(len(dblock),),
                              dtype=dblock['pygarv'].dtype)}

      Each item in `tests` is a PyGarvTest format test specification.
      Each item in `fails` is a list of (x0, x1) dblock tick index pairs
      bounding a contiguous failed run (x0 == x1 for a single sample).
    """
    # reset test parameters b.c. with the class decorator approach, new
    # instances in the same namespace inherit test specs set during
    # previous calls
    for attrib in ["_catalog", "mkh5", "mkh5_f", "dblock_paths", "yarf", "tr_docs"]:
        setattr(self, attrib, None)
    # clear tests in case of carry over from previous test runs
    self._reset_tests()
    # set the inventory of available tests
    self._catalog = self._init_catalog()
    # set the file names. FIX: the original discarded yarf_f, but
    # _update_mkh5's error handler reads self.yarf_f
    self.mkh5_f = mkh5_f
    self.yarf_f = yarf_f
    # ready the mkh5 data
    self.mkh5 = mkh5.mkh5(mkh5_f)
    self.dblock_paths = self.mkh5.data_blocks
    # ready the yarf I/O manager
    self.yarf = PyYarf()
    # init the tests and results data structure, one empty doc per datablock
    self.tr_docs = list()
    for dbp_idx, dbp in enumerate(self.mkh5.data_blocks):
        _, dblock = self.mkh5.get_dblock(dbp)
        tr_doc = {
            "dblock_path": dbp,
            "dblock_path_idx": dbp_idx,
            "name": "pygarv",
            "tests": [],
            "fails": [],
            "pygarv": np.zeros(shape=(len(dblock),), dtype=dblock["pygarv"].dtype),
        }
        self.tr_docs.append(tr_doc)
def _update_tr_docs_from_yarf_docs(self, yarf_docs):
    """iterate an entire yarf_docs (= test specs) list and update tr_docs

    Parameters
    ----------
    yarf_docs : list of dict
        one PyYarf format doc per mkh5 datablock

    Raises
    ------
    Exception
        re-raises any exception reported by self._update_tr_docs with
        the offending data block path appended to the message
    """
    self._check_tr_docs(yarf_docs)
    # dry run all tests individually
    for dbp_idx, dbp in enumerate(self.mkh5.data_blocks):
        yarf_doc = yarf_docs[dbp_idx]
        for test_idx, test in enumerate(yarf_doc["tests"]):
            exception = self._update_tr_docs(dbp_idx, test_idx, test)
            if exception is not None:
                # FIX: the original spread *exception.args into format(),
                # which crashed with IndexError when args was empty and
                # silently dropped the dblock path when args had > 1 item
                detail = exception.args[0] if exception.args else repr(exception)
                exception.args = ("{0} in data_block: {1} ".format(detail, dbp),)
                raise exception
    self._check_tr_docs(yarf_docs)
def _update_tr_docs_from_mkh5(self):
    """scrape yarf docs from mkh5 headers, run, and collect all in tr_docs"""
    print("updating tr_docs from mkh5")
    if self.mkh5_f is None:
        raise ValueError("mkh5_f not set")
    # gotta have some data ...
    if self.dblock_paths == []:
        raise ValueError("no data block paths in " + self.mkh5_f)
    # populate yarf docs from existing hdr['pygarv'] footprints, then
    # collect the tests into tr_docs
    self._update_tr_docs_from_yarf_docs(self.yarf.read_from_mkh5(self.mkh5_f))
    # FIX ME??? ... compare tr_doc['pygarv'] with dblock['pygarv'] and
    # warn of mismatch
def _update_tr_docs_from_yaml_f(self, yarf_f):
    """read all tests from a YAML .yarf file and collect the results into tr_docs"""
    self._update_tr_docs_from_yarf_docs(self.yarf.read_from_yaml(yarf_f))
# tr_docs getters ...
def _get_yarf_doc_from_tr_doc(self, tr_doc):
    """extract just the yarf_doc test info from a tr_doc, no results.

    An unfortunate consequence of pooling tests and results in tr_docs,
    though a lesser evil than segregating them.
    """
    return {key: tr_doc[key] for key in PyYarf._yarf_doc_template.keys()}
def _get_yarf_docs_from_tr_docs(self):
    """yarf_doc test info for every tr_doc, results omitted"""
    return [self._get_yarf_doc_from_tr_doc(doc) for doc in self.tr_docs]
def _init_catalog(self):
    """inventory of implemented tests as given by @PyGarvTest"""
    catalog = dict()
    for name in dir(self):
        attr = getattr(self, name)
        if isinstance(attr, PyGarvTest):
            attr.reset()  # clear params
            catalog[attr.test] = attr
    return catalog
def _load_tr_docs_from_yaml(self, yarf_f):
    """read and return the yarf docs from a YAML .yarf file"""
    return self.yarf.read_from_yaml(yarf_f)
def _check_tr_docs(self, tr_docs):
    """sanity check tr_docs list lines up with h5 data blocks and each
    yarf_doc is well-formed.

    Checks form only, does not re-run tests or check semantics of the results.

    Parameters
    ----------
    tr_docs : list of dict
        candidate tests-and-results docs, one per mkh5 datablock

    Raises
    ------
    ValueError
        if tr_docs is None, or a doc's 'fails' list disagrees in form or
        content with its bit-encoded 'pygarv' vector
    """
    if tr_docs is None:
        raise ValueError("tr_docs is None")
    # exactly one doc per datablock
    assert len(self.dblock_paths) == len(tr_docs)
    for dbp_idx, db_path in enumerate(self.dblock_paths):
        # NOTE(review): beyond the length assert above, this inspects
        # self.tr_docs, not the tr_docs argument -- confirm intended
        tr_doc = self.tr_docs[dbp_idx]
        assert len(tr_doc["tests"]) == len(tr_doc["fails"])
        # do pygarv and fails agree at least in form? an all-zero pygarv
        # vector with non-empty fails (or the reverse) is inconsistent
        if (
            all(tr_doc["pygarv"] == 0)
            and any(len(fail) > 0 for fail in tr_doc["fails"])
        ) or (
            any(tr_doc["pygarv"] != 0)
            and all(len(fail) == 0 for fail in tr_doc["fails"])
        ):
            msg = "tr_docs[{0}] fails and pygarv do not agree".format(dbp_idx)
            raise ValueError(msg)
        # check that non-zero bits in the tr_doc['pygarv'] still
        # agree w/ non-emtpy tr_doc['fails']
        fails_from_pygarv_bits = self._decode_pygarv_stream(
            tr_doc["pygarv"], tr_doc
        )
        if len(tr_doc["fails"]) > 0 or max(tr_doc["pygarv"]) > 0:
            if tr_doc["fails"] != fails_from_pygarv_bits:
                # mismatch is an internal invariant violation: log the
                # details and die informatively
                log_msg = (
                    "tr_doc["
                    "pygarv"
                    "] bits do not match non-empty tr_doc["
                    "fails"
                    "] ... yell at urbach immediately\n"
                )
                log_msg += "data block path: {0}\n".format(db_path)
                log_msg += (
                    "fails according to tr_doc["
                    "pygarv"
                    "] bits: {0}\n".format(fails_from_pygarv_bits)
                )
                log_msg += (
                    "fails according to tr_doc["
                    "fails"
                    "]: {0}".format(tr_doc["fails"])
                )
                logging.error(pp.pformat(log_msg))
                err_msg = (
                    "probable pygarv bug ... see the latest .mkpy/logs for details"
                )
                raise ValueError(err_msg)
def _reset_tests(self):
    """clear parameter specs on every cataloged PyGarvTest"""
    for name in dir(self):
        candidate = getattr(self, name)
        if isinstance(candidate, PyGarvTest):
            candidate.reset()  # drop any residual parameters
def get_catalog(self):
    """return the {test_name: PyGarvTest} inventory of available tests"""
    return self._catalog
def _run_test(self, dbp_idx, test_spec):
    """dry run one test on the dblock[dbp_idx] data.

    Returns usable results and fails, does not change any data.

    Parameters
    ----------
    dbp_idx : uint
        index of the datablock to run the test on
    test_spec : list
        pygarv test spec format [{param: value}, ... {param: value}]

    Returns
    -------
    (results, fails) : 2-ple
        results : np.ndarray, len(dblock), dtype=bool
            True at samples where the test failed
        fails : list of uint 2-ples
            (start, stop) dblock tick index pairs, one per contiguous
            run of failed samples

    Notes
    -----
    Normative use is where `result` is returned by
    pg.sometest.run(hdr, dblock) and `test` is an item from the test
    list at tr_docs[dbp_idx]['tests'].
    """
    # pg.dblock_paths == mkh5.data_blocks, should fix mkh5
    dbp = self.dblock_paths[dbp_idx]
    if self.dblock_paths is None:
        raise ValueError("PyGarv.dblock_paths is None")
    # strict hdf5 root paths have a slash prefix, the h5py root
    # datagroup path does not ... stay agnostic
    hdr, dblock = self.mkh5.get_dblock(dbp)
    hdr_dbp = re.match(r"/*(.+)", hdr["h5_dataset"]).groups()[0]
    # sanity check ... header and h5.data_blocks must agree
    if dbp != hdr_dbp:
        raise ValueError("Fatal mismatch in tr_doc, h5 header, test dblock path")
    # flatten the [{param: value}, ...] spec into a single params dict
    test_params = dict()
    for kv in test_spec:
        test_params.update(kv)
    # look the test up in the catalog by name, load its params,
    # run it, then clear the settings
    this_test = self._catalog[test_params["test"]]
    this_test.reset()
    this_test.set_specs(test_params)
    this_result = this_test.run(hdr, dblock)
    this_test.reset()
    # fails [(start, stop) ... (start, stop)] for contiguous fails
    return (this_result, self._compress_result(this_result))
def _compress_result(self, result):
    """compress a full-length boolean pygarv test result vector into a
    list of (start, stop) tuples, one per contiguous run of True.
    """
    fail_idxs = np.where(result == True)[0]
    # no points failed this test -> no (x0, x1) tuples
    if len(fail_idxs) == 0:
        return []
    runs = []
    # left and right edges of the current run; a run may be one sample
    x0 = prev = fail_idxs[0]
    for x in fail_idxs[1:]:
        if x != prev + 1:
            # gap in the indexes: close the current run, open a new one
            runs.append((x0, prev))
            x0 = x
        prev = x
    # the last index always closes the final run
    runs.append((x0, prev))
    return runs
def run_dblock(self, dbp_idx, tr_doc):
    """Run tests in the tr_doc for datablock at dbp_idx, returns 64-bit pygarv sample mask.

    Parameters
    ----------
    dpb_idx : uint
        index of the ith dblock in self.dblock_paths
    tr_doc : dict
        PyYarf format dict with tr_doc['tests']

    Returns
    -------
    results : dict
        .. code-block:: python

            {name: 'results',
             dblock_path: str (== the yarf_dbp),
             pygarv : np.ndarray(shape=(len(dblock),),
                                 dtype=dblock['pygarv'].dtype),
             fails : list of uint 2-ples (x0, x1)}

        * The fails list amounts to an RLL compression of the boolean vector `pygarv > 0`

    Raises
    ------
    ValueError if tr_doc['dblock_path'] != self.dblock_paths[dbp_idx]
    """
    # originally from h5
    dbp = self.dblock_paths[dbp_idx]
    # from PyYarf
    yarf_dbp = tr_doc["dblock_path"]
    yarf_test_list = tr_doc["tests"]
    # lookup data block hdr and data. Note: hdf5 root paths have a
    # slash prefix, the h5py root datagroup path does not so we
    # remain agnostic
    if self.dblock_paths is None:
        raise ValueError("PyGarv.dblock_paths is None")
    hdr, dblock = self.mkh5.get_dblock(dbp)
    hdr_dbp = re.match(r"/*(.+)", hdr["h5_dataset"]).groups()[0]
    # three way sanity check ... header, h5.data_blocks, yarf ...does everything agree?
    if not dbp == yarf_dbp == hdr_dbp:
        msg = "Fatal mismatch in mkh5 - pygarv - .yarf dblock path"
        raise ValueError(msg)
    # init the return ... no tests -> no fails -> all zeros
    results = dict(
        name="results",
        dblock_path=yarf_dbp,
        pygarv=np.zeros(shape=(len(dblock),), dtype=dblock["pygarv"].dtype),
        fails=None,
    )
    # if nothing to do, return
    if yarf_test_list is None or len(yarf_test_list) == 0:
        return results
    # otherwise run the tests in yarf test list
    # init to whatever dtype is in the dblock['pygarv']
    results["pygarv"] = np.zeros(shape=(len(dblock),), dtype=dblock["pygarv"].dtype)
    # compute the bit fiddled pygarv artifact mask
    for i, t in enumerate(yarf_test_list):
        # gather the list of key:value params for this test in
        # into a dict for test.set_specs(kwargs)
        test_params = dict()
        # test_params['dblock_path'] = dbp # DEPRECATED
        for kv in t:
            test_params.update(kv)
        # lookup the test function in the pygarv catalog by name
        # and set its params
        this_test = None
        this_test = self._catalog[test_params["test"]]
        this_test.set_specs(test_params)
        # run it then clear the params
        result = None
        result = this_test.run(hdr, dblock)
        this_test.reset()
        # update the results['pygarv'] stream from the boolean result
        # by OR masking the ith bit where the test fails
        results["pygarv"] = self._encode_pygarv_stream(i, result, results["pygarv"])
    # map the fiddled bits back to indices of the test in
    # tr_doc that failed ... this is for visualization, human
    # consumption at run time, not stored in mkh5 hdr or dblock
    results["fails"] = self._decode_pygarv_stream(results["pygarv"], tr_doc)
    # warn, don't die, when freshly computed fails differ from the doc's
    if tr_doc["fails"] != results["fails"]:
        msg = ("updating test results: {0}").format(tr_doc)
        warnings.warn(msg)
    return results
def get_result(self, pg_test_result):
    """convenience wrapper to query a test result, decode the mask, and
    return it bundled with its test.

    Parameters
    ----------
    pg_test_result : (tr_doc, pygarv_mask) tuple
        as returned by the run_* functions

    Raises
    ------
    NotImplementedError
        always ... placeholder, not yet implemented
    """
    raise NotImplementedError
def run_tests(self):
    """run tests and compute the pygarv mask for all dblocks, does not modify mkh5

    Returns
    -------
    results : list of dict
        one run_dblock() results dict per datablock in self.mkh5
        (FIX: the original docstring claimed 2-ples; run_dblock
        returns a results dict)
    """
    # one mask per dblock, suitable for assigning to hdr['pygarv']
    # FIX: dropped the dead local `these_results = None`
    results = []
    for dbp_idx in range(len(self.dblock_paths)):
        results.append(self.run_dblock(dbp_idx, self.tr_docs[dbp_idx]))
    return results
def _update_mkh5(self):
    """pull tests and results out of tr_docs and push them into mkh5

    Overwrites each dblock's 'pygarv' column with the bit-fiddled result
    vector and stashes the test specs in the header under hdr['pygarv'].

    Raises
    ------
    ValueError
        if tr_docs is unset or a tr_doc disagrees with the mkh5
        datablock path or ordering
    """
    if self.tr_docs is None:
        # FIX: the original constructed this ValueError but never raised it
        raise ValueError("no tr_docs")
    hio = self.mkh5.HeaderIO()  # used below to update header
    for dbp_idx, dbp in enumerate(self.mkh5.data_blocks):
        # sanity check tr_doc vs. mkh5 alignment
        tr_doc = self.tr_docs[dbp_idx]
        if tr_doc["dblock_path"] != dbp:
            msg = "uh oh ... mkh5 v. tr_doc dblock_path mismatch in _update_mkh5"
            raise ValueError(msg)
        if tr_doc["dblock_path_idx"] != dbp_idx:
            msg = (
                "uh oh ... mkh5 v. tr_doc dblock_path_idx mismatch in _update_mkh5"
            )
            raise ValueError(msg)
        try:
            with h5py.File(self.mkh5_f, "r+") as h5:
                dblock = h5[dbp]  # open, read write h5py Dataset
                # overwrite the pygarv data stream and header['pygarv']
                dblock["pygarv"] = tr_doc["pygarv"]  # bit-fiddled stream
                hio.get(dblock)  # fetch header, this dblock
                # brittle ... accessing header dict directly
                hio._header["pygarv"] = self._get_yarf_doc_from_tr_doc(tr_doc)
                hio.set(dblock)  # modded header jsonified into dblock.attrs
        except Exception as err:
            msg = (
                "\nVERY VERY BAD ... pygarving {0} {1} failed part "
                "way through, possible mkh5 data corruption."
                "".format(self.mkh5_f, self.yarf_f)
            )
            if len(err.args) == 0:
                err.args = (msg,)
            else:
                err.args = ("{0} {1}".format(err.args[0], msg),)
            raise err
# ------------------------------------------------------------
# Test Developer API:
#
# hdr : dict
# mkh5 dblock header ... sample rate, column specs, etc..
#
# dblock : np.ndarray
# mkh5 data block
#
# default kwarg keys exposed by PyGarvTest for use in the test body:
#
# 'dblock_path' (str) current dblock_path
# 'test' (str) name of this test
# 'tag' (str) user description, may be anything
# 'stream' (str) name of dblock dtype for column selection
# 'threshold' (float) test critical value
# 'interval' (float) length of interval (ms) to sweep the test
#
# Note: to dump or inspect kwarg keys:values at run time use, e.g.,
#
# print(kwargs)
# pdb.set_trace()
#
#
# ------------------------------------------------------------
# garv-like event-based tests ... *only* the event code sample gets tagged
# param_types = dict(stream=str, threshold=float, prestim=float, poststim=float)
# @PyGarvTest('ppa', **param_types)
# def ppa(hdr, dblock, *args, **kwargs):
# '''tag events with single stream peak-to-peak amplitude excursions
# Parameters
# ----------
# stream : regex
# stream label pattern to match, e.g. 'MiPa' or '\w+'
# threshold : float
# amplitude excursion threshold
# prestim : float (>= 0)
# length in ms to scan before the anchor event
# poststim : float (>= 0)
# length in ms to scan after the anchor event
# '''
# stream = kwargs['stream']
# threshold = kwargs['threshold']
# prestim_ms = kwargs['prestim']
# poststim_ms = kwargs['poststim']
# print('running ppa_event', threshold, prestim_ms, poststim_ms)
# prestim_samps = mkh5.mkh5._ms2samp(prestim_ms, hdr['samplerate'])
# poststim_samps = mkh5.mkh5._ms2samp(poststim_ms, hdr['samplerate'])
# n_samps = len(dblock)
# result = np.full(shape=(n_samps,), fill_value=False)
# ev_idxs = np.where(dblock['log_evcodes'] > 0)[0]
# for ev_idx in ev_idxs:
# interval = slice( max(0,ev_idx-prestim_samps), min(n_samps, ev_idx + poststim_samps))
# ev_view = dblock[stream][interval].view()
# if np.ptp(ev_view) > threshold:
# result[ev_idx] = True # mark this event sample bad
# return result
@PyGarvTest("ppa", stream=str, threshold=float, prestim=float, poststim=float)
def ppa(hdr, dblock, *args, **kwargs):
    r"""tag event if any stream regexp match has peak-to-peak amplitude excursion

    Parameters
    ----------
    stream : regex
        stream label pattern to match, e.g. '.+' or 'MiPa' or '\w+'
    threshold : float
        amplitude excursion threshold
    prestim : float (>= 0)
        length in ms to scan before the anchor event
    poststim : float (>= 0)
        length in ms to scan after the anchor event
    """
    # expand the stream regexp into the matching recorded data channels
    # (only streams whose source is a dig_chan are eligible)
    stream_patt = kwargs["stream"]
    test_streams = [
        stream
        for stream in hdr["streams"].keys()
        if re.match(stream_patt, stream)
        and "dig_chan" in hdr["streams"][stream]["source"]
    ]
    if len(test_streams) == 0:
        msg = "no streams match: {0}".format(stream_patt)
        raise RuntimeError(msg)
    threshold = kwargs["threshold"]
    prestim_ms = kwargs["prestim"]
    poststim_ms = kwargs["poststim"]
    print(
        "running ppa with regex stream",
        threshold,
        prestim_ms,
        poststim_ms,
        test_streams,
    )
    # convert ms bounds to sample counts at this dblock's sampling rate
    prestim_samps = mkh5.mkh5._ms2samp(prestim_ms, hdr["samplerate"])
    poststim_samps = mkh5.mkh5._ms2samp(poststim_ms, hdr["samplerate"])
    n_samps = len(dblock)
    result = np.full(shape=(n_samps,), fill_value=False)  # init no artifacts
    # only positive log event codes anchor a test interval
    ev_idxs = np.where(dblock["log_evcodes"] > 0)[0]
    for ev_idx in ev_idxs:
        for stream in test_streams:
            # clip the scan interval at the dblock boundaries
            interval = slice(
                max(0, ev_idx - prestim_samps),
                min(n_samps, ev_idx + poststim_samps),
            )
            ev_view = dblock[stream][interval].view()
            # thank you numpy ...
            if np.ptp(ev_view) > threshold:
                result[
                    ev_idx
                ] = True  # mark this event sample bad on first bad channel
                break  # no need to look further
        # NOTE(review): this continue is a no-op ... the loop advances
        # to the next event regardless
        if result[ev_idx]:
            continue  # move on to the next event
    return result
[docs] @PyGarvTest(
"maxflat", stream=str, threshold=float, nsamp=int, prestim=float, poststim=float
)
def maxflat(hdr, dblock, *args, **kwargs):
r"""tag events on regex stream for flat runs
Parameters
----------
stream : regex
stream label pattern to match, e.g. 'MiPa' or '\w{3}$'
threshold : float
minimum range allowable
nsamp : int
length in samples of rolling window to scan for flatness
prestim : float (>= 0, units ms)
time (ms) relative to the event to start scanning for flatness
poststim : float (>= 0, units ms)
time (ms) relative to the event to stop scanning for flatness
Returns
-------
result : np.ndarray(shape=(len(dblock), ), dtype='bool')
True at dblock indexs where test fails
"""
stream_patt = kwargs["stream"]
test_streams = [
stream
for stream in hdr["streams"].keys()
if re.match(stream_patt, stream)
and "dig_chan" in hdr["streams"][stream]["source"]
]
if len(test_streams) == 0:
msg = "no streams match: {0}".format(stream_patt)
raise RuntimeError(msg)
threshold = kwargs["threshold"]
prestim_ms = kwargs["prestim"]
poststim_ms = kwargs["poststim"]
win_len = kwargs["nsamp"]
if win_len <= 0:
raise ValueError("maxflat nsamp must be > 0")
print("running mxflat_event", threshold, win_len, prestim_ms, poststim_ms)
prestim_samps = mkh5.mkh5._ms2samp(prestim_ms, hdr["samplerate"])
poststim_samps = mkh5.mkh5._ms2samp(poststim_ms, hdr["samplerate"])
n_samps = len(dblock)
result = np.full(shape=(n_samps,), fill_value=False)
ev_idxs = np.where(dblock["log_evcodes"] > 0)[0]
for ev_idx in ev_idxs:
interval = slice(
max(0, ev_idx - prestim_samps), min(n_samps, ev_idx + poststim_samps)
)
for stream in test_streams:
ev_view = dblock[stream][interval].view()
for i in range(len(ev_view) - win_len):
# scan nsamp sub intervals for a flat line
if np.ptp(ev_view[i : i + win_len]) < threshold:
result[ev_idx] = True # mark this event sample bad
break # no need to look further
if result[ev_idx]:
continue # done, these streams move on to the next event
return result
param_types = dict(
stream=str, stream2=str, threshold=float, prestim=float, poststim=float
)
[docs] @PyGarvTest("ppadif", **param_types)
def ppadif(hdr, dblock, *args, **kwargs):
"""tag events with two-stream amplitude difference excursions"""
stream = kwargs["stream"]
stream2 = kwargs["stream2"]
threshold = kwargs["threshold"]
prestim_ms = kwargs["prestim"]
poststim_ms = kwargs["poststim"]
print("running ppadif", threshold, prestim_ms, poststim_ms)
prestim_samps = mkh5.mkh5._ms2samp(prestim_ms, hdr["samplerate"])
poststim_samps = mkh5.mkh5._ms2samp(poststim_ms, hdr["samplerate"])
n_samps = len(dblock)
result = np.full(shape=(n_samps,), fill_value=False)
ev_idxs = np.where(dblock["log_evcodes"] > 0)[0]
for ev_idx in ev_idxs:
interval = slice(
max(0, ev_idx - prestim_samps), min(n_samps, ev_idx + poststim_samps)
)
if (
np.ptp(
dblock[interval][stream].view() - dblock[interval][stream2].view()
)
> threshold
):
result[ev_idx] = True
return result
# continuous data tagging tests ----------------------------------------
param_types = dict(threshold=float, interval=float)
[docs] @PyGarvTest("cstdev", **param_types)
def cstdev(hdr, dblock, *args, **kwargs):
"""tag intervals that span cross-channel amplitude standard deviation excursions"""
threshold = kwargs["threshold"]
interval_ms = kwargs["interval"]
print("running stdev", threshold, interval_ms)
interval_samps = mkh5.mkh5._ms2samp(interval_ms, hdr["samplerate"])
eeg_streams = [
stream_name
for stream_name, stream in hdr["streams"].items()
if "dig_chan" in stream["source"]
]
result = np.zeros(shape=(len(dblock),), dtype=bool)
# for i in range(len(dblock)):
# result[i] = np.std(dblock[eeg_streams][i].astype(np.ndarray)) > threshold
nsamp = len(dblock)
idx = 0
cntr = 0
while idx + interval_samps + 1 < nsamp:
for i in range(idx, idx + interval_samps + 1):
std = np.std(dblock[eeg_streams][i].astype(np.ndarray))
if std > threshold:
result[idx : idx + interval_samps] = 1
break
idx += interval_samps
return result
# this test takes two extra params w/ numpy dtypes str, float
param_types = dict(stream=str, stream2=str, threshold=float, interval=float)
[docs] @PyGarvTest("cppadif", **param_types)
def cppadif(hdr, dblock, *args, **kwargs):
"""peak-to-peak amplitude difference stream2 - stream"""
stream = kwargs["stream"]
stream2 = kwargs["stream2"]
threshold = kwargs["threshold"]
interval_ms = kwargs["interval"]
interval_samps = mkh5.mkh5._ms2samp(interval_ms, hdr["samplerate"])
print("running ppadif:", stream, stream2, threshold, interval_ms)
nsamp = len(dblock)
result = np.zeros(shape=(nsamp,), dtype=bool)
result = np.abs((dblock[stream2] - dblock[stream])) > threshold
idx = 0
# fast enough
while idx + interval_samps + 1 < nsamp:
if any(result[idx : idx + interval_samps + 1]):
result[idx : idx + interval_samps] = 1
idx += interval_samps
return result
param_types = dict(stream=str, threshold=float, interval=float)
[docs] @PyGarvTest("cppa", **param_types)
def cppa(hdr, dblock, *args, **kwargs):
"""peak-to-peak amplitude (stub)"""
print("running ppa:")
pp.pprint(kwargs)
# print(dblock[kwargs['stream']])
return np.zeros(shape=(len(dblock),))
# ------------------------------------------------------------
# PyGarv.tr_docs CRUD
# ------------------------------------------------------------
def _delete_tr_docs(self, dbp_idx, test_idx):
"""removes the test and its results from at tr_docs[dbp_idx]
['tests'][test_idx] and updates
the tr_docs[dbp_idx]['fails'] tr_docs[dbp_idx]['pygarv']
"""
tr_doc = self.tr_docs[dbp_idx] # lookup the tests/results doc
n_tests = len(self.tr_docs[dbp_idx]["tests"])
self.tr_docs[dbp_idx]["tests"].pop(test_idx)
self.tr_docs[dbp_idx]["fails"].pop(test_idx)
# move the pygarv bits above the popped test one bit left
mask = self.tr_docs[dbp_idx]["pygarv"].copy()
for i in range(test_idx, n_tests - 1):
mask -= (mask >> i & 1) << i # zero out the ith bit
mask += ((mask >> (i + 1)) & 1) << i # copy i+1 bit to ith bit
mask -= (mask >> (n_tests - 1) & 1) << n_tests - 1 # zero out the last bit
self.tr_docs[dbp_idx]["pygarv"] = mask.copy()
def _update_tr_docs(self, dbp_idx, test_idx, test):
"""Primary Create/Update pygarv test CRUD operation.
The test is dry run on data_blocks[dbp_idx]. If an exception
is raised, it is returned. If no exception is raised, the
corresponding tr_docs[dbp_idx] test specs and results are
modified and None is returned.
Parameters
----------
dbp_idx: uint
index of the ith datablock path in self.mkh5.data_blocks
test_idx : uint
index of the test in the self.tr_docs[dbp_idx]['tests'].
If None, operation appends test to tr_docs[dbp_idx]['tests']
test: list
PyGarvTest format list of singleton param:value dicts,
e.g., [ {key:val}, ..., {key:val}]
Returns
-------
The first Exception raised when running the test or None on success.
"""
tr_doc = self.tr_docs[dbp_idx] # lookup the tests/results doc
n_tests = len(self.tr_docs[dbp_idx]["tests"])
# check the test index this dblock
if test_idx > n_tests:
raise IndexError("test_idx > number of tests")
# dry run the test and collect results
try:
result, fails = self._run_test(dbp_idx, test)
except Exception as err:
return err
# if we make it here, mod the tr_docs with the new info
if test_idx < n_tests:
# update is overwrite in place
self.tr_docs[dbp_idx]["tests"][test_idx] = test
self.tr_docs[dbp_idx]["fails"][test_idx] = fails
elif test_idx == n_tests:
# update is append
self.tr_docs[dbp_idx]["tests"].append(test)
self.tr_docs[dbp_idx]["fails"].append(fails)
else:
# handled on the way in
pass
# update the pygarv vector
self.tr_docs[dbp_idx]["pygarv"] = self._encode_pygarv_stream(
test_idx, result, self.tr_docs[dbp_idx]["pygarv"]
)
return None
# ------------------------------------------------------------
# misc private utility methods
# ------------------------------------------------------------
def _encode_pygarv_stream(self, test_idx, results, mask):
"""set only i-th bit of pygarv mask[j] = 1 where results[j] == True for test_idx == i
Parameters
----------
test_idx : uint < 64
index in the list of tests, this dblock. Bit at this index encodes results
results : np.ndarray, dtype=bool
sample-wise results of a pygarv test ... 1 is bad/fail, 0 is good/pass
mask : np.array, dtype='u4'
array of 64-bit uints where the test_idx bit encodes test_idx results.
Returns
-------
mask : np.array, dtype='u4'
This is the same mask bit-fiddled if at all, only at the test_idx-th bit.
Usage
-----
# pseudo-code for a given dblock and list of tests
mask = np.zeros(shape=(len(dblock),), dtype='u4')
for i,t in enumerate(tests):
result = run_the_test(..., t, ...) # returns boolean len(dblock)
mask = _encode_pygarv_stream(i, results, mask) # update the mask, bit i
# now pygarv mask is current, this deblock
The pygarv mask has the following properties:
1. Any non-zero value in the mask indicates some test failed
at that sample (quick check)
2. From the numerical value of the mask at any given sample,
unfiddling the non-zero values in the i-th bit-position
recovers the i-th pygarv['tests'] in the list of tests in
this dblock's header['pygarv'] info, i.e., points back to
all there is to know about why this sample is marked bad
... stream, tag, threshold, other test params, etc..
3. The index-to-bit-to-test mapping only holds per dblock
since test specs can vary across dblocks.
"""
assert test_idx < 64
assert results.shape == mask.shape
assert results.ndim == mask.ndim == 1
reset_vec = np.zeros(shape=(len(results),), dtype=results.dtype)
if any(results):
# zero the bits at test_idx, then reset w/ new results
mask -= ((mask >> test_idx) & 1) << test_idx
mask += ((2**test_idx) * results).astype(mask.dtype)
return mask
def _decode_pygarv_sample(self, s, tr_doc_tests):
"""decode the bits set in s and return the failed test specs
Parameters
----------
s : uint64
as found in a pygarv_mask
tr_doc_tests: list of PyGarv format tests specs
Returns
-------
failed tests : list of pygarv test specs
those that tests which failed during run_tests()
"""
# idxs = []
failed_tests = []
ii = 0
# scan bits only as far as necessary
while 2**ii <= s:
# check last bit ... same as div mod 2
if (s >> np.uint(ii)) & np.uint(1):
# idxs.append(ii)
failed_tests.append(tr_doc_tests[ii])
ii += 1
return failed_tests
def _decode_pygarv_stream(self, pygarv_stream, tr_doc, compress=True):
"""recover pygarv test indexes from bits in sample pygarv_stream
This function operates per tr_doc, datablock pair
The 64-bit uint at each sample of the pygarv data stream encodes
which test or tests in the tr_doc['tests'] failed at that sample.
This function decodes such uints, mapping the non-zero bits
i,j,k, ... back to integers i, j, k ... that index the ith,
jth, kth, test(s) in tr_doc that failed. Whew.
Parameters
----------
pygarv_stream : np.ndarray, uint64, shape=(len(dblock), )
set bit i indicates failed test i in tr_doc['tests']
tr_doc : dict
in yarf format ...
{name: 'pygarv',
'dblock_path' : slashpath_to_mkh5_dblock,
'tests' : [ test_spec, ... test-spec ]
'fails' : [ [ {}, {}, ] ... [ {}, {}, ] ] ]
}
where each test_spec are the (ordered) param specs of a
PyGarvTest.
Returns
-------
fails: list of list of uints
Each list in `fails` is a lookup table of the samples in the
datablock where the corresponding test in tr_doc['tests']
failed. I.e.,
* len(fails) == len(tr_doc['tests']
* fail[i] tracks tr_doc['tests'][i]
* j in fail[i] == True iff test tr_doc['tests'][i] marks
the jth row of the dblock as bad (boolean 1).
(pygarv_stream[i] >> i) % 2 == True if i-th bit is set
"""
# may or may not be tests for this dblock ...
if tr_doc["tests"] is None:
return None
# this trusts trust the tr_doc is well-formed
test_fails = [[] for t in tr_doc["tests"]]
# > 1 means at least one failed test
fail_idxs = np.where(pygarv_stream > 0)[0]
# decode the non-zero pygarv values
if len(fail_idxs) > 0:
# 1. unpack the test_fails
for i, fail_idx in enumerate(fail_idxs):
s = pygarv_stream[fail_idx] # integer value at the fail
# test_idxs = []
ii = 0
# scan bits only as far as necessary
while 2**ii <= s:
# shift and check last bit ... same as div mod 2
if (s >> np.uint(ii)) & np.uint(1):
# test_idxs.append(ii)
# update failed samples, this test
test_fails[ii].append(fail_idx)
ii += 1
if compress == True:
# RLL compress the fail regions as (x0,x1) intervals
# two cases of interest:
# length 1 (x0,x0)
# length > 1 (x0, x1) where x0 < x1
for i, test_xs in enumerate(test_fails):
# case 1: no points failed this test
if len(test_xs) == 0:
# no fails so no (x0,x1) tuples
test_fails[i] = []
continue
# case 2: one or more points failed this test
x0_x1 = []
cnt = 0
n_xs = len(test_xs)
x0 = test_xs[0] # init left edge, exists b.c len > 0
while cnt < n_xs:
x1 = test_xs[cnt] # update right edge
if cnt + 1 == n_xs:
# last point is always a right bound, no new region
x0_x1.append((x0, x1))
else:
# look ahead ...
if test_xs[cnt + 1] != test_xs[cnt] + 1:
# append and start a new retion
x0_x1.append((x0, x1))
x0 = test_xs[cnt + 1]
cnt = cnt + 1
# overwrite this test sample fails w/ the intervals
test_fails[i] = x0_x1
return test_fails # possibly [ [] ... [] ] if len == 0
# def _ppa(test,dblock,srate):
# """ ppa test handler """
# # data type template for this test
# dtypes = {
# 'test': str,
# 'tag': str,
# 'stream': str,
# 'params': dict(
# threshold = float, # uV max excursion
# interval = float # ms
# )
# }
# # general sanity check
# _check_test(test, dtypes, dblock)
# test_stream = dblock[test['stream']]
# result_stream = np.ndarray(shape=(len(test_stream)), dtype=bool)
# result_stream.fill(False)
# n = int((test['params']['interval']*srate)/1000.0) #
# threshold = test['params']['threshold']
# # test snippets
# idx = 0
# while idx < len(test_stream)-n:
# test_run = test_stream[idx:idx+n]
# idx_result = (test_stream[idx:idx+n].max() -
# test_stream[idx:idx+n].min()) > threshold
# if idx_result:
# result_stream[idx:idx+n] = idx_result
# # idx += n # skip rest of bads
# # continue
# idx += 1
# return(result_stream)
# def _ppadif(test,dblock,srate):
# """ ppadif test handler """
# # data type template for this test
# dtypes = {
# 'test': str,
# 'tag': str,
# 'stream': str,
# 'params': dict(
# stream_2 = str, # other data stream name
# threshold = float, # uV max excursion
# interval = float # ms
# )
# }
# # general sanity check
# _check_test(test, dtypes, dblock)
# test_stream = dblock[test['stream']] - \
# dblock[test['params']['stream_2']]
# result_stream = np.ndarray(shape=(len(test_stream)), dtype=bool)
# result_stream.fill(False)
# n = int((test['params']['interval']*srate)/1000.0) #
# threshold = test['params']['threshold']
# # test snippets
# idx = 0
# while idx < len(test_stream)-n:
# idx_result = (test_stream[idx:idx+n].max() -
# test_stream[idx:idx+n].min()) > threshold
# if idx_result:
# result_stream[idx:idx+n] = idx_result
# # idx += n # skip rest of bads
# # continue
# idx += 1
# return(result_stream)
# def _garv_dblock(hdr, dblock):
# """ run tests given in hdr['pygarv']['tests'] on dblock data streams
# Parameters
# ----------
# hdr : dict in proper mkh5 dblock header format
# dblock : np.ndarray in proper mkh5 dblock format
# Returns
# -------
# pygarv_stream : np.ndarray, shape = (1,len(dblock)), dtype=np.uint64
# non-zero values indicate artifacts, bit-code (2**i) indicates ith test failed
# """
# if 'pygarv' not in hdr.keys():
# raise KeyError('pygarv not found ... add to YAML .yhdr')
# else:
# pg = hdr['pygarv']
# if 'tests' not in pg.keys():
# raise KeyError('tests not found in pygarv dict ... check .yhdr YAML doc name: pygarv')
# if not isinstance(pg['tests'], list):
# msg = "pygarv['tests'] is not a list ... check YAML .yhdr YAML doc name: pygarv"
# raise TypeError(msg)
# # check there is a pygarv method for each named test in the doc
# for i,test in enumerate(pg['tests']):
# if '_' + test['test'] not in pygarv.__dict__.keys():
# msg = ("unknown test name at pygarv['tests'][{0}]: "
# "{1}").format(i, test)
# raise ValueError(msg)
# # check there is enough bit-width to code the number of tests
# # pygarv_dt = np.dtype([('pygarv', 'uint64')])
# pygarv_dt = np.dtype('uint64')
# n_bits = np.uint(pygarv_dt.itemsize * 8)
# if len(pg['tests']) > n_bits:
# msg = ('number of pygarv tests {0} '
# 'exceeds maximum {1}').format(len(pg['tests'],n_bits))
# raise ValueError(msg)
# # setup to capture results
# # n_bytes = int(np.ceil(len(pg['tests'])/8))
# pygarv_stream = np.zeros(shape=(len(dblock),), dtype=pygarv_dt)
# # have at it
# srate = hdr['samplerate']
# for i,test in enumerate(pg['tests']):
# print('{0} {1} {2}'.format(test['test'],
# test['stream'],
# test['tag']))
# func_call = '_{0}(test,dblock,srate)'.format(test['test'])
# # test_results.append( (test, eval(func_call)) )
# pygarv_stream = _encode_pygarv_stream(i,
# eval(func_call),
# pygarv_stream)
# return(pygarv_stream)
# def _mxflat(test, dblock, srate):
# """ max flat test handler """
# # data type template for this test
# dtypes = {
# 'test': str,
# 'tag': str,
# 'stream': str,
# 'params': dict(
# threshold = float, # uV
# interval = float )}
# # general sanity check
# _check_test(test, dtypes, dblock)
# test_stream = dblock[test['stream']]
# result_stream = np.ndarray(shape=(len(test_stream)), dtype=bool)
# result_stream.fill(False)
# # apply test across stream in rolling window
# threshold = test['params']['threshold']
# n = int((test['params']['interval']*srate)/1000.0) #
# idx = 0
# while idx < len(test_stream) - n:
# idx_result = (test_stream[idx:idx+n].max() -
# test_stream[idx:idx+n].min()) < threshold
# if idx_result:
# # mark the flat interval
# result_stream[idx:idx+n] = idx_result
# # idx += n # can't undo bad so fast forward
# # continue
# idx += 1
# return(result_stream)
if __name__ == "__main__":
    import argparse  # successor to optparse

    # command line interface
    cli_parser = argparse.ArgumentParser(description="mkh5 artifact tagger")
    cli_parser.add_argument("mkh5_f", type=str, help="mkh5 format data")
    cli_parser.add_argument(
        "--yarf",
        type=str,
        metavar="myfile.yarf",
        dest="yarf_f",
        help=".yarf format YAML artifact test file",
    )
    cli_args = vars(cli_parser.parse_args())  # fetch from sys.argv
    # TO DO ... implement --tests option to dump available tests?
    if cli_args["yarf_f"] is not None:
        # run pygarv to mod the file
        print("pygarv marking artifacts ...")
        pg = PyGarv(mkh5_f=cli_args["mkh5_f"])
        pg._update_tr_docs_from_yaml_f(yarf_f=cli_args["yarf_f"])  # load yarf tests
        pg._update_mkh5()  # actually mod the h5 file
    else:
        # no .yarf ... bare init w/ mkh5 for viewing only
        print("pygarv viewer ...")
        pg = PyGarv(mkh5_f=cli_args["mkh5_f"])
        pg._update_tr_docs_from_mkh5()
        mkh5viewer.launch_app(cli_args["mkh5_f"])