"""
Functions for removal of neuropil from calcium signals.
Authors:
- Sander W Keemink (swkeemink@scimail.eu)
- Scott C Lowe
Created:
2015-05-15
"""
import numpy as np
import numpy.random as rand
import scipy.signal as signal
from sklearn.decomposition import FastICA, NMF, PCA
[docs]def separate(
S, sep_method='nmf', n=None, maxiter=10000, tol=1e-4,
random_state=892, maxtries=10, W0=None, H0=None, alpha=0.1):
"""For the signals in S, finds the independent signals underlying it,
using ica or nmf.
Parameters
----------
S : array_like
2-d array containing mixed input signals.
Each column of `S` should be a different signal, and each row an
observation of the signals. For `S[i,j]`, `j` = each signal,
`i` = signal content.
The first column, `j = 0`, is considered the primary signal and the
one for which we will try to extract a decontaminated equivalent.
sep_method : {'ica','nmf'}
Which source separation method to use, either ICA or NMF.
- `'ica'`: Independent Component Analysis
- `'nmf'`: Non-negative Matrix Factorization
n : int, optional
How many components to estimate. If `None` (default), use PCA to
estimate how many components would explain at least 99% of the
variance and adopt this value for `n`.
maxiter : int, optional
Number of maximally allowed iterations. Default is 500.
tol : float, optional
Error tolerance for termination. Default is 1e-5.
random_state : int, optional
Initial state for the random number generator. Set to None to use
the numpy.random default. Default seed is 892.
maxtries : int, optional
Maximum number of tries before algorithm should terminate.
Default is 10.
W0 : array_like, optional
Optional starting condition for `W` in NMF algorithm.
(Ignored when using the ICA method.)
H0 : array_like, optional
Optional starting condition for `H` in NMF algorithm.
(Ignored when using the ICA method.)
alpha : float, optional
Sparsity regularizaton weight for NMF algorithm. Set to zero to
remove regularization. Default is 0.1.
(Ignored when using the ICA method.)
Returns
-------
numpy.ndarray
The raw separated traces.
numpy.ndarray
The separated traces matched to the primary signal, in order
of matching quality (see Methods below).
numpy.ndarray
Mixing matrix.
dict
Metadata for the convergence result, with keys:
- `'random_state'`: seed for ICA initiation
- `'iterations'`: number of iterations needed for convergence
- `'max_iterations'`: maximum number of iterations allowed
- `'converged'`: whether the algorithm converged or not (bool)
Notes
-----
Concept by Scott Lowe and Sander Keemink.
Normalize the columns in estimated mixing matrix A so that `sum(column)=1`.
This results in a relative score of how strongly each separated signal
is represented in each ROI signal.
"""
# TODO for edge cases, reduce the number of npil regions according to
# possible orientations
# TODO split into several functions. Maybe turn into a class.
# Ensure array_like input is a numpy.ndarray
S = np.asarray(S)
# normalize
median = np.median(S)
S /= median
# estimate number of signals to find, if not given
if n is None:
if sep_method == 'ica':
# Perform PCA
pca = PCA(whiten=False)
pca.fit(S.T)
# find number of components with at least x percent explained var
n = sum(pca.explained_variance_ratio_ > 0.01)
else:
n = S.shape[0]
if sep_method == 'ica':
# Use sklearn's implementation of ICA.
for ith_try in range(maxtries):
# Make an instance of the FastICA class. We can do whitening of
# the data now.
ica = FastICA(n_components=n, whiten=True, max_iter=maxiter,
tol=tol, random_state=random_state)
# Perform ICA and find separated signals
S_sep = ica.fit_transform(S.T)
# check if max number of iterations was reached
if ica.n_iter_ < maxiter:
print((
'ICA converged after {} iterations.'
).format(ica.n_iter_))
break
print((
'Attempt {} failed to converge at {} iterations.'
).format(ith_try + 1, ica.n_iter_))
if ith_try + 1 < maxtries:
print('Trying a new random state.')
# Change to a new random_state
random_state = rand.randint(8000)
if ica.n_iter_ == maxiter:
print((
'Warning: maximum number of allowed tries reached at {} '
'iterations for {} tries of different random seed states.'
).format(ica.n_iter_, ith_try + 1))
A_sep = ica.mixing_
elif sep_method == 'nmf':
for ith_try in range(maxtries):
# nSignals = nRegions +1
# ICA = FastICA(n_components=nSignals)
# ica = ICA.fit_transform(mixed.T) # Reconstruct signals
# A_ica = ICA.mixing_ # Get estimated mixing matrix
#
#
# Make an instance of the sklearn NMF class
if W0 is None and H0 is None:
nmf = NMF(
init='nndsvdar', n_components=n,
alpha=alpha, l1_ratio=0.5,
tol=tol, max_iter=maxiter, random_state=random_state)
# Perform NMF and find separated signals
S_sep = nmf.fit_transform(S.T)
else:
nmf = NMF(
init='custom', n_components=n,
alpha=alpha, l1_ratio=0.5,
tol=tol, max_iter=maxiter, random_state=random_state)
# Perform NMF and find separated signals
S_sep = nmf.fit_transform(S.T, W=W0, H=H0)
# check if max number of iterations was reached
if nmf.n_iter_ < maxiter - 1:
print((
'NMF converged after {} iterations.'
).format(nmf.n_iter_ + 1))
break
print((
'Attempt {} failed to converge at {} iterations.'
).format(ith_try, nmf.n_iter_ + 1))
if ith_try + 1 < maxtries:
print('Trying a new random state.')
# Change to a new random_state
random_state = rand.randint(8000)
if nmf.n_iter_ == maxiter - 1:
print((
'Warning: maximum number of allowed tries reached at {} '
'iterations for {} tries of different random seed states.'
).format(nmf.n_iter_ + 1, ith_try + 1))
A_sep = nmf.components_.T
else:
raise ValueError('Unknown separation method "{}".'.format(sep_method))
# make empty matched structure
S_matched = np.zeros(np.shape(S_sep))
# Normalize the columns in A so that sum(column)=1 (can be done in one line
# too).
# This results in a relative score of how strongly each separated signal
# is represented in each ROI signal.
A = abs(np.copy(A_sep))
for j in range(n):
if np.sum(A[:, j]) != 0:
A[:, j] /= np.sum(A[:, j])
# get the scores for the somatic signal
scores = A[0, :]
# get the order of scores
order = np.argsort(scores)[::-1]
# order the signals according to their scores
for j in range(n):
s_ = A_sep[0, order[j]] * S_sep[:, order[j]]
S_matched[:, j] = s_
# save the algorithm convergence info
convergence = {}
convergence['max_iterations'] = maxiter
if sep_method == 'ica':
convergence['random_state'] = random_state
convergence['iterations'] = ica.n_iter_
convergence['converged'] = not ica.n_iter_ == maxiter
elif sep_method == 'nmf':
convergence['random_state'] = random_state
convergence['iterations'] = nmf.n_iter_
convergence['converged'] = not nmf.n_iter_ == maxiter
# scale back to raw magnitudes
S_matched *= median
S *= median
return S_sep.T, S_matched.T, A_sep, convergence
[docs]def lowPassFilter(F, fs=40, nfilt=40, fw_base=10, axis=0):
'''Low pass filters a fluorescence imaging trace line.
Parameters
----------
F : array_like
Fluorescence signal.
fs : float, optional
Sampling frequency of F, in Hz. Default is 40.
nfilt : int, optional
Number of taps to use in FIR filter. Default is 40.
fw_base : float, optional
Cut-off frequency for lowpass filter, in Hz. Default is 10.
axis : int, optional
Along which axis to apply low pass filtering. Default is 0.
Returns
-------
numpy.ndarray
Low pass filtered signal with the same shape as `F`.
'''
# The Nyquist rate of the signal is half the sampling frequency
nyq_rate = fs / 2.0
# Make a set of weights to use with our taps.
# We use an FIR filter with a Hamming window.
b = signal.firwin(nfilt, cutoff=fw_base / nyq_rate, window='hamming')
# Use lfilter to filter with the FIR filter.
# We filter along the second dimension because that represents time
filtered_f = signal.filtfilt(b, [1.0], F, axis=axis)
return filtered_f