# Source code for fitgrid.tools

# -*- coding: utf-8 -*-
"""Developer utilities for data validation and managing parallel CPU
processing. Not meant for general use.
"""

import numpy as np
from collections import defaultdict, OrderedDict
import subprocess
import re
import ctypes
import sys
import os
import glob
import warnings

# Tags naming a BLAS flavor. get_blas_osys() substitutes each tag into the
# regex it matches against ldd/otool output, and BLAS.kind is set to MKL or
# OBLAS depending on which thread getter/setter the loaded library exports.
MKL = 'mkl'
OBLAS = 'openblas'
CBLAS = 'cblas'  # numpy.show_config doesn't know if this is mkl or openblas


def get_index_duplicates_table(df, level):
    """Build a printable table of the duplicated values in one index level.

    Parameters
    ----------
    df : pandas.DataFrame
        dataframe whose index is inspected
    level : str
        name of the index level to check, must be in ``df.index.names``

    Returns
    -------
    msg : str
        a title line, a header row, then one row per duplicated value
        (sorted by value) listing the integer row positions where it occurs
    """
    assert level in df.index.names

    labels = df.index.get_level_values(level)
    is_repeated = labels.duplicated(keep=False)

    # collect, for every repeated label, all row positions holding it
    positions_by_label = {}
    for position, label in zip(
        np.flatnonzero(is_repeated), labels[is_repeated]
    ):
        positions_by_label.setdefault(label, []).append(position)

    # hardcoded padding
    row_template = '{0:<20} {1}\n'

    pieces = [
        f'Duplicates in index level {level}:\n\n',
        row_template.format(level, 'Locations'),
    ]
    for label, positions in sorted(positions_by_label.items()):
        pieces.append(
            row_template.format(label, ', '.join(map(str, positions)))
        )
    return ''.join(pieces)
def get_first_group(groupby):
    """Return the group stored under the first key of ``groupby.groups``.

    Parameters
    ----------
    groupby : pandas groupby object
        anything exposing ``.groups`` and ``.get_group()``

    Returns
    -------
    the object returned by ``groupby.get_group`` for the first group name
    """
    group_names = tuple(groupby.groups)
    return groupby.get_group(group_names[0])
def deduplicate_list(lst):
    """Return a new list without repeats, keeping first occurrences in order.

    Items must be hashable.
    """
    seen = set()
    unique_items = []
    for item in lst:
        if item in seen:
            continue
        seen.add(item)
        unique_items.append(item)
    return unique_items
class BLAS:
    """BLAS wrapper as determined by its thread getter/setter.

    Parameters
    ----------
    cdll : ctypes.CDLL
        loaded shared library; it must export either the MKL pair
        (``MKL_Get_Max_Threads`` / ``MKL_Set_Num_Threads``) or the OpenBLAS
        pair (``openblas_get_num_threads`` / ``openblas_set_num_threads``)

    Attributes
    ----------
    cdll
        the library object passed in
    kind : str
        ``MKL`` or ``OBLAS``
    get_n_threads, set_n_threads : callable
        the thread getter and setter looked up on ``cdll``

    Raises
    ------
    NotImplementedError
        if ``cdll`` exports neither complete getter/setter pair
    """

    def __init__(self, cdll):
        self.cdll = cdll
        self.kind = None

        # quack like an mkl or openblas duck or fail; when a library exports
        # both pairs the later entry (OpenBLAS) wins, as it always has
        candidates = (
            (MKL, 'MKL_Get_Max_Threads', 'MKL_Set_Num_Threads'),
            (OBLAS, 'openblas_get_num_threads', 'openblas_set_num_threads'),
        )
        for kind, getter_name, setter_name in candidates:
            # a missing symbol surfaces as AttributeError; anything else is
            # a real problem and should not be silenced
            try:
                getter = getattr(cdll, getter_name)
                setter = getattr(cdll, setter_name)
            except AttributeError:
                continue

            # assign only once BOTH lookups succeeded, so the getter, the
            # setter and the kind tag always belong to the same library
            self.get_n_threads = getter
            self.set_n_threads = setter
            self.kind = kind

        if self.kind not in (MKL, OBLAS):
            raise NotImplementedError(
                f"BLAS must be {MKL} or {OBLAS} in {str(cdll)}"
            )

    def __repr__(self):
        display_names = {MKL: 'MKL', OBLAS: 'OpenBLAS'}
        # fall back to the raw tag rather than failing on an unexpected kind
        kind = display_names.get(self.kind, str(self.kind))
        n_threads = self.get_n_threads()
        return f'{kind} @ {n_threads} threads'
def get_blas_osys(numpy_module, osys):
    """Locate the BLAS library NumPy is linked against and wrap it.

    The shared libraries linked by NumPy's ``_multiarray_umath`` extension
    are listed with ``ldd`` (linux) or ``otool -L`` (darwin) and the output
    is searched for an MKL, OpenBLAS or CBLAS entry, in that order.

    Parameters
    ----------
    numpy_module : module
        the imported ``numpy`` package (only ``__path__`` is read)
    osys : str
        ``'linux'`` or ``'darwin'``

    Returns
    -------
    BLAS or None
        a ``BLAS`` wrapper around the first matching library, or None if
        no known library name appears in the linker listing

    Raises
    ------
    IndexError
        if no ``_multiarray_umath*.so`` is found in the NumPy package
    ValueError
        if ``osys`` is not supported
    subprocess.CalledProcessError
        if the ``ldd`` / ``otool`` call exits with a non-zero status
    """
    package_dir = numpy_module.__path__[0]

    # NumPy < 2.0 keeps the extension in numpy/core, NumPy >= 2.0 moved it
    # to numpy/_core; look in both so either layout is handled
    extension_paths = []
    for subdir in ('core', '_core'):
        extension_paths.extend(
            glob.glob(
                os.path.join(package_dir, subdir, '_multiarray_umath*.so')
            )
        )
    if not extension_paths:
        # IndexError is what the bare ``glob(...)[0]`` lookup used to raise
        raise IndexError(
            f'no _multiarray_umath*.so found in core or _core '
            f'under {package_dir}'
        )
    multiarray_path = extension_paths[0]

    if osys == 'linux':
        ldd_args = ['ldd', multiarray_path]
        pattern = r'^\t.*{}.* => (?P<path>.*) \(0x.*$'
    elif osys == 'darwin':
        ldd_args = ['otool', '-L', multiarray_path]
        # MacOS 10.13.6 otools shows @rpath not @loader_path
        # for the conda installed mkl.
        pattern = r'^\t@.*path/(?P<path>.*{}.*) \(.*\)$'
    else:
        raise ValueError(f'get_blas_osys() does not support osys={osys}')

    ldd_result = subprocess.run(
        args=ldd_args,
        check=True,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    output = ldd_result.stdout

    for kind in (MKL, OBLAS, CBLAS):
        match = re.search(pattern.format(kind), output, flags=re.MULTILINE)
        if match:
            cdll = ctypes.CDLL(match.group('path'))
            return BLAS(cdll)

    # unknown kind
    return None
def get_blas(numpy_module):
    """Dispatch the BLAS lookup for the current platform.

    On linux and darwin the result of ``get_blas_osys`` is returned. On any
    other platform a warning is issued and None is returned.
    """
    platform = sys.platform

    if platform.startswith('linux'):
        return get_blas_osys(numpy_module, 'linux')

    if platform == 'darwin':
        return get_blas_osys(numpy_module, 'darwin')

    warnings.warn(
        f'Searching for BLAS libraries on {sys.platform} is not supported.'
    )
    return None
class single_threaded:
    """Context manager that runs its body with the BLAS limited to 1 thread.

    On entry the current thread count is remembered and the BLAS is set to a
    single thread; on exit the remembered count is restored. If no BLAS was
    found, a warning is issued on entry and nothing else is done.

    Raises
    ------
    RuntimeError
        on exit, if the BLAS does not report the restored thread count
    """

    def __init__(self, numpy_module):
        self.blas = get_blas(numpy_module)

    def __enter__(self):
        if self.blas is None:
            warnings.warn(
                'No MKL/OpenBLAS found, assuming NumPy is single-threaded.'
            )
            return

        self.old_n_threads = self.blas.get_n_threads()
        self.blas.set_n_threads(1)

    def __exit__(self, *args):
        if self.blas is None:
            return

        self.blas.set_n_threads(self.old_n_threads)

        # verify the library actually accepted the previous value
        if self.blas.get_n_threads() == self.old_n_threads:
            return

        raise RuntimeError(
            f'Failed to reset {self.blas.kind} '
            f'to {self.old_n_threads} threads (previous value).'
        )
def design_matrix_is_constant(df, columns, time):
    """Check that values in columns of df do not change within any epoch.

    See Notes for more details.

    Parameters
    ----------
    df : pandas.DataFrame
        dataframe to check
    columns : list of str
        list of column names to be checked
    time : str
        name of the time column

    Returns
    -------
    result : bool
        True if values in specified columns don't change, False otherwise

    Notes
    -----
    We check that from timepoint to timepoint, each epoch has the same value
    in a given column:

    .. table:: epoch1
        :widths: auto

        === ===
         A   B
        === ===
         1   x
         1   x
         1   x
         1   x
         1   x
        === ===

    .. table:: epoch2
        :widths: auto

        === ===
         A   B
        === ===
         2   y
         2   y
         2   y
         2   y
         2   y
        === ===

    This is helpful when performing linear regression on an epochs table
    where the predictors vary with epochs (as they are expected to) but stay
    constant from sample to sample, because we can do our modeling much
    faster.

    The comparison is positional: the rows of the first ``time`` group are
    each repeated once per ``time`` group and compared elementwise with the
    rows of ``df`` in their existing order.
    """
    by_time = df.groupby(time)
    n_timepoints = len(by_time)

    # rows belonging to the first timepoint
    _, first_timepoint = next(iter(by_time))
    template = first_timepoint[columns].values

    # what df[columns] must look like if nothing changes across timepoints
    expected = np.repeat(template, n_timepoints, axis=0)
    observed = df[columns].values

    return (observed == expected).all()