Source code for smm

"""t-Student Mixture Models Module (smm).

- This module allows you to model data by a mixture of t-Student \
distributions, estimating the parameters with \
Expectation-Maximisation. \
It is an implementation of the paper: 'Robust mixture modelling using \
the t distribution', D. Peel and G. J. McLachlan. Published at: \
Statistics and Computing (2000) 10, 339-348.   

- This module has reused code and comments from sklearn.mixture.gmm.  
"""

import numpy as np
import sklearn
import sklearn.cluster
import sklearn.utils
import scipy.linalg
import scipy.special
import scipy.optimize
import warnings


[docs]class dofMaximizationError(ValueError): def __init__(self, message): super(dofMaximizationError, self).__init__(message)
[docs]class SMM(sklearn.base.BaseEstimator): """t-Student Mixture Model SMM class. Representation of a t-Student mixture model probability distribution. This class allows for easy evaluation of, sampling from, and maximum-likelihood estimation of the parameters of an SMM distribution. Initializes parameters such that every mixture component has zero mean and identity covariance. Parameters ---------- n_components : int, optional. Number of mixture components. Defaults to 1. covariance_type : string, optional. String describing the type of covariance parameters to use. Must be one of 'spherical', 'tied', 'diag', 'full'. Defaults to 'full'. random_state: RandomState or an int seed. A random number generator instance. None by default. tol : float, optional. Convergence threshold. EM iterations will stop when average gain in log-likelihood is below this threshold. Defaults to 1e-6. min_covar : float, optional. Floor on the diagonal of the covariance matrix to prevent overfitting. Defaults to 1e-6. n_iter : int, optional. Number of EM iterations to perform. Defaults to 1000. n_init : int, optional. Number of initializations to perform. The best result is kept. Defaults to 1. params : string, optional. Controls which parameters are updated in the training process. Can contain any combination of 'w' for weights, 'm' for means, 'c' for covars, and 'd' for the degrees of freedom. Defaults to 'wmcd'. init_params : string, optional. Controls which parameters are updated in the initialization process. Can contain any combination of 'w' for weights, 'm' for means, 'c' for covars, and 'd' for the degrees of freedom. Defaults to 'wmcd'. Attributes ---------- weights_ : array, shape (`n_components`,). This attribute stores the mixing weights for each mixture component. means_ : array_like, shape (`n_components`, `n_features`). Mean parameters for each mixture component. covars_ : array_like. Covariance parameters for each mixture component. The shape depends on `covariance_type`: (n_components, n_features) if 'spherical', (n_features, n_features) if 'tied', (n_components, n_features) if 'diag', (n_components, n_features, n_features) if 'full' converged_ : bool. True when convergence was reached in fit(), False otherwise. """ def __init__(self, n_components=1, covariance_type='full', random_state=None, tol=1e-6, min_covar=1e-6, n_iter=1000, n_init=1, params='wmcd', init_params='wmcd'): # Store the parameters as class attributes self.n_components = n_components self.covariance_type = covariance_type self.random_state = random_state self.tol = tol self.min_covar = min_covar self.n_iter = n_iter self.n_init = n_init self.params = params self.init_params = init_params #self.converged_ = False def _expectation_step(self, X): """Performs the expectation step of the EM algorithm. This method uses the means, class-related weights, covariances and degrees of freedom stored in the attributes of this class: self.means_, self.weights_, self.covars_, and self.degrees_. Parameters ---------- X : array_like, shape (n_samples, n_features). Matrix with all the data points, each row represents a new data point and the columns represent each one of the features. """ # Sanity checks: # - Check that the fit() method has been called before this # one. # - Convert input to 2d array, raise error on sparse # matrices. Calls assert_all_finite by default. # - Check that the the X array is not empty of samples. # - Check that the no. of features is equivalent to the no. # of means that we have in self. sklearn.utils.validation.check_is_fitted(self, 'means_') X = sklearn.utils.validation.check_array(X, dtype=np.float64) if X.ndim == 1: X = X[:, np.newaxis] if X.size == 0: return np.array([]), np.empty((0, self.n_components)) if X.shape[1] != self.means_.shape[1]: raise ValueError( '[SMM._expectation_step] Error, the ' \ + 'shape of X is not compatible with self.' ) # Initialisation of reponsibilities and weight of each point for # the Gamma distribution n_samples, n_dim = X.shape responsibilities = np.ndarray( shape=(X.shape[0], self.n_components), dtype=np.float64 ) gammaweights_ = np.ndarray( shape=(X.shape[0], self.n_components), dtype=np.float64 ) # Calculate the probability of each point belonging to each # t-Student distribution of the mixture pr_before_weighting = self._multivariate_t_student_density( X, self.means_, self.covars_, self.degrees_, self.covariance_type, self.min_covar ) pr = pr_before_weighting * self.weights_ # Calculate the likelihood of each point likelihoods = pr.sum(axis=1) # Update responsibilities responsibilities = \ pr / (likelihoods.reshape(likelihoods.shape[0], 1) + 10 * SMM._EPS ) # Update the Gamma weight for each observation mahalanobis_distance_mix_func = SMM._mahalanobis_funcs[ self.covariance_type ] vp = self.degrees_ + n_dim maha_dist = mahalanobis_distance_mix_func( X, self.means_, self.covars_, self.min_covar ) gammaweights_ = vp / (self.degrees_ + maha_dist) return likelihoods, responsibilities, gammaweights_ def _maximisation_step(self, X, responsibilities, gammaweights_): """Perform the maximisation step of the EM algorithm. Parameters ---------- X : array_like, shape (n_samples, n_features). Each row corresponds to a single data point. responsibilities : array_like, shape (n_samples, n_components). gammaweights_ : array_like, shape (n_samples, n_components). """ n_samples, n_dim = X.shape z_sum = responsibilities.sum(axis=0) zu = responsibilities * gammaweights_ zu_sum = zu.sum(axis=0) # Update weights if 'w' in self.params: self.weights_ = z_sum / n_samples # Update means if 'm' in self.params: dot_zu_x = np.dot(zu.T, X) zu_sum_ndarray = zu_sum.reshape(zu_sum.shape[0], 1) self.means_ = dot_zu_x / (zu_sum_ndarray + 10 * SMM._EPS) # Update covariances if 'c' in self.params: covar_mstep_func = SMM._covar_mstep_funcs[ self.covariance_type ] self.covars_ = covar_mstep_func( X, zu, z_sum, self.means_, self.min_covar ) # Update degrees of freedom if 'd' in self.params: try: self.degrees_ = SMM._solve_dof_equation( self.degrees_, responsibilities, z_sum, gammaweights_, n_dim, self.tol, self.n_iter ) except FloatingPointError as fpe: message = str(fpe) raise dofMaximizationError(message) except RuntimeError as re: message = str(re) if message.startswith('Failed to converge after'): warnings.warn(message, RuntimeWarning) pass
[docs] def fit(self, X, y=None): """Estimate model parameters with the EM algorithm. A initialization step is performed before entering the em algorithm. If you want to avoid this step, set the keyword argument init_params to the empty string '' when creating the SMM object. Likewise, if you would like just to do an initialization, set n_iter=0. Parameters ---------- X : array_like, shape (n_samples, n_features). y : not used, just for compatibility with sklearn API. """ # Sanity check: assert that the input matrix is not 1D if (len(X.shape) == 1): raise ValueError( '[SMM.fit] Error, the input matrix must have a ' \ + 'shape of (n_samples, n_features).' ) # Sanity checks: # - Convert input to 2d array, raise error on sparse # matrices. Calls assert_all_finite by default. # - No. of samples is higher or equal to the no. of # components in the mixture. X = sklearn.utils.validation.check_array(X, dtype=np.float64) if X.shape[0] < self.n_components: raise ValueError( '[SMM.fit] Error, SMM estimation with ' \ + '%s components, but got only %s samples' % ( self.n_components, X.shape[0] ) ) # For all the initialisations we get the one with the best # parameters n_samples, n_dim = X.shape max_prob = -np.infty for _ in range(self.n_init): # EM initialisation if 'm' in self.init_params or not hasattr(self, 'means_'): kmeans = sklearn.cluster.KMeans( n_clusters=self.n_components, init='k-means++', random_state=self.random_state ) self.means_ = kmeans.fit(X).cluster_centers_ if 'w' in self.init_params or not hasattr(self, 'weights_'): self.weights_ = np.tile( 1.0 / self.n_components, self.n_components ) if 'c' in self.init_params or not hasattr(self, 'covars_'): cv = np.cov(X.T) + self.min_covar * np.eye(X.shape[1]) if not cv.shape: cv.shape = (1, 1) self.covars_ = SMM.dist_covar_to_match_cov_type( cv, self.covariance_type, self.n_components ) if 'd' in self.init_params or not hasattr(self, 'degrees_'): self.degrees_ = np.tile(1.0, self.n_components) best_params = { 'weights': self.weights_, 'means': self.means_, 'covars': self.covars_, 'degrees': self.degrees_ } self.converged_ = False current_log_likelihood = None # EM algorithm for i in range(self.n_iter): prev_log_likelihood = current_log_likelihood # Expectation step likelihoods, responsibilities, gammaweights_ = \ self._expectation_step(X) # Sanity check: assert that the likelihoods, # responsibilities and gammaweights have the correct # dimensions assert(len(likelihoods.shape) == 1) assert(likelihoods.shape[0] == n_samples) assert(len(responsibilities.shape) == 2) assert(responsibilities.shape[0] == n_samples) assert(responsibilities.shape[1] == self.n_components) assert(len(gammaweights_.shape) == 2) assert(gammaweights_.shape[0] == n_samples) assert(gammaweights_.shape[1] == self.n_components) # Calculate loss function current_log_likelihood = np.log(likelihoods).mean() # Check for convergence if prev_log_likelihood is not None: change = np.abs(current_log_likelihood - prev_log_likelihood ) if change < self.tol: self.converged_ = True break # Maximisation step try: self._maximisation_step(X, responsibilities, gammaweights_ ) except dofMaximizationError as e: print( '[self._maximisation_step] Error in the ' \ + 'maximization step of the degrees of ' \ + 'freedom: ' + e.message ) break # If the results are better, keep it if self.n_iter and self.converged_: if current_log_likelihood > max_prob: max_prob = current_log_likelihood best_params = { 'weights': self.weights_, 'means': self.means_, 'covars': self.covars_, 'degrees': self.degrees_ } # Check the existence of an init param that was not subject to # likelihood computation issue if np.isneginf(max_prob) and self.n_iter: msg = 'EM algorithm was never able to compute a valid ' \ + 'likelihood given initial parameters. Try ' \ + 'different init parameters (or increasing ' \ + 'n_init) or check for degenerate data.' warnings.warn(msg, RuntimeWarning) # Choosing the best result of all the iterations as the actual # result if self.n_iter: self.weights_ = best_params['weights'] self.means_ = best_params['means'] self.covars_ = best_params['covars'] self.degrees_ = best_params['degrees'] return self
[docs] def predict(self, X): """Predict label for data. This function will tell you which component of the mixture most likely generated the sample. Parameters ---------- X : array-like, shape (n_samples, n_features). Returns ------- r_argmax : array_like, shape (n_samples,). """ _, responsibilities, _ = self._expectation_step(X) r_argmax = responsibilities.argmax(axis=1) return r_argmax
[docs] def predict_proba(self, X): """Predict label for data. This function will tell the probability of each component generating each sample. Parameters ---------- X : array-like, shape (n_samples, n_features). Returns ------- responsibilities : array_like, shape (n_samples, n_components). """ _, responsibilities, _ = self._expectation_step(X) return responsibilities
[docs] def score(self, X, y=None): """Compute the log probability under the model. Parameters ---------- X : array_like, shape (n_samples, n_features). Returns ------- prob : array_like, shape (n_samples,). Probabilities of each data point in X. """ prob, _ = self.score_samples(X) return prob
[docs] def score_samples(self, X): """Per-sample likelihood of the data under the model. Compute the probability of X under the model and return the posterior distribution (responsibilities) of each mixture component for each element of X. Parameters ---------- X : array_like, shape (n_samples, n_features). Returns ------- prob : array_like, shape (n_samples,). Unnormalised probability of each data point in X, i.e. likelihoods. responsibilities : array_like, shape (n_samples, n_components). Posterior probabilities of each mixture component for each observation. """ sklearn.utils.validation.check_is_fitted(self, 'means_') X = sklearn.utils.validation.check_array(X, dtype=np.float64) if X.ndim == 1: X = X[:, np.newaxis] if X.size == 0: return np.array([]), np.empty((0, self.n_components)) if X.shape[1] != self.means_.shape[1]: raise ValueError( '[score_samples] ValueError, the shape of X is not ' \ + 'compatible with self.' ) # Initialisation of reponsibilities and weight of each point for # the Gamma distribution n_samples, n_dim = X.shape responsibilities = np.ndarray( shape=(X.shape[0], self.n_components), dtype=np.float64 ) gammaweights_ = np.ndarray( shape=(X.shape[0], self.n_components), dtype=np.float64 ) # Calculate the probability of each point belonging to each # t-Student distribution of the mixture pr = self._multivariate_t_student_density( X, self.means_, self.covars_, self.degrees_, self.covariance_type, self.min_covar ) * self.weights_ # Calculate the likelihood of each point likelihoods = pr.sum(axis=1) # Update responsibilities like_ndarray = likelihoods.reshape(likelihoods.shape[0], 1) responsibilities = pr / (like_ndarray + 10 * SMM._EPS) return likelihoods, responsibilities
def _n_parameters(self): """Returns the number of free parameters in the model.""" _, n_features = self.means_.shape if self.covariance_type == 'full': cov_params = self.n_components \ * n_features \ * (n_features + 1) / 2.0 elif self.covariance_type == 'diag': cov_params = self.n_components * n_features elif self.covariance_type == 'tied': cov_params = n_features * (n_features + 1) / 2.0 elif self.covariance_type == 'spherical': cov_params = self.n_components mean_params = n_features * self.n_components df_params = self.n_components total_param = int( cov_params \ + mean_params \ + df_params \ + self.n_components \ - 1 ) return total_param
[docs] def bic(self, X): """Bayesian information criterion for the current model fit and the proposed data. Parameters ---------- X : array_like, shape (n_samples, n_features). Returns ------- A float (the lower the better). """ retval = - 2 * self.score(X).sum() \ + self._n_parameters() * np.log(X.shape[0]) return retval
[docs] def aic(self, X): """Akaike information criterion for the current model fit and the proposed data. Parameters ---------- X : array_like, shape (n_samples, n_features). Returns ------- A float (the lower the better). """ retval = - 2 * self.score(X).sum() + 2 * self._n_parameters() return retval
@staticmethod def _solve_dof_equation(v_vector, z, z_sum, u, n_dim, tol, n_iter): """Solves the equation to calculate the next value of v (degrees of freedom). This method is part of the maximisation step. It is used to calculate the next value for the degrees of freedom of each t-Student component. This method uses the information from the E-step as well as the number of dimensions (features) of a point. Parameters ---------- v_vector : array_like, shape (n_components,). Degrees of freedoom of ALL the components of the mixture. z : array_like, shape (n_samples, n_components). Matrix of responsibilities, each row represents a point and each column represents a component of the mixture. z_sum : array_like, shape (n_samples,). Sum of all the rows of the matrix of responsibilities. u : array_like, shape (n_samples, n_components). Matrix of gamma weights, each row represents a point and each column represents a component of the mixture. n_dim : integer. Number of features of each data point. Returns ------- new_v_vector : array_like (n_components,). Vector with the updated degrees of freedom for each component in the mixture. """ # Sanity check: check that the dimensionality of the vector of # degrees of freedom is correct assert(len(v_vector.shape) == 1) n_components = v_vector.shape[0] # Sanity check: the matrix of responsibilities should be # (n_samples x n_components) assert(len(z.shape) == 2) assert(z.shape[1] == n_components) # Sanity check: the top-to-bottom sum of the responsibilities # must have a shape (n_components, ) assert(len(z_sum.shape) == 1) assert(z_sum.shape[0] == n_components) # Sanity check: gamma weights matrix must have the same # dimensionality as the responsibilities assert(u.shape == z.shape) # Initialisation new_v_vector = np.empty_like(v_vector) # Calculate the constant part of the equation to calculate the # new degrees of freedom vdim = (v_vector + n_dim) / 2.0 zlogu_sum = np.sum(z * (np.log(u) - u), axis=0) constant_part = 1.0 \ + zlogu_sum / z_sum \ + scipy.special.digamma(vdim) \ - np.log(vdim) # Solve the equation numerically using Newton-Raphson for each # component of the mixture for c in range(n_components): def func(x): return np.log(x / 2.0) \ - scipy.special.digamma(x / 2.0) \ + constant_part[c] def fprime(x): return 1.0 / x \ - scipy.special.polygamma(1, x / 2.0) / 2.0 def fprime2(x): return - 1.0 / (x * x) \ - scipy.special.polygamma(2, x / 2.0) / 4.0 new_v_vector[c] = scipy.optimize.newton( func, v_vector[c], fprime, args=(), tol=tol, maxiter=n_iter, fprime2=fprime2 ) if new_v_vector[c] < 0.0: raise ValueError('[_solve_dof_equation] Error, ' \ + 'degree of freedom smaller than one. \n' \ + 'n_components[c] = ' \ + str(n_components) \ + '. \n' + 'v_vector[c] = ' \ + str(v_vector[c]) \ + '. \n' \ + 'new_v_vector[c] = ' \ + str(new_v_vector[c]) \ + '. \n' \ + 'constant_part[c] = ' \ + str(constant_part[c]) \ + '. \n' \ + 'zlogu_sum[c] = ' \ + str(zlogu_sum[c]) \ + '. \n' \ + 'z_sum[c] = ' \ + str(z_sum[c]) \ + '. \n' \ + 'z = ' + str(z) + '. \n' ) return new_v_vector @staticmethod def _covar_mstep_diag(X, zu, z_sum, means, min_covar): """Performing the covariance m-step for diagonal covariances. Parameters ---------- X : array_like, shape (n_samples, n_features). List of k_features-dimensional data points. Each row corresponds to a single data point. zu : array, shape (n_samples, n_components). Contains the element-wise multiplication of the responsibilities by the gamma weights. z_sum : array_like, shape (n_components,). Sum of all the responsibilities for each mixture. means : array_like, shape (n_components, n_features). List of n_features-dimensional mean vectors for n_components t-Students. Each row corresponds to a single mean vector. min_covar : float value. Minimum amount that will be added to the covariance matrix in case of trouble, usually 1.e-6. Returns ------- retval : array_like, shape (n_components, n_features). """ # Eq. 15 from K. Murphy, "Fitting a Conditional Linear Gaussian # Distribution" adapted to the mixture of t-students (i.e. # responsibilities matrix is multiplied element-wise by the # gamma weights matrix. See that zu.T is used in the calculation # of weighted_X_sum) norm = np.float64(1) / (z_sum[:, np.newaxis] + 10 * SMM._EPS) weighted_X_sum = np.dot(zu.T, X) avg_X2 = np.dot(zu.T, X * X) * norm avgmeans_2 = means ** 2 avg_Xmeans_ = means * weighted_X_sum * norm retval = avg_X2 - 2 * avg_Xmeans_ + avgmeans_2 + min_covar return retval @staticmethod def _covar_mstep_spherical(*args): cv = SMM._covar_mstep_diag(*args) return np.tile(cv.mean(axis=1)[:, np.newaxis], (1, cv.shape[1])) @staticmethod def _covar_mstep_full(X, zu, z_sum, means, min_covar): """Performing the covariance m-step for full covariances. Parameters ---------- X : array_like, shape (n_samples, n_features). List of k_features-dimensional data points. Each row corresponds to a single data point. zu : array_like, shape (n_samples, n_components). Contains the element-wise multiplication of the responsibilities by the gamma weights. z_sum : array_like, shape (n_components,) Sum of all the responsibilities for each mixture. means : array_like, shape (n_components, n_features). List of n_features-dimensional mean vectors for n_components t-Students. Each row corresponds to a single mean vector. min_covar : float value. Minimum amount that will be added to the covariance matrix in case of trouble, usually 1.e-7. Returns ------- cv : array_like, shape (n_components, n_features, n_features). New array of updated covariance matrices. """ # Sanity checks for dimensionality n_samples, n_features = X.shape n_components = means.shape[0] assert(zu.shape[0] == n_samples) assert(zu.shape[1] == n_components) assert(z_sum.shape[0] == n_components) # Eq. 31 from D. Peel and G. J. McLachlan, "Robust mixture # modelling using the t distribution" cv = np.empty((n_components, n_features, n_features)) zu_sum = zu.sum(axis=0) for c in range(n_components): post = zu[:, c] mu = means[c] diff = X - mu with np.errstate(under='ignore'): # Underflow Errors in doing post * X.T are not important if n_components == 1: avg_cv = np.dot(post * diff.T, diff) \ / (zu_sum[c] + 10 * SMM._EPS) else: avg_cv = np.dot(post * diff.T, diff) \ / (z_sum[c] + 10 * SMM._EPS) cv[c] = avg_cv + min_covar * np.eye(n_features) return cv @staticmethod def _covar_mstep_tied(X, zu, z_sum, means, min_covar): """Performing the for tied a covariance. Parameters ---------- X : array_like, shape (n_samples, n_features). List of k_features-dimensional data points. Each row corresponds to a single data point. zu : array_like, shape (n_samples, n_components). Contains the element-wise multiplication of the responsibilities by the gamma weights. z_sum : array_like, shape (n_components, ) Sum of all the responsibilities for each mixture. means : array_like, shape (n_components, n_features). List of n_features-dimensional mean vectors for n_components t-Students. Each row corresponds to a single mean vector. min_covar : float value. Minimum amount that will be added to the covariance matrix in case of trouble, usually 1.e-6. Returns ------- out : array_like, shape (n_features, n_features). """ avg_X2 = np.dot(zu.T, X * X) avg_means2 = np.dot(z_sum * means.T, means) out = avg_X2 - avg_means2 out /= z_sum.sum() out.flat[::len(out) + 1] += min_covar return out @staticmethod def _multivariate_t_student_density_diag(X, means, covars, dfs, min_covar): """Multivariate t-Student PDF for a matrix of data points and diagonal covariance matrices. Parameters ---------- X : array_like, shape (n_samples, n_features). Each row corresponds to a single data point. means : array_like, shape (n_components, n_features). Each row corresponds to a single mean vector. covars : array_like, shape (n_components, n_features). List of n_components covariance parameters for each t-Student. dfs : array_like, shape (n_components,). Degrees of freedom. min_covar : float value. Minimum amount that will be added to the covariance matrix in case of trouble, usually 1.e-6. Returns ------- retval : array_like, shape (n_samples, n_components). Evaluation of the multivariate probability density function for a t-Student distribution. """ # Sanity check: make sure that the shape of means and # covariances is as expected for diagonal matrices n_samples, n_dim = X.shape assert(covars.shape[0] == means.shape[0]) assert(covars.shape[1] == means.shape[1]) assert(covars.shape[1] == n_dim) # Calculate inverse and determinant of the covariances inv_covars = 1.0 / covars det_covars = np.prod(covars, axis=1) # Calculate the value of the numerator num = scipy.special.gamma((dfs + n_dim) / 2.0) # Calculate Mahalanobis distance from all the points to the # mean of each component in the mix maha = SMM._mahalanobis_distance_mix_diag(X, means, covars, min_covar) # Calculate the value of the denominator braces = 1.0 + maha / dfs denom = np.power(np.pi * dfs, n_dim / 2.0) \ * scipy.special.gamma(dfs / 2.0) \ * np.sqrt(det_covars) \ * np.power(braces, (dfs + n_dim) / 2.0) retval = num / denom return retval @staticmethod def _multivariate_t_student_density_spherical(X, means, covars, dfs, min_covar): """Multivariate t-Student PDF for a matrix of data points. Parameters ---------- X : array_like, shape (n_samples, n_features). Each row corresponds to a single data point. means : array_like, shape (n_components, n_features). Mean vectors for n_components t-Students. Each row corresponds to a single mean vector. covars : array_like, shape (n_components, n_features). Covariance parameters for each t-Student. dfs : array_like, shape (n_components,). Degrees of freedom. min_covar : float value. Minimum amount that will be added to the covariance matrix in case of trouble, usually 1.e-6. Returns ------- retval : array_like, shape (n_samples, n_components). Evaluation of the multivariate probability density function for a t-Student distribution. """ cv = covars.copy() if covars.ndim == 1: cv = cv[:, np.newaxis] if covars.shape[1] == 1: cv = np.tile(cv, (1, X.shape[-1])) # Sanity check: make sure that the covariance is spherical, # i.e. all the elements of the diagonal must be equal for k in range(cv.shape[0]): assert(np.unique(cv[k]).shape[0] == 1) retval = SMM._multivariate_t_student_density_diag( X, means, cv, dfs, min_covar ) return retval @staticmethod def _multivariate_t_student_density_tied(X, means, covars, dfs, min_covar): """Multivariate t-Student PDF for a matrix of data points. Parameters ---------- X : array_like, shape (n_samples, n_features). Each row corresponds to a single data point. means : array_like, shape (n_components, n_features). Mean vectors for n_components t-Students. Each row corresponds to a single mean vector. covars : array_like, shape (n_features, n_features). Covariance parameters for each t-Student. dfs : array_like, shape (n_components,). Degrees of freedom. min_covar : float value. Minimum amount that will be added to the covariance matrix in case of trouble, usually 1.e-6. Returns ------- retval : array_like, shape (n_samples, n_components). Evaluation of the multivariate probability density function for a t-Student distribution. """ # Sanity check: make sure that the shape is (n_features, # n_features) and that it matches the shape of the vector of # means assert(len(covars.shape) == 2) assert(covars.shape[0] == covars.shape[1]) assert(means.shape[1] == covars.shape[0]) cv = np.tile(covars, (means.shape[0], 1, 1)) retval = SMM._multivariate_t_student_density_full( X, means, cv, dfs, min_covar ) return retval @staticmethod def _multivariate_t_student_density_full(X, means, covars, dfs, min_covar): """Multivariate t-Student PDF for a matrix of data points. Parameters ---------- X : array_like, shape (n_samples, n_features). Each row corresponds to a single data point. means : array_like, shape (n_components, n_features). Mean vectors for n_components t-Students. Each row corresponds to a single mean vector. covars : array_like, shape (n_components, n_features, n_features). Covariance parameters for each t-Student. dfs : array_like, shape (n_components,). Degrees of freedom. min_covar : float value. Minimum amount that will be added to the covariance matrix in case of trouble, usually 1.e-6. Returns ------- prob : array_like, shape (n_samples, n_components). Evaluation of the multivariate probability density function for a t-Student distribution. """ n_samples, n_dim = X.shape n_components = len(means) prob = np.empty((n_samples, n_components)) # Sanity check: assert that the received means and covars have # the right shape assert(means.shape[0] == n_components) assert(covars.shape[0] == n_components) assert(dfs.shape[0] == n_components) # We evaluate all the saples for each component 'c' in the # mixture for c, (mu, cv, df) in enumerate(zip(means, covars, dfs)): # Calculate the Cholesky decomposition of the covariance # matrix cov_chol = SMM._cholesky(cv, min_covar) # Calculate the determinant of the covariance matrix cov_det = np.power(np.prod(np.diagonal(cov_chol)), 2) # Calculate the Mahalanobis distance between each vector and # the mean maha = SMM._mahalanobis_distance_chol(X, mu, cov_chol) # Calculate the coefficient of the gamma functions r = np.asarray(df, dtype=np.float64) log_gamma_coef = scipy.special.gammaln((r + n_dim) / 2.0) \ - scipy.special.gammaln(r / 2.0) # Calculate the denominator of the multivariate t-Student logdenom = np.log(np.sqrt(cov_det)) logdenom += n_dim / 2.0 * np.log(np.pi * df) logdenom += (df + n_dim) / 2 * np.log(1 + maha / df) # Finally calculate the PDF of the class 'c' for all the X # samples prob[:, c] = np.exp(log_gamma_coef - logdenom) return prob @staticmethod def _multivariate_t_student_density(X, means, covars, dfs, cov_type, min_covar): """Calculates the PDF of the multivariate t-student for a group of samples. This method has a dictionary with the different types of covariance matrices and calls the appropriate PDF function depending on the type of covariance matrix that is passed as parameter. This method assumes that the covariance matrices are full if the parameter cov_type is not specified when calling. Parameters ---------- X : array_like, shape (n_samples, n_features). Each row corresponds to a single data point. means : array_like, shape (n_components, n_features). Mean vectors for n_components t-Students. Each row corresponds to a single mean vector. covars : array_like, covariance parameters for each t-Student. The shape depends on `covariance_type`: (n_components, n_features) if 'spherical', (n_features, n_features) if 'tied', (n_components, n_features) if 'diag', (n_components, n_features, n_features) if 'full' cov_type : string. Indicates the type of the covariance parameters. It must be one of 'spherical', 'tied', 'diag', 'full'. Defaults to 'full'. Returns ------- retval : array_like, shape (n_samples, n_components). Array containing the probabilities of each data point in X under each of the n_components multivariate t-Student distributions. """ _multivariate_normal_density_dict = { 'diag': SMM._multivariate_t_student_density_diag, 'spherical': SMM._multivariate_t_student_density_spherical, 'tied': SMM._multivariate_t_student_density_tied, 'full': SMM._multivariate_t_student_density_full } retval = _multivariate_normal_density_dict[cov_type]( X, means, covars, dfs, min_covar ) return retval @staticmethod def _cholesky(cv, min_covar): """Calculates the lower triangular Cholesky decomposition of a covariance matrix. Parameters ---------- covar : array_like, shape (n_features, n_features). Covariance matrix whose Cholesky decomposition wants to be calculated. min_covar : float value. Minimum amount that will be added to the covariance matrix in case of trouble, usually 1.e-6. Returns ------- cov_chol : array_like, shape (n_features, n_features). Lower Cholesky decomposition of a covariance matrix. """ # Sanity check: assert that the covariance matrix is squared assert(cv.shape[0] == cv.shape[1]) # Sanity check: assert that the covariance matrix is symmetric if (cv.transpose() - cv).sum() > min_covar: print('[SMM._cholesky] Error, covariance matrix not ' \ + 'symmetric: ' + str(cv) ) n_dim = cv.shape[0] try: cov_chol = scipy.linalg.cholesky(cv, lower=True) except scipy.linalg.LinAlgError: # The model is most probably stuck in a component with too # few observations, we need to reinitialize this components cov_chol = scipy.linalg.cholesky( cv + min_covar * np.eye(n_dim), lower=True ) return cov_chol @staticmethod def _mahalanobis_distance_chol(X, mu, cov_chol): """Calculates the Mahalanobis distance between a matrix (set) of vectors (X) and another vector (mu). The vectors must be organised by row in X, that is, the features are the columns. Parameters ---------- X : array_like, shape (n_samples, n_features). Sample in each row. mu : array_like (n_features). Mean vector of a single distribution (no mixture). cov_chol : array_like, shape (n_features, n_features). Cholesky decomposition (L, i.e. lower triangular) of the covariance (normalising) matrix in case that is a full matrix. Returns ------- retval : array_like, shape (n_samples,). Array of distances, each row represents the distance from the vector in the same row of X and mu. """ z = scipy.linalg.solve_triangular( cov_chol, (X - mu).T, lower=True ) retval = np.einsum('ij, ij->j', z, z) return retval @staticmethod def _mahalanobis_distance_mix_diag(X, means, covars, min_covar): """Calculates the mahalanobis distance between a matrix of points and a mixture of distributions when the covariance matrices are diagonal. Parameters ---------- X : array_like, shape (n_samples, n_features). Matrix with a sample in each row. means : array_like, shape (n_components, n_features). Mean vectors for n_components t-Students. Each row corresponds to a single mean vector. covars : array_like, shape (n_components, n_features). Covariance parameters for each t-Student. Returns ------- result : array_like, shape (n_samples, n_components). Mahalanobis distance from all the samples to all the i component means. """ n_samples, n_dim = X.shape n_components = len(means) result = np.empty((n_samples, n_components)) for c, (mu, cv) in enumerate(zip(means, covars)): centred_X = X - mu inv_cov = np.float64(1) / cv result[:, c] = (centred_X * inv_cov * centred_X).sum(axis=1) return result @staticmethod def _mahalanobis_distance_mix_spherical(*args): return SMM._mahalanobis_distance_mix_diag(*args) @staticmethod def _mahalanobis_distance_mix_full(X, means, covars, min_covar): """Calculates the mahalanobis distance between a matrix of points and a mixture of distributions. Parameters ---------- X : array_like, shape (n_samples, n_features). Matrix with a sample vector in each row. means : array_like, shape (n_components, n_features). Each row corresponds to a single mean vector. covars : array_like, shape (n_components, n_features, n_features). Covariance parameters for each t-Student. Returns ------- result : array_like, shape (n_samples, n_components). Mahalanobis distance from all the samples to all the component means. """ n_samples, n_dim = X.shape n_components = len(means) result = np.empty((n_samples, n_components)) for c, (mu, cv) in enumerate(zip(means, covars)): cov_chol = SMM._cholesky(cv, min_covar) result[:, c] = SMM._mahalanobis_distance_chol( X, mu, cov_chol ) return result @staticmethod def _mahalanobis_distance_mix_tied(X, means, covars, min_covar): """Calculates the mahalanobis distance between a matrix of points and a mixture of distributions. Parameters ---------- X : array_like, shape (n_samples, n_features). Matrix with a sample vector in each row. means : array_like, shape (n_components, n_features). Each row corresponds to a single mean vector. covars : array_like, shape (n_features, n_features). Covariance parameters for each t-Student. Returns ------- retval : array_like, shape (n_samples, n_components). Mahalanobis distance from all the samples to all the component means. """ cv = np.tile(covars, (means.shape[0], 1, 1)) retval = SMM._mahalanobis_distance_mix_full( X, means, cv, min_covar ) return retval @staticmethod def _validate_covariances(covars, covariance_type, n_components): """Do basic checks on matrix covariance sizes and values.""" if covariance_type == 'full': if len(covars.shape) != 3: raise ValueError( "'full' covars must have shape (n_components, " \ + "n_dim, n_dim)" ) elif covars.shape[1] != covars.shape[2]: raise ValueError( "'full' covars must have shape (n_components, " \ + "n_dim, n_dim)" ) for n, cv in enumerate(covars): if (not np.allclose(cv, cv.T) or np.any(linalg.eigvalsh(cv) <= 0)): raise ValueError( "component %d of 'full' covars must be " \ + "symmetric, positive-definite" % n ) else: raise ValueError( "covariance_type must be one of " \ + "'spherical', 'tied', 'diag', 'full'" ) elif covariance_type == 'diag': if len(covars.shape) != 2: raise ValueError( "'diag' covars must have shape (n_components, " \ + "n_dim)") elif np.any(covars <= 0): raise ValueError("'diag' covars must be non-negative") elif covariance_type == 'spherical': if len(covars) != n_components: raise ValueError( "'spherical' covars have length n_components" ) elif np.any(covars <= 0): raise ValueError( "'spherical' covars must be non-negative" ) elif covariance_type == 'tied': if covars.shape[0] != covars.shape[1]: raise ValueError( "'tied' covars must have shape (n_dim, n_dim)") elif (not np.allclose(covars, covars.T) or np.any(np.linalg.eigvalsh(covars) <= 0)): raise ValueError( "'tied' covars must be symmetric, " \ + "positive-definite" )
[docs] @staticmethod def dist_covar_to_match_cov_type(tied_cv, covariance_type, n_components): """Create all the covariance matrices from a given template. Parameters ---------- tied_cv : array_like, shape (n_features, n_features). Tied covariance that is going to be converted to other type. covariance_type : string. Type of the covariance parameters. Must be one of 'spherical', 'tied', 'diag', 'full'. n_components : integer value. Number of components in the mixture. Returns ------- cv : array_like, shape (depends on the covariance_type parameter). Tied covariance in the format specified by the user. """ if covariance_type == 'spherical': cv = np.tile( tied_cv.mean() * np.ones(tied_cv.shape[1]), (n_components, 1) ) elif covariance_type == 'tied': cv = tied_cv elif covariance_type == 'diag': cv = np.tile(np.diag(tied_cv), (n_components, 1)) elif covariance_type == 'full': cv = np.tile(tied_cv, (n_components, 1, 1)) else: raise ValueError( "covariance_type must be one of " + "'spherical', 'tied', 'diag', 'full'" ) return cv
[docs] @staticmethod def multivariate_t_rvs(m, S, df=np.inf, n=1): """Generate multivariate random variable sample from a t-Student distribution. Author ------ Original code by Enzo Michelangeli. Modified by Luis C. Garcia-Peraza Herrera. This static method is exclusively used by 'tests/smm_test.py'. Parameters ---------- m : array_like, shape (n_features,). Mean vector, its length determines the dimension of the random variable. S : array_like, shape (n_features, n_features). Covariance matrix. df : int or float. Degrees of freedom. n : int. Number of observations. Returns ------- rvs : array_like, shape (n, len(m)). Each row is an independent draw of a multivariate t distributed random variable. """ # Sanity check: dimension of mean and covariance compatible assert(len(m.shape) == 1) assert(len(S.shape) == 2) assert(m.shape[0] == S.shape[0]) assert(m.shape[0] == S.shape[1]) # m = np.asarray(m) d = m.shape[0] # d = len(m) if df == np.inf: x = 1.0 else: x = np.random.chisquare(df, n) / df z = np.random.multivariate_normal(np.zeros(d), S, (n,)) retval = m + z / np.sqrt(x)[:, None] return retval
@property def weights(self): """Returns the weights of each component in the mixture.""" return self.weights_ @property def means(self): """Returns the means of each component in the mixture.""" return self.means_ @property def degrees(self): """Returns the degrees of freedom of each component in the mixture.""" return self.degrees_ @property def covariances(self): """Covariance parameters for each mixture component. Returns ------- The covariance matrices for all the classes. The shape depends on the type of covariance matrix: (n_classes, n_features) if 'diag', (n_classes, n_features, n_features) if 'full' (n_classes, n_features) if 'spherical', (n_features, n_features) if 'tied', """ if self.covariance_type == 'full': return self.covars_ elif self.covariance_type == 'diag': return [np.diag(cov) for cov in self.covars_] elif self.covariance_type == 'tied': return [self.covars_] * self.n_components elif self.covariance_type == 'spherical': return [np.diag(cov) for cov in self.covars_] # Class constants _covar_mstep_funcs = { 'spherical': _covar_mstep_spherical.__func__, 'diag': _covar_mstep_diag.__func__, 'tied': _covar_mstep_tied.__func__, 'full': _covar_mstep_full.__func__, } _mahalanobis_funcs = { 'spherical': _mahalanobis_distance_mix_spherical.__func__, 'diag': _mahalanobis_distance_mix_diag.__func__, 'tied': _mahalanobis_distance_mix_tied.__func__, 'full': _mahalanobis_distance_mix_full.__func__, } _EPS = np.finfo(np.float64).eps