1# Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Gaussian mixture models Operations.""" 16# TODO(xavigonzalvo): Factor out covariance matrix operations to make 17# code reusable for different types (e.g. diag). 18 19from __future__ import absolute_import 20from __future__ import division 21from __future__ import print_function 22 23import numpy as np 24 25from tensorflow.python.framework import constant_op 26from tensorflow.python.framework import dtypes 27from tensorflow.python.framework import ops 28from tensorflow.python.ops import array_ops 29from tensorflow.python.ops import check_ops 30from tensorflow.python.ops import control_flow_ops 31from tensorflow.python.ops import linalg_ops 32from tensorflow.python.ops import math_ops 33from tensorflow.python.ops import random_ops 34from tensorflow.python.ops import state_ops 35from tensorflow.python.ops import variable_scope 36from tensorflow.python.ops import variables 37from tensorflow.python.ops.embedding_ops import embedding_lookup 38 39# Machine epsilon. 40MEPS = np.finfo(float).eps 41FULL_COVARIANCE = 'full' 42DIAG_COVARIANCE = 'diag' 43 44 45def _covariance(x, diag): 46 """Defines the covariance operation of a matrix. 47 48 Args: 49 x: a matrix Tensor. Dimension 0 should contain the number of examples. 50 diag: if True, it computes the diagonal covariance. 51 52 Returns: 53 A Tensor representing the covariance of x. In the case of 54 diagonal matrix just the diagonal is returned. 55 """ 56 num_points = math_ops.cast(array_ops.shape(x)[0], dtypes.float32) 57 x -= math_ops.reduce_mean(x, 0, keepdims=True) 58 if diag: 59 cov = math_ops.reduce_sum( 60 math_ops.square(x), 0, keepdims=True) / (num_points - 1) 61 else: 62 cov = math_ops.matmul(x, x, transpose_a=True) / (num_points - 1) 63 return cov 64 65 66def _init_clusters_random(data, num_clusters, random_seed): 67 """Does random initialization of clusters. 68 69 Args: 70 data: a list of Tensors with a matrix of data, each row is an example. 71 num_clusters: an integer with the number of clusters. 72 random_seed: Seed for PRNG used to initialize seeds. 73 74 Returns: 75 A Tensor with num_clusters random rows of data. 76 """ 77 assert isinstance(data, list) 78 num_data = math_ops.add_n([array_ops.shape(inp)[0] for inp in data]) 79 with ops.control_dependencies( 80 [check_ops.assert_less_equal(num_clusters, num_data)]): 81 indices = random_ops.random_uniform( 82 [num_clusters], 83 minval=0, 84 maxval=math_ops.cast(num_data, dtypes.int64), 85 seed=random_seed, 86 dtype=dtypes.int64) 87 indices %= math_ops.cast(num_data, dtypes.int64) 88 clusters_init = embedding_lookup(data, indices, partition_strategy='div') 89 return clusters_init 90 91 92class GmmAlgorithm(object): 93 """Tensorflow Gaussian mixture model clustering class.""" 94 CLUSTERS_WEIGHT = 'alphas' 95 CLUSTERS_VARIABLE = 'clusters' 96 CLUSTERS_COVS_VARIABLE = 'clusters_covs' 97 98 def __init__(self, 99 data, 100 num_classes, 101 initial_means=None, 102 params='wmc', 103 covariance_type=FULL_COVARIANCE, 104 random_seed=0): 105 """Constructor. 106 107 Args: 108 data: a list of Tensors with data, each row is a new example. 109 num_classes: number of clusters. 110 initial_means: a Tensor with a matrix of means. If None, means are 111 computed by sampling randomly. 112 params: Controls which parameters are updated in the training 113 process. Can contain any combination of "w" for weights, "m" for 114 means, and "c" for covariances. 115 covariance_type: one of "full", "diag". 116 random_seed: Seed for PRNG used to initialize seeds. 117 118 Raises: 119 Exception if covariance type is unknown. 120 """ 121 self._params = params 122 self._random_seed = random_seed 123 self._covariance_type = covariance_type 124 if self._covariance_type not in [DIAG_COVARIANCE, FULL_COVARIANCE]: 125 raise Exception( # pylint: disable=g-doc-exception 126 'programmer error: Invalid covariance type: %s' % 127 self._covariance_type) 128 # Create sharded variables for multiple shards. The following 129 # lists are indexed by shard. 130 # Probability per example in a class. 131 num_shards = len(data) 132 self._probs = [None] * num_shards 133 # Prior probability. 134 self._prior_probs = [None] * num_shards 135 # Membership weights w_{ik} where "i" is the i-th example and "k" 136 # is the k-th mixture. 137 self._w = [None] * num_shards 138 # Number of examples in a class. 139 self._points_in_k = [None] * num_shards 140 first_shard = data[0] 141 self._dimensions = array_ops.shape(first_shard)[1] 142 self._num_classes = num_classes 143 # Small value to guarantee that covariances are invertible. 144 self._min_var = array_ops.diag( 145 array_ops.ones(array_ops.stack([self._dimensions]))) * 1e-3 146 self._create_variables() 147 self._initialize_variables(data, initial_means) 148 # Operations of partial statistics for the computation of the means. 149 self._w_mul_x = [] 150 # Operations of partial statistics for the computation of the covariances. 151 self._w_mul_x2 = [] 152 self._define_graph(data) 153 154 def _create_variables(self): 155 """Initializes GMM algorithm.""" 156 init_value = array_ops.constant([], dtype=dtypes.float32) 157 self._means = variables.VariableV1(init_value, 158 name=self.CLUSTERS_VARIABLE, 159 validate_shape=False) 160 self._covs = variables.VariableV1( 161 init_value, name=self.CLUSTERS_COVS_VARIABLE, validate_shape=False) 162 # Mixture weights, representing the probability that a randomly 163 # selected unobservable data (in EM terms) was generated by component k. 164 self._alpha = variable_scope.variable( 165 array_ops.tile([1.0 / self._num_classes], [self._num_classes]), 166 name=self.CLUSTERS_WEIGHT, 167 validate_shape=False) 168 self._cluster_centers_initialized = variables.VariableV1(False, 169 dtype=dtypes.bool, 170 name='initialized') 171 172 def _initialize_variables(self, data, initial_means=None): 173 """Initializes variables. 174 175 Args: 176 data: a list of Tensors with data, each row is a new example. 177 initial_means: a Tensor with a matrix of means. 178 """ 179 first_shard = data[0] 180 # Initialize means: num_classes X 1 X dimensions. 181 if initial_means is not None: 182 means = array_ops.expand_dims(initial_means, 1) 183 else: 184 # Sample data randomly 185 means = array_ops.expand_dims( 186 _init_clusters_random(data, self._num_classes, self._random_seed), 1) 187 188 # Initialize covariances. 189 if self._covariance_type == FULL_COVARIANCE: 190 cov = _covariance(first_shard, False) + self._min_var 191 # A matrix per class, num_classes X dimensions X dimensions 192 covs = array_ops.tile( 193 array_ops.expand_dims(cov, 0), [self._num_classes, 1, 1]) 194 elif self._covariance_type == DIAG_COVARIANCE: 195 cov = _covariance(first_shard, True) + self._min_var 196 # A diagonal per row, num_classes X dimensions. 197 covs = array_ops.tile( 198 array_ops.expand_dims(array_ops.diag_part(cov), 0), 199 [self._num_classes, 1]) 200 201 with ops.colocate_with(self._cluster_centers_initialized): 202 initialized = control_flow_ops.with_dependencies( 203 [means, covs], 204 array_ops.identity(self._cluster_centers_initialized)) 205 self._init_ops = [] 206 with ops.colocate_with(self._means): 207 init_means = state_ops.assign(self._means, means, validate_shape=False) 208 init_means = control_flow_ops.with_dependencies( 209 [init_means], 210 state_ops.assign(self._cluster_centers_initialized, True)) 211 self._init_ops.append(control_flow_ops.cond(initialized, 212 control_flow_ops.no_op, 213 lambda: init_means).op) 214 with ops.colocate_with(self._covs): 215 init_covs = state_ops.assign(self._covs, covs, validate_shape=False) 216 init_covs = control_flow_ops.with_dependencies( 217 [init_covs], 218 state_ops.assign(self._cluster_centers_initialized, True)) 219 self._init_ops.append(control_flow_ops.cond(initialized, 220 control_flow_ops.no_op, 221 lambda: init_covs).op) 222 223 def init_ops(self): 224 """Returns the initialization operation.""" 225 return control_flow_ops.group(*self._init_ops) 226 227 def training_ops(self): 228 """Returns the training operation.""" 229 return control_flow_ops.group(*self._train_ops) 230 231 def is_initialized(self): 232 """Returns a boolean operation for initialized variables.""" 233 return self._cluster_centers_initialized 234 235 def alphas(self): 236 return self._alpha 237 238 def clusters(self): 239 """Returns the clusters with dimensions num_classes X 1 X num_dimensions.""" 240 return self._means 241 242 def covariances(self): 243 """Returns the covariances matrices.""" 244 return self._covs 245 246 def assignments(self): 247 """Returns a list of Tensors with the matrix of assignments per shard.""" 248 ret = [] 249 for w in self._w: 250 ret.append(math_ops.argmax(w, 1)) 251 return ret 252 253 def scores(self): 254 """Returns the per-sample likelihood fo the data. 255 256 Returns: 257 Log probabilities of each data point. 258 """ 259 return self._scores 260 261 def log_likelihood_op(self): 262 """Returns the log-likelihood operation.""" 263 return self._log_likelihood_op 264 265 def _define_graph(self, data): 266 """Define graph for a single iteration. 267 268 Args: 269 data: a list of Tensors defining the training data. 270 """ 271 for shard_id, shard in enumerate(data): 272 self._num_examples = array_ops.shape(shard)[0] 273 shard = array_ops.expand_dims(shard, 0) 274 self._define_log_prob_operation(shard_id, shard) 275 self._define_prior_log_prob_operation(shard_id) 276 self._define_expectation_operation(shard_id) 277 self._define_partial_maximization_operation(shard_id, shard) 278 self._define_maximization_operation(len(data)) 279 self._define_loglikelihood_operation() 280 self._define_score_samples() 281 282 def _define_full_covariance_probs(self, shard_id, shard): 283 """Defines the full covariance probabilities per example in a class. 284 285 Updates a matrix with dimension num_examples X num_classes. 286 287 Args: 288 shard_id: id of the current shard. 289 shard: current data shard, 1 X num_examples X dimensions. 290 """ 291 diff = shard - self._means 292 cholesky = linalg_ops.cholesky(self._covs + self._min_var) 293 log_det_covs = 2.0 * math_ops.reduce_sum( 294 math_ops.log(array_ops.matrix_diag_part(cholesky)), 1) 295 x_mu_cov = math_ops.square( 296 linalg_ops.matrix_triangular_solve( 297 cholesky, array_ops.transpose( 298 diff, perm=[0, 2, 1]), lower=True)) 299 diag_m = array_ops.transpose(math_ops.reduce_sum(x_mu_cov, 1)) 300 self._probs[shard_id] = ( 301 -0.5 * (diag_m + math_ops.cast(self._dimensions, dtypes.float32) * 302 math_ops.log(2 * np.pi) + log_det_covs)) 303 304 def _define_diag_covariance_probs(self, shard_id, shard): 305 """Defines the diagonal covariance probabilities per example in a class. 306 307 Args: 308 shard_id: id of the current shard. 309 shard: current data shard, 1 X num_examples X dimensions. 310 311 Returns a matrix num_examples * num_classes. 312 """ 313 # num_classes X 1 314 # TODO(xavigonzalvo): look into alternatives to log for 315 # reparametrization of variance parameters. 316 det_expanded = math_ops.reduce_sum( 317 math_ops.log(self._covs + 1e-3), 1, keepdims=True) 318 x2 = math_ops.squared_difference(shard, self._means) 319 cov_expanded = array_ops.expand_dims(1.0 / (self._covs + 1e-3), 2) 320 # num_classes X num_examples 321 x2_cov = math_ops.matmul(x2, cov_expanded) 322 x2_cov = array_ops.transpose(array_ops.squeeze(x2_cov, [2])) 323 self._probs[shard_id] = -0.5 * ( 324 math_ops.cast(self._dimensions, dtypes.float32) * 325 math_ops.log(2.0 * np.pi) + 326 array_ops.transpose(det_expanded) + x2_cov) 327 328 def _define_log_prob_operation(self, shard_id, shard): 329 """Probability per example in a class. 330 331 Updates a matrix with dimension num_examples X num_classes. 332 333 Args: 334 shard_id: id of the current shard. 335 shard: current data shard, 1 X num_examples X dimensions. 336 """ 337 # TODO(xavigonzalvo): Use the pdf defined in 338 # third_party/tensorflow/contrib/distributions/python/ops/gaussian.py 339 if self._covariance_type == FULL_COVARIANCE: 340 self._define_full_covariance_probs(shard_id, shard) 341 elif self._covariance_type == DIAG_COVARIANCE: 342 self._define_diag_covariance_probs(shard_id, shard) 343 self._probs[shard_id] += math_ops.log(self._alpha) 344 345 def _define_prior_log_prob_operation(self, shard_id): 346 """Computes the prior probability of all samples. 347 348 Updates a vector where each item is the prior probability of an 349 input example. 350 351 Args: 352 shard_id: id of current shard_id. 353 """ 354 self._prior_probs[shard_id] = math_ops.reduce_logsumexp( 355 self._probs[shard_id], axis=1, keepdims=True) 356 357 def _define_expectation_operation(self, shard_id): 358 # Shape broadcasting. 359 probs = array_ops.expand_dims(self._probs[shard_id], 0) 360 # Membership weights are computed as: 361 # $$w_{ik} = \frac{\alpha_k f(\mathbf{y_i}|\mathbf{\theta}_k)}$$ 362 # $$ {\sum_{m=1}^{K}\alpha_mf(\mathbf{y_i}|\mathbf{\theta}_m)}$$ 363 # where "i" is the i-th example, "k" is the k-th mixture, theta are 364 # the model parameters and y_i the observations. 365 # These are defined for each shard. 366 self._w[shard_id] = array_ops.reshape( 367 math_ops.exp(probs - self._prior_probs[shard_id]), 368 array_ops.stack([self._num_examples, self._num_classes])) 369 370 def _define_partial_maximization_operation(self, shard_id, shard): 371 """Computes the partial statistics of the means and covariances. 372 373 Args: 374 shard_id: current shard id. 375 shard: current data shard, 1 X num_examples X dimensions. 376 """ 377 # Soft assignment of each data point to each of the two clusters. 378 self._points_in_k[shard_id] = math_ops.reduce_sum( 379 self._w[shard_id], 0, keepdims=True) 380 # Partial means. 381 w_mul_x = array_ops.expand_dims( 382 math_ops.matmul( 383 self._w[shard_id], array_ops.squeeze(shard, [0]), transpose_a=True), 384 1) 385 self._w_mul_x.append(w_mul_x) 386 # Partial covariances. 387 x = array_ops.concat([shard for _ in range(self._num_classes)], 0) 388 x_trans = array_ops.transpose(x, perm=[0, 2, 1]) 389 x_mul_w = array_ops.concat([ 390 array_ops.expand_dims(x_trans[k, :, :] * self._w[shard_id][:, k], 0) 391 for k in range(self._num_classes) 392 ], 0) 393 self._w_mul_x2.append(math_ops.matmul(x_mul_w, x)) 394 395 def _define_maximization_operation(self, num_batches): 396 """Maximization operations.""" 397 # TODO(xavigonzalvo): some of these operations could be moved to C++. 398 # Compute the effective number of data points assigned to component k. 399 with ops.control_dependencies(self._w): 400 points_in_k = array_ops.squeeze( 401 math_ops.add_n(self._points_in_k), axis=[0]) 402 # Update alpha. 403 if 'w' in self._params: 404 final_points_in_k = points_in_k / num_batches 405 num_examples = math_ops.cast(math_ops.reduce_sum(final_points_in_k), 406 dtypes.float32) 407 self._alpha_op = self._alpha.assign(final_points_in_k / 408 (num_examples + MEPS)) 409 else: 410 self._alpha_op = control_flow_ops.no_op() 411 self._train_ops = [self._alpha_op] 412 413 # Update means. 414 points_in_k_expanded = array_ops.reshape(points_in_k, 415 [self._num_classes, 1, 1]) 416 if 'm' in self._params: 417 self._means_op = self._means.assign( 418 math_ops.div( 419 math_ops.add_n(self._w_mul_x), points_in_k_expanded + MEPS)) 420 else: 421 self._means_op = control_flow_ops.no_op() 422 # means are (num_classes x 1 x dims) 423 424 # Update covariances. 425 with ops.control_dependencies([self._means_op]): 426 b = math_ops.add_n(self._w_mul_x2) / (points_in_k_expanded + MEPS) 427 new_covs = [] 428 for k in range(self._num_classes): 429 mean = self._means.value()[k, :, :] 430 square_mean = math_ops.matmul(mean, mean, transpose_a=True) 431 new_cov = b[k, :, :] - square_mean + self._min_var 432 if self._covariance_type == FULL_COVARIANCE: 433 new_covs.append(array_ops.expand_dims(new_cov, 0)) 434 elif self._covariance_type == DIAG_COVARIANCE: 435 new_covs.append( 436 array_ops.expand_dims(array_ops.diag_part(new_cov), 0)) 437 new_covs = array_ops.concat(new_covs, 0) 438 if 'c' in self._params: 439 # Train operations don't need to take care of the means 440 # because covariances already depend on it. 441 with ops.control_dependencies([self._means_op, new_covs]): 442 self._train_ops.append( 443 state_ops.assign( 444 self._covs, new_covs, validate_shape=False)) 445 446 def _define_loglikelihood_operation(self): 447 """Defines the total log-likelihood of current iteration.""" 448 op = [] 449 for prior_probs in self._prior_probs: 450 op.append(math_ops.reduce_logsumexp(prior_probs)) 451 self._log_likelihood_op = math_ops.reduce_logsumexp(op) 452 453 def _define_score_samples(self): 454 """Defines the likelihood of each data sample.""" 455 op = [] 456 for shard_id, prior_probs in enumerate(self._prior_probs): 457 op.append(prior_probs + math_ops.log(self._w[shard_id])) 458 self._scores = array_ops.squeeze( 459 math_ops.reduce_logsumexp(op, axis=2, keepdims=True), axis=0) 460 461 462def gmm(inp, 463 initial_clusters, 464 num_clusters, 465 random_seed, 466 covariance_type=FULL_COVARIANCE, 467 params='wmc'): 468 """Creates the graph for Gaussian mixture model (GMM) clustering. 469 470 Args: 471 inp: An input tensor or list of input tensors 472 initial_clusters: Specifies the clusters used during 473 initialization. Can be a tensor or numpy array, or a function 474 that generates the clusters. Can also be "random" to specify 475 that clusters should be chosen randomly from input data. Note: type 476 is diverse to be consistent with skflow. 477 num_clusters: number of clusters. 478 random_seed: Python integer. Seed for PRNG used to initialize centers. 479 covariance_type: one of "diag", "full". 480 params: Controls which parameters are updated in the training 481 process. Can contain any combination of "w" for weights, "m" for 482 means, and "c" for covars. 483 484 Returns: 485 Note: tuple of lists returned to be consistent with skflow 486 A tuple consisting of: 487 assignments: A vector (or list of vectors). Each element in the vector 488 corresponds to an input row in 'inp' and specifies the cluster id 489 corresponding to the input. 490 training_op: an op that runs an iteration of training. 491 init_op: an op that runs the initialization. 492 """ 493 initial_means = None 494 if initial_clusters != 'random' and not isinstance(initial_clusters, 495 ops.Tensor): 496 initial_means = constant_op.constant(initial_clusters, dtype=dtypes.float32) 497 498 # Implementation of GMM. 499 inp = inp if isinstance(inp, list) else [inp] 500 gmm_tool = GmmAlgorithm(inp, num_clusters, initial_means, params, 501 covariance_type, random_seed) 502 assignments = gmm_tool.assignments() 503 scores = gmm_tool.scores() 504 loss = gmm_tool.log_likelihood_op() 505 return (loss, scores, [assignments], gmm_tool.training_ops(), 506 gmm_tool.init_ops(), gmm_tool.is_initialized()) 507