# Source code for fitgrid.tools

import numpy as np
from collections import defaultdict, OrderedDict
import subprocess
import re
import ctypes
import sys
import os
import glob
import warnings

# Substrings looked for in the shared-library names that NumPy links against,
# used to tell which BLAS flavour is present.
MKL = 'mkl'
# NOTE(review): the name ``BLAS`` is rebound by the ``class BLAS`` statement
# later in this module, so after import this string is no longer reachable
# under this name.
BLAS = 'blas'  # matches libopenblas, libcblas


def get_index_duplicates_table(df, level):
    """Return a string table of duplicate index values and their locations.

    Parameters
    ----------
    df : pandas.DataFrame
        dataframe whose index is inspected
    level : str
        name of the index level to check; must be in ``df.index.names``

    Returns
    -------
    table : str
        a header line, a column-title row, and one row per duplicated
        value listing the positional row numbers where it occurs
    """
    assert level in df.index.names

    labels = df.index.get_level_values(level)
    is_duplicate = labels.duplicated(keep=False)

    # Group the positional row numbers of the duplicates by repeated label.
    positions_by_label = defaultdict(list)
    for position, label in zip(
        np.flatnonzero(is_duplicate), labels[is_duplicate]
    ):
        positions_by_label[label].append(position)

    # hardcoded padding: the first column is left-aligned in 20 characters
    row = '{0:<20} {1}\n'
    parts = [
        f'Duplicates in index level {level}:\n\n',
        row.format(level, 'Locations'),
    ]
    for label, positions in sorted(positions_by_label.items()):
        parts.append(row.format(label, ', '.join(map(str, positions))))

    return ''.join(parts)
def get_first_group(groupby):
    """Return the group belonging to the first key of ``groupby.groups``.

    Parameters
    ----------
    groupby : object
        grouping object exposing ``groups`` and ``get_group``
        (presumably a pandas GroupBy -- confirm against callers)

    Returns
    -------
    first_group
        whatever ``groupby.get_group`` returns for the first group key
    """
    group_names = list(groupby.groups)
    return groupby.get_group(group_names[0])
def deduplicate_list(lst):
    """Return a new list with repeats removed, keeping first occurrences.

    Items are kept in the order in which they first appear in ``lst``.
    Items must be hashable.
    """
    seen = set()
    unique = []
    for item in lst:
        if item not in seen:
            seen.add(item)
            unique.append(item)
    return unique
class BLAS:
    """Wrap the thread-count controls of a loaded MKL or OpenBLAS library.

    Parameters
    ----------
    cdll : ctypes.CDLL
        handle to the loaded shared library; it must expose
        ``MKL_Get_Max_Threads`` / ``MKL_Set_Num_Threads`` for kind 'mkl' or
        ``openblas_get_num_threads`` / ``openblas_set_num_threads`` for
        kind 'blas'
    kind : str
        either 'mkl' or 'blas'

    Attributes
    ----------
    kind : str
        the kind passed in
    cdll
        the library handle passed in
    get_n_threads, set_n_threads : callable
        the library's getter and setter for the number of threads

    Raises
    ------
    ValueError
        if `kind` is not one of the supported kinds
    """

    # The kind strings live on the class because the module-level name
    # ``BLAS`` is rebound to this class as soon as the class statement
    # completes, which makes a module-level string constant of the same
    # name unreachable from the methods below.
    KIND_MKL = 'mkl'
    KIND_OPENBLAS = 'blas'  # matches libopenblas, libcblas
    KINDS = (KIND_MKL, KIND_OPENBLAS)
    _DISPLAY_NAMES = {KIND_MKL: 'MKL', KIND_OPENBLAS: 'OpenBLAS'}

    def __init__(self, cdll, kind):
        if kind not in self.KINDS:
            raise ValueError(
                f'kind must be {self.KIND_MKL} or {self.KIND_OPENBLAS}, '
                f'got {kind} instead.'
            )

        self.kind = kind
        self.cdll = cdll

        if kind == self.KIND_MKL:
            self.get_n_threads = cdll.MKL_Get_Max_Threads
            self.set_n_threads = cdll.MKL_Set_Num_Threads
        else:
            self.get_n_threads = cdll.openblas_get_num_threads
            self.set_n_threads = cdll.openblas_set_num_threads

    def __repr__(self):
        n_threads = self.get_n_threads()
        return f'{self._DISPLAY_NAMES[self.kind]} @ {n_threads} threads'


def get_blas_osys(numpy_module, osys):
    """Find the BLAS library NumPy's multiarray extension links against.

    Runs ``ldd`` (Linux) or ``otool -L`` (macOS) on NumPy's
    ``_multiarray_umath`` extension module and searches the output for a
    library whose name contains one of the supported kinds, MKL first.

    Parameters
    ----------
    numpy_module : module
        the imported numpy package; its ``__path__`` is used to locate
        ``core/_multiarray_umath*.so``
    osys : str
        'linux' or 'darwin'

    Returns
    -------
    blas : BLAS or None
        wrapper around the first matching library, or None if no line of
        the tool's output matches a supported kind

    Raises
    ------
    IndexError
        if no ``_multiarray_umath*.so`` file is found under ``core``
    ValueError
        if `osys` is neither 'linux' nor 'darwin'
    subprocess.CalledProcessError
        if the inspection tool exits with a non-zero status
    """
    numpy_core = os.path.join(numpy_module.__path__[0], 'core')
    multiarray_path = glob.glob(
        os.path.join(numpy_core, '_multiarray_umath*.so')
    )[0]

    if osys == 'linux':
        tool_args = ['ldd', multiarray_path]
        pattern = r'^\t.*{}.* => (?P<path>.*) \(0x.*$'
    elif osys == 'darwin':
        tool_args = ['otool', '-L', multiarray_path]
        # MacOS 10.13.6 otool shows @rpath, not @loader_path, for the
        # conda installed mkl, so accept any "@...path/" prefix.
        pattern = r'^\t@.*path/(?P<path>.*{}.*) \(.*\)$'
    else:
        raise ValueError(f'get_blas_osys() does not support osys={osys}')

    tool_result = subprocess.run(
        args=tool_args,
        check=True,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    output = tool_result.stdout

    # Iterate over the kind *strings*; the bare module-level name ``BLAS``
    # refers to the class here, not to the 'blas' string.
    for kind in BLAS.KINDS:
        match = re.search(pattern.format(kind), output, flags=re.MULTILINE)
        if match:
            return BLAS(ctypes.CDLL(match.group('path')), kind)

    # unknown kind
    return None
def get_blas(numpy_module):
    """Return BLAS object or None if neither MKL nor OpenBLAS is found.

    Only Linux and macOS are searched. On any other platform a warning is
    issued and None is returned.
    """
    if sys.platform.startswith('linux'):
        osys = 'linux'
    elif sys.platform == 'darwin':
        osys = 'darwin'
    else:
        warnings.warn(
            f'Searching for BLAS libraries on {sys.platform} is not supported.'
        )
        return None

    return get_blas_osys(numpy_module, osys)
class single_threaded:
    """Context manager that runs the detected BLAS library on one thread.

    On entry the current thread count is remembered and the library is set
    to a single thread; on exit the remembered count is restored. If no
    BLAS library was detected, entry only issues a warning and exit does
    nothing.

    Parameters
    ----------
    numpy_module : module
        the imported numpy package, passed on to ``get_blas``

    Raises
    ------
    RuntimeError
        on exit, if the library does not report the previous thread count
        after it has been asked to restore it
    """

    def __init__(self, numpy_module):
        self.blas = get_blas(numpy_module)

    def __enter__(self):
        blas = self.blas
        if blas is None:
            warnings.warn(
                'No MKL/OpenBLAS found, assuming NumPy is single-threaded.'
            )
            return

        self.old_n_threads = blas.get_n_threads()
        blas.set_n_threads(1)

    def __exit__(self, *args):
        blas = self.blas
        if blas is None:
            return

        blas.set_n_threads(self.old_n_threads)

        # Read the value back to confirm that the restore took effect.
        if blas.get_n_threads() != self.old_n_threads:
            message = (
                f'Failed to reset {self.blas.kind} '
                f'to {self.old_n_threads} threads (previous value).'
            )
            raise RuntimeError(message)
def design_matrix_is_constant(df, columns, time):
    """Check that values in columns of df do not change within any epoch.

    See Notes for more details.

    Parameters
    ----------
    df : pandas.DataFrame
        dataframe to check
    columns : list of str
        list of column names to be checked
    time : str
        name of the time column

    Returns
    -------
    result : bool
        True if values in specified columns don't change, False otherwise

    Notes
    -----
    We check that from timepoint to timepoint, each epoch has the same value
    in a given column:

    .. table:: epoch1
        :widths: auto

        === ===
         A   B
        === ===
         1   x
         1   x
         1   x
         1   x
         1   x
        === ===

    .. table:: epoch2
        :widths: auto

        === ===
         A   B
        === ===
         2   y
         2   y
         2   y
         2   y
         2   y
        === ===

    This is helpful when performing linear regression on an epochs table
    where the predictors vary with epochs (as they are expected to) but stay
    constant from sample to sample, because we can do our modeling much
    faster.

    The expected table is built by repeating each row of the first timepoint
    once per distinct timepoint, so the rows of `df` must be laid out epoch
    by epoch, with the epochs in the order in which they appear at the first
    timepoint. If the epochs do not all have the same number of timepoints,
    the expected and actual tables differ in shape and the result is False.
    """
    gb = df.groupby(time)
    _, first_timepoint = next(iter(gb))  # rows of the first time value

    actual = df[columns].values
    expected = np.repeat(first_timepoint[columns].values, len(gb), axis=0)

    # np.array_equal returns a plain bool and treats a shape mismatch as
    # "not equal", whereas an elementwise == on mismatched shapes either
    # raises or yields a scalar with no .all(), depending on NumPy version.
    return np.array_equal(actual, expected)