
"""
Functions for removal of neuropil from calcium signals.

Authors:
    - Sander W Keemink <swkeemink@scimail.eu>
    - Scott C Lowe <scott.code.lowe@gmail.com>
Created:
    2015-05-15
"""

import warnings

import numpy as np
import numpy.random as rand
import sklearn.decomposition


def separate(
    S,
    sep_method="nmf",
    n=None,
    max_iter=10000,
    tol=1e-4,
    random_state=892,
    max_tries=10,
    W0=None,
    H0=None,
    alpha=0.1,
    verbosity=1,
    prefix="",
):
    """
    Find independent signals, sorted by matching score against the first input signal.

    Parameters
    ----------
    S : :term:`array_like` shaped (signals, observations)
        2-d array containing mixed input signals.
        Each row of `S` should be a different signal, and each column an
        observation of the signals. For ``S[i, j]``, ``i`` is a signal, and
        ``j`` is an observation.
        The first row, ``i = 0``, is considered the primary signal and the
        one for which we will try to extract a decontaminated equivalent.
    sep_method : {"ica", "nmf"}
        Which source separation method to use, either ICA or NMF.

        - ``"ica"``: Independent Component Analysis
        - ``"nmf"``: Non-negative Matrix Factorization

    n : int, optional
        How many components to estimate. If ``None`` (default), for the NMF
        method, ``n`` is the number of input signals; for the ICA method, we
        use PCA and set ``n`` to the number of components which each explain
        more than 1% of the variance.
    max_iter : int, default=10000
        Maximum number of iterations before timing out on an attempt.

        .. versionchanged:: 1.0.0
            Argument `maxiter` renamed to `max_iter`.

    tol : float, default=1e-4
        Tolerance of the stopping condition.
    random_state : int or None, default=892
        Initial state for the random number generator. Set to ``None`` to use
        the :mod:`numpy.random` default.
    max_tries : int, default=10
        Maximum number of random initial states to try. Each random state
        will be optimized for `max_iter` iterations before timing out.

        .. versionchanged:: 1.0.0
            Argument `maxtries` renamed to `max_tries`.

    W0 : :term:`array_like`, optional
        Optional starting condition for ``W`` in the NMF algorithm.
        (Ignored when using the ICA method.)
    H0 : :term:`array_like`, optional
        Optional starting condition for ``H`` in the NMF algorithm.
        (Ignored when using the ICA method.)
    alpha : float, default=0.1
        Sparsity regularization weight for the NMF algorithm. Set to zero to
        remove regularization.
        (Ignored when using the ICA method.)
    verbosity : int, default=1
        Level of verbosity. The options are:

        - ``0``: No outputs.
        - ``1``: Print separation progress.

    prefix : str, optional
        String to include before any progress statements.

    Returns
    -------
    S_sep : :class:`numpy.ndarray` shaped (signals, observations)
        The raw separated traces.
    S_matched : :class:`numpy.ndarray` shaped (signals, observations)
        The separated traces matched to the primary signal, in order of
        matching quality (see Notes below).
    A_sep : :class:`numpy.ndarray` shaped (signals, signals)
        Mixing matrix.
    convergence : dict
        Metadata for the convergence result, with the following keys and
        values:

        ``convergence["random_state"]``
            Seed for estimator initialization.
        ``convergence["iterations"]``
            Number of iterations needed for convergence.
        ``convergence["max_iterations"]``
            Maximum number of iterations allowed.
        ``convergence["converged"]``
            Whether the algorithm converged or not (:class:`bool`).

    Notes
    -----
    To identify which independent signal matches the primary signal best, we
    first normalize the columns in the output mixing matrix `A` such that
    ``sum(A[:, separated]) = 1``. This results in a relative score of how
    strongly each raw signal contributes to each separated signal. From this,
    we find the separated signal to which the ROI trace makes the largest
    (relative) contribution.
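
    Examples
    --------
    An illustrative sketch using synthetic traces (the signal shapes and
    mixing weights below are made up for demonstration)::

        import numpy as np
        from fissa.neuropil import separate

        rng = np.random.default_rng(0)
        t = np.linspace(0, 10, 400)
        # Non-negative somatic and neuropil source signals.
        soma = np.clip(np.sin(t), 0, None) + 0.05 * rng.random(t.size)
        npil = np.clip(np.cos(0.5 * t), 0, None) + 0.05 * rng.random(t.size)
        # Rows are signals, columns are observations; row 0 is the primary
        # (somatic) trace, contaminated by neuropil.
        S = np.stack([soma + 0.5 * npil, npil + 0.2 * soma])
        S_sep, S_matched, A_sep, convergence = separate(
            S, sep_method="nmf", verbosity=0
        )
        # S_matched[0] is the separated component which best matches the
        # primary trace, scaled by its mixing weight.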

    See Also
    --------
    sklearn.decomposition.NMF, sklearn.decomposition.FastICA
    """
    # TODO for edge cases, reduce the number of npil regions according to
    # possible orientations
    # TODO split into several functions. Maybe turn into a class.

    # Include a space as a separator between the prefix and our output.
    if prefix and prefix[-1] != " ":
        prefix += " "

    # Ensure array_like input is a numpy.ndarray
    S = np.asarray(S)

    # Normalize the data by its median.
    median = np.median(S)
    S /= median

    # Estimate the number of signals to find, if not given.
    if n is None:
        if sep_method.lower() == "ica":
            # Perform PCA
            pca = sklearn.decomposition.PCA(whiten=False)
            pca.fit(S.T)
            # Count the components which each explain more than 1% of the
            # variance.
            n = sum(pca.explained_variance_ratio_ > 0.01)
        else:
            n = S.shape[0]

    for i_try in range(max_tries):
        if sep_method.lower() in {"ica", "fastica"}:
            # Use sklearn's implementation of ICA.
            # Make an instance of the FastICA class. We can do whitening of
            # the data now.
            estimator = sklearn.decomposition.FastICA(
                n_components=n,
                whiten=True,
                max_iter=max_iter,
                tol=tol,
                random_state=random_state,
            )
            # Perform ICA and find separated signals
            S_sep = estimator.fit_transform(S.T)

        elif sep_method.lower() in {"nmf", "nnmf"}:
            # Make an instance of the sklearn NMF class
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore",
                    message=".*`alpha` was deprecated in.*",
                    category=DeprecationWarning,
                )
                warnings.filterwarnings(
                    "ignore",
                    message=".*`alpha` was deprecated in.*",
                    category=FutureWarning,
                )
                estimator = sklearn.decomposition.NMF(
                    init="nndsvdar" if W0 is None and H0 is None else "custom",
                    n_components=n,
                    alpha=alpha,
                    l1_ratio=0.5,
                    tol=tol,
                    max_iter=max_iter,
                    random_state=random_state,
                )
                # Perform NMF and find separated signals
                S_sep = estimator.fit_transform(S.T, W=W0, H=H0)

        elif hasattr(sklearn.decomposition, sep_method):
            if verbosity >= 1:
                print(
                    "{}Using ad hoc signal decomposition method"
                    " sklearn.decomposition.{}. Only NMF and ICA are"
                    " officially supported.".format(prefix, sep_method)
                )
            # Load up an arbitrary decomposition algorithm from sklearn.
            estimator = getattr(sklearn.decomposition, sep_method)(
                n_components=n,
                tol=tol,
                max_iter=max_iter,
                random_state=random_state,
            )
            S_sep = estimator.fit_transform(S.T)

        else:
            raise ValueError('Unknown separation method "{}".'.format(sep_method))

        # Check whether the maximum number of iterations was reached.
        if estimator.n_iter_ < max_iter:
            if verbosity >= 1:
                print(
                    "{}{} converged after {} iterations.".format(
                        prefix, repr(estimator).split("(")[0], estimator.n_iter_
                    )
                )
            break
        if verbosity >= 1:
            print(
                "{}Attempt {} failed to converge at {} iterations.".format(
                    prefix, i_try + 1, estimator.n_iter_
                )
            )
        if i_try + 1 < max_tries:
            if verbosity >= 1:
                print("{}Trying a new random state.".format(prefix))
            # Change to a new random_state
            if random_state is not None:
                random_state = (random_state + 1) % 2 ** 32

    if estimator.n_iter_ == max_iter:
        if verbosity >= 1:
            print(
                "{}Warning: maximum number of allowed tries reached at {}"
                " iterations for {} tries of different random seed"
                " states.".format(prefix, estimator.n_iter_, i_try + 1)
            )

    if hasattr(estimator, "mixing_"):
        A_sep = estimator.mixing_
    else:
        A_sep = estimator.components_.T
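
    # Note on shapes: FastICA exposes ``mixing_`` shaped
    # (n_features, n_components), while NMF exposes ``components_`` shaped
    # (n_components, n_features), hence the transpose. Either way, ``A_sep``
    # maps separated signals back onto the raw inputs: the (normalized)
    # input ``S.T`` is approximately ``S_sep @ A_sep.T``.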

    # Normalize the columns in A so that sum(column) = 1 (this can also be
    # done in one line). This results in a relative score of how strongly
    # each separated signal is represented in each ROI signal.
    #
    # Our mixing matrix is shaped (input/raw, output/separated). For each
    # separated (output) signal, we find how much weighting each input (raw)
    # signal contributes to that separated signal, relative to the other
    # input signals.
    A = abs(np.copy(A_sep))
    for j in range(n):
        if np.sum(A[:, j]) != 0:
            A[:, j] /= np.sum(A[:, j])

    # Get the scores for the somatic signal.
    scores = A[0, :]

    # Rank the separated signals in descending order of their score.
    # The separated signal to which the somatic signal makes the largest
    # contribution is sorted first.
    order = np.argsort(scores)[::-1]

    # Order the signals according to their scores, and scale the magnitude
    # back to the original magnitude.
    S_matched = np.zeros_like(S_sep)
    for j in range(n):
        S_matched[:, j] = A_sep[0, order[j]] * S_sep[:, order[j]]

    # Save the algorithm convergence info.
    convergence = {}
    convergence["max_iterations"] = max_iter
    convergence["random_state"] = random_state
    convergence["iterations"] = estimator.n_iter_
    convergence["converged"] = estimator.n_iter_ != max_iter

    # Scale back to raw magnitudes.
    S_matched *= median
    S *= median
    return S_sep.T, S_matched.T, A_sep, convergence
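
# ----------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a toy
# demonstration of the matching-score logic described in the Notes of
# ``separate``. The mixing matrix ``A_demo`` below is made up. Columns of
# ``abs(A)`` are normalized to sum to one, giving each raw signal's
# relative contribution to each separated component; row 0 (the primary
# trace) then determines the ranking.
if __name__ == "__main__":
    # Hypothetical mixing matrix, shaped (raw signals, separated signals).
    A_demo = np.array([[0.9, 0.3], [0.1, 0.7]])
    # Normalize each column to sum to one.
    A_norm = np.abs(A_demo) / np.abs(A_demo).sum(axis=0)
    # Rank components by the primary trace's relative contribution.
    order = np.argsort(A_norm[0, :])[::-1]
    print("Relative contributions of the primary trace:", A_norm[0, :])
    print("Component ranking (best match first):", order)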