# Source code for mkpy.pygarv

#!/usr/bin/env python3
"""pygarv is the backend for marking artifacts in mkh5 data with tests defined in a YAML file

Successful runs of tests and their results are stored in
PyGarv.tr_docs a list of tr_doc dicts, one dict per h5 datablock.

Parameters
----------
tr_doc['tests'] : list
   each item is a dict


Examples

* tr_doc['tests']

   .. code-block:: python

      [ {'dblock_path_idx': 0,
         'dblock_path': 'calstest/dblock_0',
         'name': 'pygarv',
         'tests': [ [{'test': 'ppa'},
      {'tag': 'amplitude excursions'},
                     {'stream': 'MiCe'},
                     {'threshold': 0.0},
                     {'interval': 0.0} ],
                    [{'test': 'ppadif'},
      {'tag': 'amplitude excursions'},
                     {'stream': 'MiCe'},
                     {'threshold': 0.1},
                     {'interval': 0.1},
                     {'stream2': 'MiPa'} ] ]},

          {'dblock_path_idx': 1,
           'dblock_path': 'calstest/dblock_1',
           'name': 'pygarv',
           'tests': None},
      ]

tr_doc['fails'] : list

len(tr_doc['fails']) == len(tr_doc['tests'])
where tr_doc['fails'][idx] is

a list of (start, stop) intervals in dblock_tick indexes where tr_doc['test'] failed. 
tr_doc['pygarv']



* The tests are specified as a YAML file .yarf.

  .. code-block:: yaml

     ---
     dblock_path: some_path
     dblock_path_idx: uint
     name: pygarv
     tests:
     - - test_spec
       - test_spec
       ...
       - test_spec

* Each `test_spec` is a YAML map with a mandatory `name` and `tag`
  parameter and optional other parameters as needed for specific tests

  test: str
  tag: str

where

  `test` names a pygarv test function, e.g., `mxflat`, `ppadif`
  `tag` is a user-defined descriptive tag, e.g., *blocking*, *heog*, *fancy test*

"""
__version__ = "0.0.0"

from collections import OrderedDict
import re
import pdb
import logging

import numpy as np
import pprint as pp
from matplotlib import pyplot as plt
import copy
import yaml
from yamllint import linter
from yamllint.config import YamlLintConfig
from mkpy import mkh5
import h5py
import sys
import warnings

# import dpath.util
from . import dpath

from mkpy import mkh5viewer


class PyYarf(object):
    """YAML test file I/O for PyGarv artifact test parameters

    Parameters
    ----------
    yarf_f : str
        file path to well-formed YAML with PyYarf test specification structure

    Attributes
    ----------
    yarf_docs : list
        each item is a yarf_doc dict that yamlizes in-out without modification

        ..code-block:: python

           {'name': 'pygarv' (str),
            'dblock_path_idx': n (uint)
            'dblock_path': path_to_a_mkh5_dblock (str),
            'tests': [ test_spec, ... test_spec] (list)}

    Methods
    -------
    IO methods
        read yarf_docs from yaml
        write yarf_docs to yaml
        read yarf_docs from mkh5 headers

    PyYarf YAML format:

    * exactly one yaml document per mkh5 dblock_path
    * each doc is a map with 3 keys: `name`, `dblock_path`, `tests`
    * the value of `name` must be `pygarv` (str)
    * the value of `dblock_path` in the ith yaml doc must ==
      mkh5.data_blocks[i] (str)
    * the value of `tests` must be a list of test specifications
      (see PyGarvTest docs)

    Examples
    --------

    .. code-block:: yaml

       # generated by PyYarf
       ---
       dblock_path_idx: 0
       dblock_path: calstest/dblock_0
       name: pygarv
       tests:
       - - test: ppa_event
         - tag: tag1
         - stream: MiPf
         - threshold: 20.0
         - prestim: 500.0
         - poststim: 1500.0
       ---
       dblock_path_idx: 1
       dblock_path: calstest/dblock_1
       name: pygarv
       tests: []
    """

    __version__ = "0.0.1"

    # yamllint configuration used to vet all yarf streams
    _yarf_config = YamlLintConfig("extends: default")

    # canonical shape of one yarf doc; also the authoritative key set
    _yarf_doc_template = dict(
        dblock_path_idx=None, dblock_path="", name="pygarv", tests=[]
    )

    def __init__(self, yarf_f=None):
        if yarf_f is not None:
            self.yarf_f = yarf_f

    # yarf file CRUD ... yaml I/O
    def read_from_yaml(self, yarf_f):
        """return yarf_doc list populated with yarf info, if any, from a .yarf file

        Parameters
        ----------
        yarf_f : str
            path to a YAML .yarf file

        Returns
        -------
        yarf_docs : list of dict
            one checked yarf_doc per YAML document in the file
        """
        with open(yarf_f, "r") as yf:
            yaml_stream = yf.read()
        self.lint_yarf(yaml_stream)  # raises Exception on bad YAML

        # thank you PyYaml
        yarf_iter = yaml.load_all(yaml_stream, Loader=yaml.SafeLoader)
        yarf_docs = []
        for yarf_doc in yarf_iter:
            self.check_yarf_doc(yarf_doc)
            yarf_docs.append(yarf_doc)

        # so far so good ...
        return yarf_docs

    def read_from_mkh5(self, mkh5_f):
        """scan mkh5 dblock headers and dblock['pygarv'] stream artifact test info

        Returns
        -------
        yarf_docs : list of dict
            one PyYarf format dict per mkh5 data block; blocks with no
            header 'pygarv' entry get an empty template doc.

        Raises
        ------
        ValueError
            if only some (not none, not all) data blocks carry pygarv
            header info ... that mixture is probably pathological.
        """
        yarf_docs = list()
        has_yarf = list()  # for error checking
        h5 = mkh5.mkh5(mkh5_f)
        for dbpath in h5.data_blocks:
            hdr, _ = h5.get_dblock(dbpath)
            yarf_doc = None
            if "pygarv" in hdr.keys():
                yarf_doc = hdr["pygarv"]
                self.check_yarf_doc(yarf_doc)
                has_yarf.append(True)
            else:
                # build an empty one from the template (shallow copy)
                yarf_doc = dict([(k, v) for k, v in self._yarf_doc_template.items()])
                yarf_doc["dblock_path"] = dbpath
                has_yarf.append(False)
            yarf_docs.append(yarf_doc)

        # none is OK, all is OK, some but not all is probably pathological
        if any(has_yarf) and not all(has_yarf):
            missing = [h5.data_blocks[i] for i, d in enumerate(has_yarf) if d is False]
            msg = (
                "uh oh ... missing pygarv info in headers of {0} " "dblocks {1}"
            ).format(mkh5_f, missing)
            raise ValueError(msg)
        return yarf_docs

    def to_yaml(self, yarf_docs):
        """return yarf_docs YAML-ized as string suitable for serialization"""
        yaml_stream = (
            "# generated by PyYarf v. {0}, " "edit at your own risk\n"
        ).format(self.__version__)
        for yarf_doc in yarf_docs:
            yaml_stream += "---\n"  # doc delimiter
            yaml_stream += yaml.dump(
                yarf_doc, explicit_start=False, default_flow_style=False
            )
        self.lint_yarf(yaml_stream)  # round-trip sanity check before returning
        return yaml_stream

    # YAML lint a string
    def lint_yarf(self, yarf_stream):
        """run yamllint on yarf_stream, if errors die informatively"""
        errors = [e for e in linter.run(yarf_stream, PyYarf._yarf_config)]
        if errors != []:
            msg = "\n\n*** {0} ***\n\n".format(self)
            for e in errors:
                msg += "{0}\n".format(e)
            raise Exception(msg)

    def check_yarf_doc(self, yarf_doc):
        """validate one yarf_doc: key set, name, and tests structure

        Raises
        ------
        KeyError
            if the doc's keys differ from the template's
        ValueError
            if name is not 'pygarv' or tests is malformed
        """
        # does each YAML doc have all and only the right keys?
        if set(yarf_doc.keys()) != set(PyYarf._yarf_doc_template.keys()):
            msg = (
                "yarf doc {0} must have "
                "exactly these keys: {1}"
                "".format(yarf_doc, PyYarf._yarf_doc_template.keys())
            )
            raise KeyError(msg)

        # name?
        if yarf_doc["name"] != PyYarf._yarf_doc_template["name"]:
            # FIX: original format string had placeholders {0} and {1} but
            # only one argument, so a bad name raised IndexError instead of
            # the intended ValueError. Supply the required name.
            msg = ("yarf doc 'name': {0} " "must be {1}").format(
                yarf_doc["name"], PyYarf._yarf_doc_template["name"]
            )
            raise ValueError(msg)

        # if there are tests are they a list?
        if yarf_doc["tests"] is not None:
            if not isinstance(yarf_doc["tests"], list):
                msg = ("yarf doc 'tests': {0} " "must be a list of tests").format(
                    yarf_doc["tests"]
                )
                raise ValueError(msg)
            for i, t in enumerate(yarf_doc["tests"]):
                if not isinstance(t, list):
                    msg = "{0} must be a list".format(t)
                    raise ValueError(msg)
                for param_spec in t:
                    # each parameter spec must be a single {key: value} map
                    if not isinstance(param_spec, dict) or len(param_spec.keys()) != 1:
                        msg = (
                            '.yarf {0}: test parameter "{1}" is not '
                            "a {{key:value}} pair"
                            ""
                        ).format(yarf_doc["dblock_path"], param_spec)
                        raise ValueError(msg)
class PyGarvTest(OrderedDict):
    """Decorator class for the PyGarv tests.

    Enforces an extensible standard form on PyGarv test specs and
    execution. Deriving from OrderedDict means ``.keys()``, ``.values()``
    and ``.items()`` come back in the original parameter order, which
    keeps UI population and YAML round trips from scrambling key:value
    pairs the way a plain dict might.

    Parameters
    ----------
    test : str
        name of the test; becomes the read-only value of ``self['test']``
        and must match the decorated test function
    **kwargs : param=type pairs
        optional extra parameters and their required Python types; these
        extend the mandatory defaults ('test', str), ('tag', str),
        ('stream', str)

    Raises
    ------
    TypeError
        if a value assigned to a parameter has the wrong type
    ValueError
        if a 'stream'-like parameter is not a compilable regex
    KeyError
        on any attempt to reassign the read-only 'test' value

    Notes
    -----
    * ``__setitem__`` is overridden to type-check every assignment.
    * Optional decorator arguments extend the mandatory parameters and
      are automatically passed to the decorated test function.
    * Setter/getter API wants and returns parameter specs as mappings,
      e.g. ``{'test': 'ppa', ..., 'interval': 1500.0}``.

    Methods
    -------
    run(hdr, dblock, **kwargs)
        hdr : dict
            metadata consulted in running the tests, e.g., sampling rate
        dblock : np.ndarray (named dtypes)
            columns of data, typically accessed by dtype.name

        Returns
        -------
        results : np.ndarray, dtype=bool, length = len(dblock)
            sample-wise data rejection mask, 1=bad, 0=good
    """

    _max_path_len = 4096  # no particular reason, roughly max linux path length

    # default parameter types, all PyGarvTests
    __default_param_specs = [("test", str), ("tag", str), ("stream", str)]

    def __init__(self, test, **kwargs):
        """test is passed in by decorator, kwargs are optional param=type specs"""
        # defaults first, then any decorator extras, preserving order
        all_specs = list(PyGarvTest.__default_param_specs) + list(kwargs.items())
        self._param_types = {param: ptype for param, ptype in all_specs}

        # attribute copy of the name lets __setitem__ guard self['test']
        self.test = test

        # self['test'] = test, all other values = None
        self.reset()

    # Override setting to include validation
    def __setitem__(self, key, value):
        """type check all item settings"""
        if value is not None:
            # type check applies to every key
            required = self._param_types[key]
            if not isinstance(value, required):
                raise TypeError(
                    "{0}: {1} value required type {2}".format(key, value, required)
                )

            # prevent string overrun
            if isinstance(value, str) and len(value) > PyGarvTest._max_path_len:
                raise RuntimeError(
                    "string length exceeds {0} for {1}: {2} ... {3}".format(
                        PyGarvTest._max_path_len, key, value[0:10], value[-10:]
                    )
                )

            # any stream-ish parameter must hold a compilable regex
            if "stream" in key:
                try:
                    re.compile(value)
                except Exception as err:
                    msg = "bad regexp pattern: {0}".format(value)
                    msg += " ... {0}".format(" ".join([arg for arg in err.args]))
                    raise ValueError(msg)

            # cross-check: 'test' is read-only once constructed
            if key == "test" and value != self.test:
                raise KeyError(
                    "failed on test={0} ... cannot change read-only value".format(
                        value
                    )
                )

        OrderedDict.__setitem__(self, key, value)

    # ------------------------------------------------------------
    # decorator magic: wraps the decorated function, e.g.
    #
    #   @PyGarvTest(ppa)
    #   ppa(header, dblock, **kwargs)
    #
    # with kwargs drawn from self's current key:value pairs and exposes
    # the callable self.run() to do the work. Duck-type flexible but with
    # built-in per-test key and value type checking.
    # ------------------------------------------------------------
    def __call__(self, f, *args, **kwargs):
        """f(header, dblock, ...)"""

        def run(*run_args, **run_kwargs):
            # snapshot current parameter values and hand them to the test
            current_params = {k: v for k, v in self.items()}
            return f(*run_args, **current_params)

        self.run = run
        self.run.__doc__ = f.__doc__

        # now PyGarv.<name>.run(hdr, dblock, ...) executes with current params
        return self

    # --------------------------------------------------
    # public-ish setter/getters
    # --------------------------------------------------
    def set_specs(self, test_params):
        """test_params is {key:value, ... } for test keys,values"""
        for key, value in test_params.items():
            self[key] = value

    def reset(self):
        """restore just-constructed state: 'test' set, all other params None"""
        for param in self._param_types:
            self[param] = self.test if param == "test" else None

    def get_specs(self):
        """current parameter values as a plain dict"""
        return dict(self.items())

    def param_type(self, param):
        """type of param"""
        return self._param_types[param]

    @property
    def params(self):
        """names of the parameters this test as a list"""
        return list(self._param_types.keys())

    @property
    def types(self):
        """data types of the values for the parameters as a list"""
        return list(self._param_types.values())

    @property
    def param_types(self):
        return self._param_types

    @property
    def specs(self):
        return self.get_specs()

    @property
    def specs_as_yaml(self):
        """returns current specs as yaml string"""
        return yaml.dump(self.specs, explicit_start=True, default_flow_style=False)
[docs]class PyGarv(object): r"""container to hold an inventory of functions for computing sample-wise artifact masks. When invoked at the command line, pygarv needs an mkh5 file to work with There are two cases: - has not been previously garved with _update_mkh5() - no pygarv test info in header - pygarv data streams all zeros - data has been previously garved with _update_mkh5() - pygarv test info appears in header - test results are unknown, possibly None - pygarv data stream state is unknown On init the mkh5 file is scanned for previous runs, if found the pygarv data buffers (volatile) are synced with the info from the h5 file. For each data block: - self.tr_docs are set to match the header['pygarv'] dict - self.yarf_fails are set according to dblock['pygarv'], self.tr_docs - the value of pygarv = run_test(db_idx) (what-if run) is checked against the dblock data, discrepanices throw a warning PyGarv now has persistent and volatile rejection data in alignment, suitable for viewing/editing in mkh5viewer PyGarvTest The PyGarvTest decorator handles all the default parameter name and type bookkeeping for specific tests To add a test to the catalog ... 1. implement a function that takes two args (hdr, dblock, \*\*kwargs) and returns a boolean artifact mask of length dblock data samples where 0 = good, 1 = bad. The hdr (dict), and dblock (np.ndarray) are, e.g., as returned by hdr, dblock = mkh5.get_dblock(path_to_datablock) but can by any dict and dblock that expose variable needed to compute the artifact mask. 2. decorate it with @PyGarvTest(test_name, [key=dtype, key=dtype]) where test_name is the test name and the list of key_i=dtype_i optionally gives extra parameters named key_1, ... key_n with data type dtype. """ def __init__(self, mkh5_f, yarf_f=None): """continuous artifact rejection manager # FIX ME ... move to main PyGarv docs - self.mkh5 read-only record of tests in dblock['pygarv'] and hdr['pygarv']. persistent. 
- self.dblock_paths : list of str, sequence of all the mkh5 datablock slashpaths as returned by mkh5.data_blocks - self.tr_docs : list of dict "tr_" abbreviates "test results". There is exactly one tr_doc per mkh5 data and it is the master pygarv data structure. It contains contain all the yarf_docs test info *plus* test run results as a pygarv vector (1-D array) of 64-bit uints and a list of fails. By design, tests are dry run *before* loading into tr_docs so these volatile test_specs, fails, and pygarv vector are always 1-1 and consistent. The tr_doc initialized from mkh5_f headers or loaded from YAML .yarf. The tr_docs may be passed to PyYarfIO and the test specs written as YAML. tr_doc format on init {'name': 'pygarv', 'dblock_path': '' 'dblock_path_idx: None 'tests': [ [] ... [] ], 'fails': [ [] ... [] ], 'pygarv': np.zeros(shape=(len(dblock), ), dtype=dblock['pygarv'].dtype) Each item in `tests` is a PyGarvTest format test specification Each item in `fails` item is the corresponding test failures of the form: [(x0_0,x1_0) ... (x0_n,x1_n)] where x0_i,x1_i are the index of beginning and end of a contiguous fail. Note: x0==x1 for single sample failed is allowed. dbp_index dblock_paths tr_docs ----- -------------- --------- 0 dblock_paths[0] tr_docs[0] 1 dblock_paths[1] tr_docs[1] . . . . . . . . . n dblock_paths[n] tr_docs[n] """ # reset test parameters b.c. 
with class decorator approach, new # instances in the same namespace inherit test specs set # during previous calls for attrib in ["_catalog", "mkh5", "mkh5_f", "dblock_paths", "yarf", "tr_docs"]: setattr(self, attrib, None) # clear tests in case of carry over from previous test runs self._reset_tests() # set the inventory of available tests self._catalog = self._init_catalog() # set the file names self.mkh5_f = mkh5_f # ready the mkh5 data self.mkh5 = mkh5.mkh5(mkh5_f) self.dblock_paths = self.mkh5.data_blocks # ready the yarf I/O manager self.yarf = PyYarf() # # init the tests and results data structure self.tr_docs = list() for dbp_idx, dbp in enumerate(self.mkh5.data_blocks): hdr, dblock = self.mkh5.get_dblock(dbp) tr_doc = { "dblock_path": dbp, "dblock_path_idx": dbp_idx, "name": "pygarv", "tests": [], "fails": [], "pygarv": np.zeros(shape=(len(dblock),), dtype=dblock["pygarv"].dtype), } self.tr_docs.append(tr_doc) def _update_tr_docs_from_yarf_docs(self, yarf_docs): """iterates through an entire yarf_docs (= test specs) and updates tr_docs""" self._check_tr_docs(yarf_docs) # dry run individual tests all tests individually for dbp_idx, dbp in enumerate(self.mkh5.data_blocks): yarf_doc = yarf_docs[dbp_idx] for test_idx, test in enumerate(yarf_doc["tests"]): # default test_idx=None is to append the tests and results exception = self._update_tr_docs(dbp_idx, test_idx, test) if exception is not None: exception.args = ( "{0} in data_block: {1} ".format(*exception.args, dbp), ) raise exception self._check_tr_docs(yarf_docs) def _update_tr_docs_from_mkh5(self): """scrape all yarf docs, from mkh5 headers, run and collect all in tr_docs""" print("updating tr_docs from mkh5") if self.mkh5_f is None: raise ValueError("mkh5_f not set") # gotta have some data ... 
if self.dblock_paths == []: msg = "no data block paths in " + self.mkh5_f raise ValueError(msg) # populate yarf docs from existing hdr['pygarv'] footprints yarf_docs = self.yarf.read_from_mkh5(self.mkh5_f) # collect tests in tr_docs self._update_tr_docs_from_yarf_docs(yarf_docs) # FIX ME??? ... compare tr_doc['pygarv '] with dblock['pygarv'] and # and warn of mismatch def _update_tr_docs_from_yaml_f(self, yarf_f): """collect all YAML tests and results into tr_docs""" yarf_docs = self.yarf.read_from_yaml(yarf_f) self._update_tr_docs_from_yarf_docs(yarf_docs) # # pygarv.mkh5 may or may not have been pygarved. Either way, # # if there is a yarf_f, it trumps previous pygarv info # if self.yarf_f is None: # raise ValueError('yarf_f not set') # tr_docs getters ... def _get_yarf_doc_from_tr_doc(self, tr_doc): """pulls out just the yarf_doc test info from current tr_docs[dbp_idx], no results. Unfortuante consequence of pooling tests and results in tr_docs, tho a lesser evil than segregating them. """ yarf_doc_keys = PyYarf._yarf_doc_template.keys() yarf_doc = dict() for k in yarf_doc_keys: yarf_doc[k] = tr_doc[k] return yarf_doc def _get_yarf_docs_from_tr_docs(self): yarf_docs = [] for tr_doc in self.tr_docs: yarf_docs.append(self._get_yarf_doc_from_tr_doc(tr_doc)) return yarf_docs def _init_catalog(self): """inventory of implemented tests as given by @PyGarvTest""" catalog = dict() for a in dir(self): this_attr = getattr(self, a) if isinstance(this_attr, PyGarvTest): this_attr.reset() # clear params catalog.update(dict([(this_attr.test, this_attr)])) return catalog def _load_tr_docs_from_yaml(self, yarf_f): """populate self.tr_docs dict from YAML yarf file""" return self.yarf.read_from_yaml(yarf_f) def _check_tr_docs(self, tr_docs): """sanity check tr_docs list lines up with h5 data blocks and each yarf_doc is well-formed. 
Checks form only, does not re-run tests or check semantics of the results """ if tr_docs is None: raise ValueError("tr_docs is None") assert len(self.dblock_paths) == len(tr_docs) for dbp_idx, db_path in enumerate(self.dblock_paths): tr_doc = self.tr_docs[dbp_idx] assert len(tr_doc["tests"]) == len(tr_doc["fails"]) # do pygarv and fails agree at least in form? if ( all(tr_doc["pygarv"] == 0) and any(len(fail) > 0 for fail in tr_doc["fails"]) ) or ( any(tr_doc["pygarv"] != 0) and all(len(fail) == 0 for fail in tr_doc["fails"]) ): msg = "tr_docs[{0}] fails and pygarv do not agree".format(dbp_idx) raise ValueError(msg) # check that non-zero bits in the tr_doc['pygarv'] still # agree w/ non-emtpy tr_doc['fails'] fails_from_pygarv_bits = self._decode_pygarv_stream( tr_doc["pygarv"], tr_doc ) if len(tr_doc["fails"]) > 0 or max(tr_doc["pygarv"]) > 0: if tr_doc["fails"] != fails_from_pygarv_bits: log_msg = ( "tr_doc[" "pygarv" "] bits do not match non-empty tr_doc[" "fails" "] ... yell at urbach immediately\n" ) log_msg += "data block path: {0}\n".format(db_path) log_msg += ( "fails according to tr_doc[" "pygarv" "] bits: {0}\n".format(fails_from_pygarv_bits) ) log_msg += ( "fails according to tr_doc[" "fails" "]: {0}".format(tr_doc["fails"]) ) logging.error(pp.pformat(log_msg)) err_msg = ( "probable pygarv bug ... see the latest .mkpy/logs for details" ) raise ValueError(err_msg) def _reset_tests(self): """clears parameter specs, all tests""" for a in dir(self): this_attr = getattr(self, a) if isinstance(this_attr, PyGarvTest): this_attr.reset() # clear any residual parameters
[docs] def get_catalog(self): return self._catalog
def _run_test(self, dbp_idx, test_spec): """run the test on the dblock[dpb_idx] data This is a dry run ... returns usable results and fails, does not change data Parameters ---------- dbp_idx : uint index of the datablock to run the tests on test_spec : list pygarv test specs format [param:value, ... param:value] Returns ------- (results, fails) : 2-ple results : np.ndarray, len(dblock), dtype=bool True at samples where test failed fails : list of uint tuples [{'x0': i, 'x1': j, 'test_idx', k}, ... ] where the i, j are start, end of consecutive True, i.e., a stretch of bad data and k is the index of the test in the list of tests, this dblock Normative use is where `result` is returned by pg.sometest.run(hdr,dblock) `test` is an item from the test list at tr_docs[dbp_idx]['tests'] """ # pg.dblock_paths == mkh5.data_blocks, should fix mkh5 dbp = self.dblock_paths[dbp_idx] # lookup data block hdr and data. Note: strict hdf5 root paths # have a slash prefix, the h5py root datagroup path does not # so we remain agnostic if self.dblock_paths is None: raise ValueError("PyGarv.dblock_paths is None") hdr, dblock = self.mkh5.get_dblock(dbp) hdr_dbp = re.match(r"/*(.+)", hdr["h5_dataset"]).groups()[0] # sanity checks ... header, h5.data_blocks, yarf ...does everything agree? if not dbp == hdr_dbp: # == test_params['dblock_path']: msg = "Fatal mismatch in tr_doc, h5 header, test dblock path" raise ValueError(msg) # fill test specs test_params = dict() for kv in test_spec: test_params.update(kv) # lookup the test function in the pygarv catalog by name # and set its params this_test = self._catalog[test_params["test"]] this_test.reset() this_test.set_specs(test_params) # run it, then clear the settings this_result = this_test.run(hdr, dblock) this_test.reset() # fails [ (start,stop) ... (start,stop)] for contiguous fails fails = self._compress_result(this_result) # if we make it here ... 
update return (this_result, fails) def _compress_result(self, result): """compress full-length boolean pygarv boolean result test vector into a list of fails [(start,stop)...(start,stop)] tuples encoding contiguous runs of True. """ fail_idxs = np.where(result == True)[0] fails_x0_x1 = [] # case 1: no points failed this test if len(fail_idxs) == 0: # no fails so no (x0,x1) tuples pass else: # case 2: one or more points failed this test cnt = 0 n_xs = len(fail_idxs) x0 = fail_idxs[0] # init left edge, exists b.c len > 0 while cnt < n_xs: x1 = fail_idxs[cnt] # update right edge which may == left edge if cnt + 1 == n_xs: # last point is always a right bound, no new region fails_x0_x1.append((x0, x1)) else: # look ahead continuous fail if fail_idxs[cnt + 1] != fail_idxs[cnt] + 1: # if not, append and start a new retion fails_x0_x1.append((x0, x1)) x0 = fail_idxs[cnt + 1] cnt = cnt + 1 return fails_x0_x1 # possibly []
[docs] def run_dblock(self, dbp_idx, tr_doc): """Run tests in the tr_doc for datablock at dbp_idx, returns 64-bit pygarv sample mask. Parameters ---------- dpb_idx : uint index of the ith dblock in self.dblock_paths tr_doc : dict PyYarf format dict with tr_doc['tests'] Returns ------- results dict of results like so: .. code-block:: python {name: 'results', dblock_path: str (== the yarf_dbp), pygarv : np.ndarray(shape=(len(dblock),), dtype=dblock['pygarv'].dtype), fails : list of uint 2-ples (x0, x1)} * The fails list amounts to an RLL compression of the boolean vector `pygarv > 0` Raises ------ ValueError if tr_doc['dblock_path'] != self.dblock_paths[dbp_idx] """ # originally from h5 dbp = self.dblock_paths[dbp_idx] # from PyYarf yarf_dbp = tr_doc["dblock_path"] yarf_test_list = tr_doc["tests"] # lookup data block hdr and data. Note: hdf5 root paths have a # slash prefix, the h5py root datagroup path does not so we # remain agnostic if self.dblock_paths is None: raise ValueError("PyGarv.dblock_paths is None") hdr, dblock = self.mkh5.get_dblock(dbp) hdr_dbp = re.match(r"/*(.+)", hdr["h5_dataset"]).groups()[0] # three way sanity check ... header, h5.data_blocks, yarf ...does everything agree? if not dbp == yarf_dbp == hdr_dbp: msg = "Fatal mismatch in mkh5 - pygarv - .yarf dblock path" raise ValueError(msg) # init the return ... 
no tests -> no fails -> all zeros results = dict( name="results", dblock_path=yarf_dbp, pygarv=np.zeros(shape=(len(dblock),), dtype=dblock["pygarv"].dtype), fails=None, ) # if nothing to do, return if yarf_test_list is None or len(yarf_test_list) == 0: return results # otherwise run the tests in yarf test list # init to whatever dtype is in the dblock['pygarv'] results["pygarv"] = np.zeros(shape=(len(dblock),), dtype=dblock["pygarv"].dtype) # compute the bit fiddled pygarv artifact mask for i, t in enumerate(yarf_test_list): # gather the list of key:value params for this test in # into a dict for test.set_specs(kwargs) test_params = dict() # test_params['dblock_path'] = dbp # DEPRECATED for kv in t: test_params.update(kv) # lookup the test function in the pygarv catalog by name # and set its params this_test = None this_test = self._catalog[test_params["test"]] this_test.set_specs(test_params) # run it then clear the params result = None result = this_test.run(hdr, dblock) this_test.reset() # update the results['pygarv'] stream from the boolean result # by OR masking the ith bit where the test fails results["pygarv"] = self._encode_pygarv_stream(i, result, results["pygarv"]) # map the fiddled bits back to indices of the test in # tr_doc that failed ... this is for visualization, human # consumption at run time, not stored in mkh5 hdr or dblock results["fails"] = self._decode_pygarv_stream(results["pygarv"], tr_doc) if tr_doc["fails"] != results["fails"]: msg = ("updating test results: {0}").format(tr_doc) warnings.warn(msg) return results
[docs] def get_result(self, pg_test_result): """convenience wrapper to query a test result, decode the mask, and return with its test in a handy package. Parameters ---------- pg_test_result : a (tr_doc, pygarv_mask) tuple as returned by run_* functions """ raise NotImplementedError
[docs] def run_tests(self): """fetch tests and pygarv mask for all dblocks, does not modify mkh5 Returns ------- pg_test_results : list of 2-ples (tr_doc, pygarv_mask), one for each datablock in self.mkh5 """ # one mask per dblock, suitable for assigning to # hdr['pygarv'] results = [] for dbp_idx, dbp in enumerate(self.dblock_paths): # print('pygarving', dbp) these_results = None tr_doc = self.tr_docs[dbp_idx] results.append(self.run_dblock(dbp_idx, tr_doc)) return results
def _update_mkh5(self):
    """wrapper to pull tests and results out of tr_docs and push them into mkh5

    For each datablock: overwrites the dblock 'pygarv' column with the
    bit-encoded test results and rewrites header['pygarv'] with the
    yarf-format test specs.

    Raises
    ------
    ValueError
        if tr_docs is unset or a tr_doc disagrees with the mkh5
        datablock ordering
    """
    if self.tr_docs is None:
        # BUG FIX: the exception was constructed but never raised
        raise ValueError("no tr_docs")
    hio = self.mkh5.HeaderIO()  # used below to update header
    for dbp_idx, dbp in enumerate(self.mkh5.data_blocks):
        # sanity check ... tr_docs must track mkh5 datablocks 1-1, in order
        tr_doc = self.tr_docs[dbp_idx]
        if tr_doc["dblock_path"] != dbp:
            msg = "uh oh ... mkh5 v. tr_doc dblock_path mismatch in _update_mkh5"
            raise ValueError(msg)
        if tr_doc["dblock_path_idx"] != dbp_idx:
            msg = (
                "uh oh ... mkh5 v. tr_doc dblock_path_idx mismatch in _update_mkh5"
            )
            raise ValueError(msg)
        try:
            with h5py.File(self.mkh5_f, "r+") as h5:
                dblock = h5[dbp]  # open, read write h5py Dataset
                # overwrite the pygarv data stream and header['pygarv']
                dblock["pygarv"] = tr_doc["pygarv"]  # bit-fiddled stream
                hio.get(dblock)  # fetch header, this dblock
                # brittle ... accessing header dict directly
                hio._header["pygarv"] = self._get_yarf_doc_from_tr_doc(tr_doc)
                hio.set(dblock)  # modded header jsonified into dblock.attrs
        except Exception as err:
            # annotate and re-raise ... a partial write may have occurred
            msg = (
                "\nVERY VERY BAD ... pygarving {0} {1} failed part "
                "way through, possible mkh5 data corruption."
                "".format(self.mkh5_f, self.yarf_f)
            )
            if len(err.args) == 0:
                err.args = (msg,)
            else:
                err.args = ("{0} {1}".format(err.args[0], msg),)
            raise err

# ------------------------------------------------------------
# Test Developer API:
#
# hdr : dict
#     mkh5 dblock header ... sample rate, column specs, etc..
# # dblock : np.ndarray # mkh5 data block # # default kwarg keys exposed by PyGarvTest for use in the test body: # # 'dblock_path' (str) current dblock_path # 'test' (str) name of this test # 'tag' (str) user description, may be anything # 'stream' (str) name of dblock dtype for column selection # 'threshold' (float) test critical value # 'interval' (float) length of interval (ms) to sweep the test # # Note: to dump or inspect kwarg keys:values at run time use, e.g., # # print(kwargs) # pdb.set_trace() # # # ------------------------------------------------------------ # garv-like event-based tests ... *only* the event code sample gets tagged # param_types = dict(stream=str, threshold=float, prestim=float, poststim=float) # @PyGarvTest('ppa', **param_types) # def ppa(hdr, dblock, *args, **kwargs): # '''tag events with single stream peak-to-peak amplitude excursions # Parameters # ---------- # stream : regex # stream label pattern to match, e.g. 'MiPa' or '\w+' # threshold : float # amplitude excursion threshold # prestim : float (>= 0) # length in ms to scan before the anchor event # poststim : float (>= 0) # length in ms to scan after the anchor event # ''' # stream = kwargs['stream'] # threshold = kwargs['threshold'] # prestim_ms = kwargs['prestim'] # poststim_ms = kwargs['poststim'] # print('running ppa_event', threshold, prestim_ms, poststim_ms) # prestim_samps = mkh5.mkh5._ms2samp(prestim_ms, hdr['samplerate']) # poststim_samps = mkh5.mkh5._ms2samp(poststim_ms, hdr['samplerate']) # n_samps = len(dblock) # result = np.full(shape=(n_samps,), fill_value=False) # ev_idxs = np.where(dblock['log_evcodes'] > 0)[0] # for ev_idx in ev_idxs: # interval = slice( max(0,ev_idx-prestim_samps), min(n_samps, ev_idx + poststim_samps)) # ev_view = dblock[stream][interval].view() # if np.ptp(ev_view) > threshold: # result[ev_idx] = True # mark this event sample bad # return result
@PyGarvTest("ppa", stream=str, threshold=float, prestim=float, poststim=float)
def ppa(hdr, dblock, *args, **kwargs):
    r"""tag event if any stream regexp match has peak-to-peak amplitude excursion

    Parameters
    ----------
    stream : regex
        stream label pattern to match, e.g. '.+' or 'MiPa' or '\w+'
    threshold : float
        amplitude excursion threshold
    prestim : float (>= 0)
        length in ms to scan before the anchor event
    poststim : float (>= 0)
        length in ms to scan after the anchor event

    Returns
    -------
    result : np.ndarray(shape=(len(dblock), ), dtype='bool')
        True at event samples where some matching stream exceeds threshold
    """
    stream_patt = kwargs["stream"]
    # eeg data streams whose label matches the pattern
    test_streams = [
        stream
        for stream in hdr["streams"].keys()
        if re.match(stream_patt, stream)
        and "dig_chan" in hdr["streams"][stream]["source"]
    ]
    if len(test_streams) == 0:
        msg = "no streams match: {0}".format(stream_patt)
        raise RuntimeError(msg)
    threshold = kwargs["threshold"]
    prestim_ms = kwargs["prestim"]
    poststim_ms = kwargs["poststim"]
    print(
        "running ppa with regex stream",
        threshold,
        prestim_ms,
        poststim_ms,
        test_streams,
    )
    prestim_samps = mkh5.mkh5._ms2samp(prestim_ms, hdr["samplerate"])
    poststim_samps = mkh5.mkh5._ms2samp(poststim_ms, hdr["samplerate"])
    n_samps = len(dblock)
    result = np.full(shape=(n_samps,), fill_value=False)  # init no artifacts
    ev_idxs = np.where(dblock["log_evcodes"] > 0)[0]
    for ev_idx in ev_idxs:
        # the scan window is the same for every stream ... hoisted out
        # of the stream loop (was recomputed per stream)
        interval = slice(
            max(0, ev_idx - prestim_samps),
            min(n_samps, ev_idx + poststim_samps),
        )
        for stream in test_streams:
            ev_view = dblock[stream][interval].view()  # thank you numpy ...
            if np.ptp(ev_view) > threshold:
                result[ev_idx] = True  # mark event bad on first bad channel
                break  # no need to look at further streams
    return result
@PyGarvTest(
    "maxflat", stream=str, threshold=float, nsamp=int, prestim=float, poststim=float
)
def maxflat(hdr, dblock, *args, **kwargs):
    r"""tag events on regex stream for flat runs

    Parameters
    ----------
    stream : regex
        stream label pattern to match, e.g. 'MiPa' or '\w{3}$'
    threshold : float
        minimum range allowable
    nsamp : int
        length in samples of rolling window to scan for flatness
    prestim : float (>= 0, units ms)
        time (ms) relative to the event to start scanning for flatness
    poststim : float (>= 0, units ms)
        time (ms) relative to the event to stop scanning for flatness

    Returns
    -------
    result : np.ndarray(shape=(len(dblock), ), dtype='bool')
        True at dblock indexs where test fails
    """
    stream_patt = kwargs["stream"]
    # eeg data streams whose label matches the pattern
    test_streams = [
        stream
        for stream in hdr["streams"].keys()
        if re.match(stream_patt, stream)
        and "dig_chan" in hdr["streams"][stream]["source"]
    ]
    if len(test_streams) == 0:
        msg = "no streams match: {0}".format(stream_patt)
        raise RuntimeError(msg)
    threshold = kwargs["threshold"]
    prestim_ms = kwargs["prestim"]
    poststim_ms = kwargs["poststim"]
    win_len = kwargs["nsamp"]
    if win_len <= 0:
        raise ValueError("maxflat nsamp must be > 0")
    print("running mxflat_event", threshold, win_len, prestim_ms, poststim_ms)
    prestim_samps = mkh5.mkh5._ms2samp(prestim_ms, hdr["samplerate"])
    poststim_samps = mkh5.mkh5._ms2samp(poststim_ms, hdr["samplerate"])
    n_samps = len(dblock)
    result = np.full(shape=(n_samps,), fill_value=False)
    ev_idxs = np.where(dblock["log_evcodes"] > 0)[0]
    for ev_idx in ev_idxs:
        interval = slice(
            max(0, ev_idx - prestim_samps), min(n_samps, ev_idx + poststim_samps)
        )
        for stream in test_streams:
            ev_view = dblock[stream][interval].view()
            # slide a win_len window across the scan interval; the
            # original range stopped one start short and never tested
            # the final window ... fixed with + 1
            for i in range(len(ev_view) - win_len + 1):
                if np.ptp(ev_view[i : i + win_len]) < threshold:
                    result[ev_idx] = True  # mark this event sample bad
                    break  # no need to look further in this stream
            if result[ev_idx]:
                break  # event already flagged, skip remaining streams
    return result
# parameter name -> type for the two-stream event-based tests
param_types = {
    "stream": str,
    "stream2": str,
    "threshold": float,
    "prestim": float,
    "poststim": float,
}
@PyGarvTest("ppadif", **param_types)
def ppadif(hdr, dblock, *args, **kwargs):
    """tag events with two-stream amplitude difference excursions

    The event sample is flagged when the peak-to-peak amplitude of the
    stream - stream2 difference exceeds threshold anywhere in the
    prestim/poststim window around a positive event code.
    """
    chan_a = kwargs["stream"]
    chan_b = kwargs["stream2"]
    crit = kwargs["threshold"]
    pre_ms = kwargs["prestim"]
    post_ms = kwargs["poststim"]
    print("running ppadif", crit, pre_ms, post_ms)
    # ms -> samples at this dblock's sampling rate
    pre_samps = mkh5.mkh5._ms2samp(pre_ms, hdr["samplerate"])
    post_samps = mkh5.mkh5._ms2samp(post_ms, hdr["samplerate"])
    n = len(dblock)
    result = np.full(shape=(n,), fill_value=False)
    for ev_idx in np.where(dblock["log_evcodes"] > 0)[0]:
        # clip the scan window at the dblock edges
        win = slice(max(0, ev_idx - pre_samps), min(n, ev_idx + post_samps))
        dif = dblock[win][chan_a].view() - dblock[win][chan_b].view()
        if np.ptp(dif) > crit:
            result[ev_idx] = True
    return result
# continuous data tagging tests ----------------------------------------
# parameter name -> type for the continuous-data tests
param_types = {"threshold": float, "interval": float}
@PyGarvTest("cstdev", **param_types)
def cstdev(hdr, dblock, *args, **kwargs):
    """tag intervals that span cross-channel amplitude standard deviation excursions

    Sweeps consecutive interval-length windows; if any single sample in
    a window has cross-channel std > threshold the whole window is
    marked bad.
    """
    threshold = kwargs["threshold"]
    interval_ms = kwargs["interval"]
    print("running stdev", threshold, interval_ms)
    interval_samps = mkh5.mkh5._ms2samp(interval_ms, hdr["samplerate"])
    # eeg data streams only ... exclude non-channel columns
    eeg_streams = [
        stream_name
        for stream_name, stream in hdr["streams"].items()
        if "dig_chan" in stream["source"]
    ]
    result = np.zeros(shape=(len(dblock),), dtype=bool)
    nsamp = len(dblock)
    # hoisted loop-invariant multi-field selection (was re-extracted
    # every sample); removed dead local `cntr`
    eeg_rows = dblock[eeg_streams]
    idx = 0
    while idx + interval_samps + 1 < nsamp:
        for i in range(idx, idx + interval_samps + 1):
            # NOTE(review): .astype(np.ndarray) coerces the structured
            # row to object dtype so np.std runs across fields ...
            # presumably intentional, confirm
            std = np.std(eeg_rows[i].astype(np.ndarray))
            if std > threshold:
                result[idx : idx + interval_samps] = 1
                break  # one bad sample condemns the whole window
        idx += interval_samps
    return result
# this test takes two extra params w/ numpy dtypes str, float
param_types = {
    "stream": str,
    "stream2": str,
    "threshold": float,
    "interval": float,
}
@PyGarvTest("cppadif", **param_types)
def cppadif(hdr, dblock, *args, **kwargs):
    """amplitude difference stream2 - stream, continuous data

    NOTE(review): despite the original "peak-to-peak" wording, the code
    thresholds the pointwise absolute difference |stream2 - stream| and
    then dilates failed samples to interval-length regions ... confirm
    against the intended test semantics.
    """
    stream = kwargs["stream"]
    stream2 = kwargs["stream2"]
    threshold = kwargs["threshold"]
    interval_ms = kwargs["interval"]
    interval_samps = mkh5.mkh5._ms2samp(interval_ms, hdr["samplerate"])
    # FIX: print label said "ppadif" which collides with the event-based
    # ppadif test ... report this test's actual name
    print("running cppadif:", stream, stream2, threshold, interval_ms)
    nsamp = len(dblock)
    # pointwise exceedance (removed dead all-zeros init that was
    # immediately overwritten)
    result = np.abs((dblock[stream2] - dblock[stream])) > threshold
    idx = 0  # fast enough
    while idx + interval_samps + 1 < nsamp:
        # any failed sample in the window condemns the whole window
        if any(result[idx : idx + interval_samps + 1]):
            result[idx : idx + interval_samps] = 1
        idx += interval_samps
    return result
# parameter name -> type for the single-stream continuous test
param_types = {"stream": str, "threshold": float, "interval": float}
@PyGarvTest("cppa", **param_types)
def cppa(hdr, dblock, *args, **kwargs):
    """peak-to-peak amplitude (stub)

    Not implemented ... dumps its kwargs for inspection and returns an
    all-zeros (no artifact) result vector.
    """
    print("running ppa:")
    pp.pprint(kwargs)
    # print(dblock[kwargs['stream']])
    return np.zeros((len(dblock),))
# ------------------------------------------------------------
# PyGarv.tr_docs CRUD
# ------------------------------------------------------------
def _delete_tr_docs(self, dbp_idx, test_idx):
    """Remove tr_docs[dbp_idx]['tests'][test_idx] and its results.

    Pops the test spec and its 'fails' entry, then shifts the higher
    bits of the 'pygarv' bitmask down one place so remaining tests stay
    aligned with their new list indexes.

    Parameters
    ----------
    dbp_idx : uint
        index of the datablock path, i.e., into self.tr_docs
    test_idx : uint
        index of the test to delete in tr_docs[dbp_idx]['tests']
    """
    tr_doc = self.tr_docs[dbp_idx]  # lookup the tests/results doc (NOTE: unused below)
    n_tests = len(self.tr_docs[dbp_idx]["tests"])
    self.tr_docs[dbp_idx]["tests"].pop(test_idx)
    self.tr_docs[dbp_idx]["fails"].pop(test_idx)
    # move the pygarv bits above the popped test one bit left
    mask = self.tr_docs[dbp_idx]["pygarv"].copy()
    for i in range(test_idx, n_tests - 1):
        mask -= (mask >> i & 1) << i  # zero out the ith bit
        mask += ((mask >> (i + 1)) & 1) << i  # copy i+1 bit to ith bit
    mask -= (mask >> (n_tests - 1) & 1) << n_tests - 1  # zero out the last bit
    self.tr_docs[dbp_idx]["pygarv"] = mask.copy()

def _update_tr_docs(self, dbp_idx, test_idx, test):
    """Primary Create/Update pygarv test CRUD operation.

    The test is dry run on data_blocks[dbp_idx]. If an exception is
    raised, it is returned. If no exception is raised, the
    corresponding tr_docs[dbp_idx] test specs and results are modified
    and None is returned.

    Parameters
    ----------
    dbp_idx : uint
        index of the ith datablock path in self.mkh5.data_blocks
    test_idx : uint
        index of the test in self.tr_docs[dbp_idx]['tests']; passing
        test_idx == len(tests) appends the test.
        NOTE(review): the original docstring says test_idx=None appends,
        but `test_idx > n_tests` would raise TypeError on None ... the
        append path is actually test_idx == n_tests. Confirm callers.
    test : list
        PyGarvTest format list of singleton param:value dicts, e.g.,
        [ {key:val}, ..., {key:val} ]

    Returns
    -------
    The first Exception raised when running the test, or None on success.
    """
    tr_doc = self.tr_docs[dbp_idx]  # lookup the tests/results doc (NOTE: unused below)
    n_tests = len(self.tr_docs[dbp_idx]["tests"])
    # check the test index this dblock
    if test_idx > n_tests:
        raise IndexError("test_idx > number of tests")
    # dry run the test and collect results
    try:
        result, fails = self._run_test(dbp_idx, test)
    except Exception as err:
        return err
    # if we make it here, mod the tr_docs with the new info
    if test_idx < n_tests:
        # update is overwrite in place
        self.tr_docs[dbp_idx]["tests"][test_idx] = test
        self.tr_docs[dbp_idx]["fails"][test_idx] = fails
    elif test_idx == n_tests:
        # update is append
        self.tr_docs[dbp_idx]["tests"].append(test)
        self.tr_docs[dbp_idx]["fails"].append(fails)
    else:
        # handled on the way in
        pass
    # update the pygarv vector
    self.tr_docs[dbp_idx]["pygarv"] = self._encode_pygarv_stream(
        test_idx, result, self.tr_docs[dbp_idx]["pygarv"]
    )
    return None

# ------------------------------------------------------------
# misc private utility methods
# ------------------------------------------------------------
def _encode_pygarv_stream(self, test_idx, results, mask):
    """Set bit test_idx of mask[j] = 1 wherever results[j] == True.

    Parameters
    ----------
    test_idx : uint < 64
        index in the list of tests, this dblock; results are encoded
        at this bit position
    results : np.ndarray, dtype=bool
        sample-wise results of a pygarv test ... 1 is bad/fail, 0 is
        good/pass
    mask : np.ndarray, unsigned integer dtype
        per-sample bitmask where bit i encodes test i's results

    Returns
    -------
    mask : np.ndarray
        the same mask, bit-fiddled (if at all) only at the test_idx-th
        bit

    Notes
    -----
    Mask properties:

    1. any non-zero value indicates some test failed at that sample
       (quick check)
    2. the set bits at a sample recover exactly which tests in this
       dblock's header['pygarv'] test list failed there
    3. the index-to-bit-to-test mapping only holds per dblock since
       test specs can vary across dblocks
    """
    assert test_idx < 64
    assert results.shape == mask.shape
    assert results.ndim == mask.ndim == 1
    # NOTE(review): reset_vec is never used below ... candidate for removal
    reset_vec = np.zeros(shape=(len(results),), dtype=results.dtype)
    if any(results):
        # zero the bits at test_idx, then reset w/ new results
        mask -= ((mask >> test_idx) & 1) << test_idx
        mask += ((2**test_idx) * results).astype(mask.dtype)
    return mask

def _decode_pygarv_sample(self, s, tr_doc_tests):
    """Decode the bits set in s and return the failed test specs.

    Parameters
    ----------
    s : uint64
        sample value as found in a pygarv_mask
    tr_doc_tests : list
        PyGarv format test specs for this dblock

    Returns
    -------
    failed_tests : list
        the test specs whose bit is set in s, i.e., the tests that
        failed at this sample during run_tests()
    """
    # idxs = []
    failed_tests = []
    ii = 0
    # scan bits only as far as necessary ... 2**ii > s means no higher bits set
    while 2**ii <= s:
        # check last bit ... same as div mod 2
        if (s >> np.uint(ii)) & np.uint(1):
            # idxs.append(ii)
            failed_tests.append(tr_doc_tests[ii])
        ii += 1
    return failed_tests

def _decode_pygarv_stream(self, pygarv_stream, tr_doc, compress=True):
    """Recover per-test fail locations from the bit-encoded pygarv stream.

    Operates per (tr_doc, datablock) pair. The uint at each sample of
    the pygarv data stream encodes which test(s) in tr_doc['tests']
    failed at that sample; bit i set means test i failed.

    Parameters
    ----------
    pygarv_stream : np.ndarray, uint64, shape=(len(dblock), )
        set bit i indicates failed test i in tr_doc['tests']
    tr_doc : dict
        yarf format ... {name: 'pygarv',
        'dblock_path': slashpath_to_mkh5_dblock,
        'tests': [test_spec, ...], 'fails': [...]} where each
        test_spec is the (ordered) param spec of a PyGarvTest
    compress : bool
        when True (default) each test's fail list is run-length
        compressed into (x0, x1) sample-index intervals

    Returns
    -------
    test_fails : list of lists, or None
        None when tr_doc['tests'] is None. Otherwise one list per test:
        sample indexes where that test failed, or (x0, x1) interval
        tuples when compress=True (x0 == x1 for a single-sample run).
    """
    # may or may not be tests for this dblock ...
    if tr_doc["tests"] is None:
        return None
    # this trusts the tr_doc is well-formed
    test_fails = [[] for t in tr_doc["tests"]]
    # > 1 means at least one failed test
    fail_idxs = np.where(pygarv_stream > 0)[0]
    # decode the non-zero pygarv values
    if len(fail_idxs) > 0:
        # 1. unpack the test_fails
        for i, fail_idx in enumerate(fail_idxs):
            s = pygarv_stream[fail_idx]  # integer value at the fail
            # test_idxs = []
            ii = 0
            # scan bits only as far as necessary
            while 2**ii <= s:
                # shift and check last bit ... same as div mod 2
                if (s >> np.uint(ii)) & np.uint(1):
                    # test_idxs.append(ii)
                    # update failed samples, this test
                    test_fails[ii].append(fail_idx)
                ii += 1
    if compress == True:
        # RLL compress the fail regions as (x0,x1) intervals
        # two cases of interest:
        #   length 1   (x0,x0)
        #   length > 1 (x0, x1) where x0 < x1
        for i, test_xs in enumerate(test_fails):
            # case 1: no points failed this test
            if len(test_xs) == 0:
                # no fails so no (x0,x1) tuples
                test_fails[i] = []
                continue
            # case 2: one or more points failed this test
            x0_x1 = []
            cnt = 0
            n_xs = len(test_xs)
            x0 = test_xs[0]  # init left edge, exists b.c len > 0
            while cnt < n_xs:
                x1 = test_xs[cnt]  # update right edge
                if cnt + 1 == n_xs:
                    # last point is always a right bound, no new region
                    x0_x1.append((x0, x1))
                else:
                    # look ahead ... a gap ends the current run
                    if test_xs[cnt + 1] != test_xs[cnt] + 1:
                        # append and start a new region
                        x0_x1.append((x0, x1))
                        x0 = test_xs[cnt + 1]
                cnt = cnt + 1
            # overwrite this test sample fails w/ the intervals
            test_fails[i] = x0_x1
    return test_fails  # possibly [ [] ... [] ] if len == 0
# def _ppa(test,dblock,srate): # """ ppa test handler """ # # data type template for this test # dtypes = { # 'test': str, # 'tag': str, # 'stream': str, # 'params': dict( # threshold = float, # uV max excursion # interval = float # ms # ) # } # # general sanity check # _check_test(test, dtypes, dblock) # test_stream = dblock[test['stream']] # result_stream = np.ndarray(shape=(len(test_stream)), dtype=bool) # result_stream.fill(False) # n = int((test['params']['interval']*srate)/1000.0) # # threshold = test['params']['threshold'] # # test snippets # idx = 0 # while idx < len(test_stream)-n: # test_run = test_stream[idx:idx+n] # idx_result = (test_stream[idx:idx+n].max() - # test_stream[idx:idx+n].min()) > threshold # if idx_result: # result_stream[idx:idx+n] = idx_result # # idx += n # skip rest of bads # # continue # idx += 1 # return(result_stream) # def _ppadif(test,dblock,srate): # """ ppadif test handler """ # # data type template for this test # dtypes = { # 'test': str, # 'tag': str, # 'stream': str, # 'params': dict( # stream_2 = str, # other data stream name # threshold = float, # uV max excursion # interval = float # ms # ) # } # # general sanity check # _check_test(test, dtypes, dblock) # test_stream = dblock[test['stream']] - \ # dblock[test['params']['stream_2']] # result_stream = np.ndarray(shape=(len(test_stream)), dtype=bool) # result_stream.fill(False) # n = int((test['params']['interval']*srate)/1000.0) # # threshold = test['params']['threshold'] # # test snippets # idx = 0 # while idx < len(test_stream)-n: # idx_result = (test_stream[idx:idx+n].max() - # test_stream[idx:idx+n].min()) > threshold # if idx_result: # result_stream[idx:idx+n] = idx_result # # idx += n # skip rest of bads # # continue # idx += 1 # return(result_stream) # def _garv_dblock(hdr, dblock): # """ run tests given in hdr['pygarv']['tests'] on dblock data streams # Parameters # ---------- # hdr : dict in proper mkh5 dblock header format # dblock : np.ndarray in proper mkh5 
dblock format # Returns # ------- # pygarv_stream : np.ndarray, shape = (1,len(dblock)), dtype=np.uint64 # non-zero values indicate artifacts, bit-code (2**i) indicates ith test failed # """ # if 'pygarv' not in hdr.keys(): # raise KeyError('pygarv not found ... add to YAML .yhdr') # else: # pg = hdr['pygarv'] # if 'tests' not in pg.keys(): # raise KeyError('tests not found in pygarv dict ... check .yhdr YAML doc name: pygarv') # if not isinstance(pg['tests'], list): # msg = "pygarv['tests'] is not a list ... check YAML .yhdr YAML doc name: pygarv" # raise TypeError(msg) # # check there is a pygarv method for each named test in the doc # for i,test in enumerate(pg['tests']): # if '_' + test['test'] not in pygarv.__dict__.keys(): # msg = ("unknown test name at pygarv['tests'][{0}]: " # "{1}").format(i, test) # raise ValueError(msg) # # check there is enough bit-width to code the number of tests # # pygarv_dt = np.dtype([('pygarv', 'uint64')]) # pygarv_dt = np.dtype('uint64') # n_bits = np.uint(pygarv_dt.itemsize * 8) # if len(pg['tests']) > n_bits: # msg = ('number of pygarv tests {0} ' # 'exceeds maximum {1}').format(len(pg['tests'],n_bits)) # raise ValueError(msg) # # setup to capture results # # n_bytes = int(np.ceil(len(pg['tests'])/8)) # pygarv_stream = np.zeros(shape=(len(dblock),), dtype=pygarv_dt) # # have at it # srate = hdr['samplerate'] # for i,test in enumerate(pg['tests']): # print('{0} {1} {2}'.format(test['test'], # test['stream'], # test['tag'])) # func_call = '_{0}(test,dblock,srate)'.format(test['test']) # # test_results.append( (test, eval(func_call)) ) # pygarv_stream = _encode_pygarv_stream(i, # eval(func_call), # pygarv_stream) # return(pygarv_stream) # def _mxflat(test, dblock, srate): # """ max flat test handler """ # # data type template for this test # dtypes = { # 'test': str, # 'tag': str, # 'stream': str, # 'params': dict( # threshold = float, # uV # interval = float )} # # general sanity check # _check_test(test, dtypes, dblock) # 
# test_stream = dblock[test['stream']]
#     result_stream = np.ndarray(shape=(len(test_stream)), dtype=bool)
#     result_stream.fill(False)
#     # apply test across stream in rolling window
#     threshold = test['params']['threshold']
#     n = int((test['params']['interval']*srate)/1000.0)
#
#     idx = 0
#     while idx < len(test_stream) - n:
#         idx_result = (test_stream[idx:idx+n].max() -
#                       test_stream[idx:idx+n].min()) < threshold
#         if idx_result:
#             # mark the flat interval
#             result_stream[idx:idx+n] = idx_result
#             # idx += n # can't undo bad so fast forward
#             # continue
#         idx += 1
#     return(result_stream)

if __name__ == "__main__":
    import argparse  # successor to optparse

    # command line: mkh5 file is mandatory, the .yarf test file optional
    arg_parser = argparse.ArgumentParser(description="mkh5 artifact tagger")
    arg_parser.add_argument("mkh5_f", type=str, help="mkh5 format data")
    arg_parser.add_argument(
        "--yarf",
        type=str,
        metavar="myfile.yarf",
        dest="yarf_f",
        help=".yarf format YAML artifact test file",
    )
    args = arg_parser.parse_args()  # fetch from sys.argv

    # TO DO ... implement --tests option to dump available tests?
    if args.yarf_f is None:
        # bare init w/ mkh5 for viewing
        print("pygarv viewer ...")
        pg = PyGarv(mkh5_f=args.mkh5_f)
        pg._update_tr_docs_from_mkh5()
        mkh5viewer.launch_app(args.mkh5_f)
    else:
        # run pygarv to mod the file
        print("pygarv marking artifacts ...")
        pg = PyGarv(mkh5_f=args.mkh5_f)
        pg._update_tr_docs_from_yaml_f(yarf_f=args.yarf_f)  # load yarf tests
        pg._update_mkh5()  # actually mod the h5 file