# -*- coding: utf-8 -*-
"""Developer utilities for data validation and managing parallel CPU
processing. Not meant for general use.
"""
import numpy as np
from collections import defaultdict, OrderedDict
import subprocess
import re
import ctypes
import sys
import os
import glob
import warnings
# Names of the BLAS flavors this module looks for. Each string is substituted
# into the shared-library search pattern in get_blas_osys(); MKL and OBLAS are
# also the values stored in BLAS.kind.
MKL = 'mkl'
OBLAS = 'openblas'
CBLAS = 'cblas' # numpy.show_config doesn't know if this is mkl or openblas
def get_index_duplicates_table(df, level):
    """Return a string table of duplicate index values and their locations.

    Parameters
    ----------
    df : pandas.DataFrame
        dataframe whose index is inspected
    level : str
        name of the index level to check, must be in ``df.index.names``

    Returns
    -------
    msg : str
        a title line, a blank line, a header row and then one row per
        duplicated value listing the integer row positions where it occurs.
        The first column is left-justified and padded to 20 characters.

    Raises
    ------
    AssertionError
        if `level` is not one of the index level names
    """
    assert level in df.index.names

    level_values = df.index.get_level_values(level)
    dupe_mask = level_values.duplicated(keep=False)

    # map each duplicated value to the row positions where it occurs
    dupe_dict = defaultdict(list)
    for position, value in zip(
        np.flatnonzero(dupe_mask), level_values[dupe_mask]
    ):
        dupe_dict[value].append(position)

    # hardcoded padding. Cells are converted with str() *before* padding:
    # the '<20' spec is otherwise handed to the value's own __format__, which
    # datetime-like values treat as a strftime pattern (rendering the literal
    # text "<20") and which None or tuple values reject with a TypeError.
    row = '{0:<20} {1}'
    lines = [
        f'Duplicates in index level {level}:\n',
        row.format(str(level), 'Locations'),
    ]
    for value, locations in sorted(dupe_dict.items()):
        lines.append(row.format(str(value), ', '.join(map(str, locations))))
    return '\n'.join(lines) + '\n'
def get_first_group(groupby):
    """Return the group belonging to the first key of ``groupby.groups``.

    Parameters
    ----------
    groupby : pandas groupby object
        anything exposing ``.groups`` and ``.get_group(name)``

    Returns
    -------
    first_group
        whatever ``groupby.get_group`` returns for the first group name
    """
    group_names = list(groupby.groups)
    return groupby.get_group(group_names[0])
def deduplicate_list(lst):
    """Return a new list of the items in `lst` with repeats dropped.

    Items keep the order of their first occurrence. They are used as
    dictionary keys, so they must be hashable.
    """
    first_seen = OrderedDict()
    for item in lst:
        # setdefault leaves an already-seen key in its original position
        first_seen.setdefault(item, None)
    return list(first_seen)
class BLAS:
    """BLAS wrapper as determined by its thread getter/setter.

    Parameters
    ----------
    cdll : ctypes.CDLL
        loaded shared library that is probed for the MKL or OpenBLAS
        thread-control functions

    Attributes
    ----------
    cdll
        the library object passed in
    kind : str
        ``MKL`` or ``OBLAS``, whichever pair of functions was found
    get_n_threads, set_n_threads : callable
        the thread getter and setter looked up on `cdll`

    Raises
    ------
    NotImplementedError
        if `cdll` exposes neither the MKL nor the OpenBLAS getter/setter pair
    """

    def __init__(self, cdll):
        self.cdll = cdll
        self.kind = None

        # quack like an mkl or openblas duck or fail
        candidates = (
            (MKL, 'MKL_Get_Max_Threads', 'MKL_Set_Num_Threads'),
            (OBLAS, 'openblas_get_num_threads', 'openblas_set_num_threads'),
        )
        for kind, getter_name, setter_name in candidates:
            # Resolve both symbols before binding anything so that a library
            # exporting only one of them cannot leave a mismatched
            # getter/setter pair behind.
            try:
                getter = getattr(cdll, getter_name)
                setter = getattr(cdll, setter_name)
            except AttributeError:
                # ctypes signals a missing symbol with AttributeError
                continue
            self.get_n_threads = getter
            self.set_n_threads = setter
            self.kind = kind

        if self.kind is None:
            raise NotImplementedError(
                f"BLAS must be {MKL} or {OBLAS} in {str(cdll)}"
            )

    def __repr__(self):
        # __init__ guarantees kind is one of these two
        kind = {MKL: 'MKL', OBLAS: 'OpenBLAS'}[self.kind]
        n_threads = self.get_n_threads()
        return f'{kind} @ {n_threads} threads'
def get_blas_osys(numpy_module, osys):
    """Locate the BLAS library NumPy's compiled core is linked against.

    The shared-library dependencies of NumPy's ``_multiarray_umath``
    extension are listed with ``ldd`` (linux) or ``otool -L`` (darwin) and
    searched for an MKL, OpenBLAS or cblas entry.

    Parameters
    ----------
    numpy_module : module
        the imported numpy package; only ``__path__[0]`` is used
    osys : {'linux', 'darwin'}
        which platform tool and output format to use

    Returns
    -------
    blas : BLAS or None
        wrapper around the first matching library that exposes the MKL or
        OpenBLAS thread controls, or None if the extension module cannot be
        found or no such library is listed

    Raises
    ------
    ValueError
        if `osys` is not 'linux' or 'darwin'
    subprocess.CalledProcessError
        if the ``ldd``/``otool`` command exits with a non-zero status
    """
    # validate osys up front so an unsupported platform is reported as such
    # rather than as whatever the filesystem lookup below happens to raise
    if osys == 'linux':
        ldd_args = ['ldd']
        pattern = r'^\t.*{}.* => (?P<path>.*) \(0x.*$'
    elif osys == 'darwin':
        ldd_args = ['otool', '-L']
        # MacOS 10.13.6 otools shows @rpath not @loader_path
        # for the conda installed mkl.
        pattern = r'^\t@.*path/(?P<path>.*{}.*) \(.*\)$'
    else:
        raise ValueError(f'get_blas_osys() does not support osys={osys}')

    # The compiled extension lives in numpy/core up to NumPy 1.x and in
    # numpy/_core from NumPy 2.0 on, so look in both.
    numpy_root = numpy_module.__path__[0]
    multiarray_paths = []
    for subdir in ('core', '_core'):
        multiarray_paths.extend(
            glob.glob(
                os.path.join(numpy_root, subdir, '_multiarray_umath*.so')
            )
        )
    if not multiarray_paths:
        # nothing to inspect: same result as an unknown BLAS
        return None

    ldd_result = subprocess.run(
        args=ldd_args + [multiarray_paths[0]],
        check=True,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    output = ldd_result.stdout

    for kind in (MKL, OBLAS, CBLAS):
        match = re.search(pattern.format(kind), output, flags=re.MULTILINE)
        if match is None:
            continue
        cdll = ctypes.CDLL(match.group('path'))
        try:
            return BLAS(cdll)
        except NotImplementedError:
            # e.g. a generic cblas that is neither MKL nor OpenBLAS;
            # keep looking instead of aborting the search
            continue

    # unknown kind
    return None
def get_blas(numpy_module):
    """Return the BLAS wrapper found for `numpy_module` on this platform.

    On linux and darwin the lookup is delegated to ``get_blas_osys``. On any
    other platform a warning is issued and None is returned.
    """
    platform = sys.platform
    if platform.startswith('linux'):
        osys = 'linux'
    elif platform == 'darwin':
        osys = 'darwin'
    else:
        osys = None

    if osys is not None:
        return get_blas_osys(numpy_module, osys)

    warnings.warn(
        f'Searching for BLAS libraries on {sys.platform} is not supported.'
    )
    return None
class single_threaded:
    """Context manager that runs its body with the BLAS set to one thread.

    On entry the current thread count is stored in ``old_n_threads`` and the
    BLAS is set to 1 thread; on exit the stored count is restored. If no BLAS
    wrapper was found, entering only issues a warning and exiting does
    nothing.

    Parameters
    ----------
    numpy_module : module
        passed to ``get_blas`` to look up the BLAS wrapper

    Raises
    ------
    RuntimeError
        on exit, if the BLAS does not report the restored thread count
    """

    def __init__(self, numpy_module):
        self.blas = get_blas(numpy_module)

    def __enter__(self):
        blas = self.blas
        if blas is None:
            warnings.warn(
                'No MKL/OpenBLAS found, assuming NumPy is single-threaded.'
            )
            return
        # remember the current setting so __exit__ can put it back
        self.old_n_threads = blas.get_n_threads()
        blas.set_n_threads(1)

    def __exit__(self, *args):
        blas = self.blas
        if blas is None:
            return
        previous = self.old_n_threads
        blas.set_n_threads(previous)
        # read the value back to confirm the reset took effect
        if blas.get_n_threads() == previous:
            return
        raise RuntimeError(
            f'Failed to reset {blas.kind} '
            f'to {previous} threads (previous value).'
        )
def design_matrix_is_constant(df, columns, time):
    """Check that values in columns of df do not change within any epoch.

    See Notes for more details.

    Parameters
    ----------
    df : pandas.DataFrame
        dataframe to check
    columns : list of str
        list of column names to be checked
    time : str
        name of the time column

    Returns
    -------
    result : bool
        True if values in specified columns don't change, False otherwise.
        An empty dataframe gives True; a dataframe whose number of rows is
        not (rows at the first timepoint) x (number of timepoints) gives
        False.

    Notes
    -----
    We check that from timepoint to timepoint, each epoch has the same value in
    a given column:

    .. table:: epoch1
        :widths: auto

        === ===
        A   B
        === ===
        1   x
        1   x
        1   x
        1   x
        1   x
        === ===

    .. table:: epoch2
        :widths: auto

        === ===
        A   B
        === ===
        2   y
        2   y
        2   y
        2   y
        2   y
        === ===

    This is helpful when performing linear regression on an epochs table where
    the predictors vary with epochs (as they are expected to) but stay constant
    from sample to sample, because we can do our modeling much faster.

    The comparison is positional: the rows found at the first timepoint are
    each repeated once per timepoint and compared, in order, with the rows of
    `df`. The rows of each epoch therefore have to be contiguous and every
    epoch has to cover the same timepoints.
    """
    if len(df) == 0:
        # nothing can vary in an empty table
        return True

    gb = df.groupby(time)
    _, group = next(iter(gb))  # first group

    df_columns_values = df[columns].values
    group_columns_values = group[columns].values

    # one row per epoch at the first timepoint, repeated for every timepoint
    expected_df_values = np.repeat(group_columns_values, len(gb), axis=0)

    # ragged data cannot be compared elementwise; it is not constant in the
    # sense checked here, so report that instead of failing in the comparison
    if df_columns_values.shape != expected_df_values.shape:
        return False

    return bool((df_columns_values == expected_df_values).all())