1# Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Gaussian mixture models Operations.""" 16# TODO(xavigonzalvo): Factor out covariance matrix operations to make 17# code reusable for different types (e.g. diag). 18 19from __future__ import absolute_import 20from __future__ import division 21from __future__ import print_function 22 23import numpy as np 24 25from tensorflow.python.framework import constant_op 26from tensorflow.python.framework import dtypes 27from tensorflow.python.framework import ops 28from tensorflow.python.ops import array_ops 29from tensorflow.python.ops import check_ops 30from tensorflow.python.ops import control_flow_ops 31from tensorflow.python.ops import linalg_ops 32from tensorflow.python.ops import math_ops 33from tensorflow.python.ops import random_ops 34from tensorflow.python.ops import state_ops 35from tensorflow.python.ops import variable_scope 36from tensorflow.python.ops import variables 37from tensorflow.python.ops.embedding_ops import embedding_lookup 38 39# Machine epsilon. 40MEPS = np.finfo(float).eps 41FULL_COVARIANCE = 'full' 42DIAG_COVARIANCE = 'diag' 43 44 45def _covariance(x, diag): 46 """Defines the covariance operation of a matrix. 47 48 Args: 49 x: a matrix Tensor. Dimension 0 should contain the number of examples. 50 diag: if True, it computes the diagonal covariance. 51 52 Returns: 53 A Tensor representing the covariance of x. In the case of 54 diagonal matrix just the diagonal is returned. 55 """ 56 num_points = math_ops.to_float(array_ops.shape(x)[0]) 57 x -= math_ops.reduce_mean(x, 0, keep_dims=True) 58 if diag: 59 cov = math_ops.reduce_sum( 60 math_ops.square(x), 0, keep_dims=True) / (num_points - 1) 61 else: 62 cov = math_ops.matmul(x, x, transpose_a=True) / (num_points - 1) 63 return cov 64 65 66def _init_clusters_random(data, num_clusters, random_seed): 67 """Does random initialization of clusters. 68 69 Args: 70 data: a list of Tensors with a matrix of data, each row is an example. 71 num_clusters: an integer with the number of clusters. 72 random_seed: Seed for PRNG used to initialize seeds. 73 74 Returns: 75 A Tensor with num_clusters random rows of data. 76 """ 77 assert isinstance(data, list) 78 num_data = math_ops.add_n([array_ops.shape(inp)[0] for inp in data]) 79 with ops.control_dependencies( 80 [check_ops.assert_less_equal(num_clusters, num_data)]): 81 indices = random_ops.random_uniform( 82 [num_clusters], 83 minval=0, 84 maxval=math_ops.cast(num_data, dtypes.int64), 85 seed=random_seed, 86 dtype=dtypes.int64) 87 indices %= math_ops.cast(num_data, dtypes.int64) 88 clusters_init = embedding_lookup(data, indices, partition_strategy='div') 89 return clusters_init 90 91 92class GmmAlgorithm(object): 93 """Tensorflow Gaussian mixture model clustering class.""" 94 CLUSTERS_WEIGHT = 'alphas' 95 CLUSTERS_VARIABLE = 'clusters' 96 CLUSTERS_COVS_VARIABLE = 'clusters_covs' 97 98 def __init__(self, 99 data, 100 num_classes, 101 initial_means=None, 102 params='wmc', 103 covariance_type=FULL_COVARIANCE, 104 random_seed=0): 105 """Constructor. 106 107 Args: 108 data: a list of Tensors with data, each row is a new example. 109 num_classes: number of clusters. 110 initial_means: a Tensor with a matrix of means. If None, means are 111 computed by sampling randomly. 112 params: Controls which parameters are updated in the training 113 process. Can contain any combination of "w" for weights, "m" for 114 means, and "c" for covariances. 115 covariance_type: one of "full", "diag". 116 random_seed: Seed for PRNG used to initialize seeds. 117 118 Raises: 119 Exception if covariance type is unknown. 120 """ 121 self._params = params 122 self._random_seed = random_seed 123 self._covariance_type = covariance_type 124 if self._covariance_type not in [DIAG_COVARIANCE, FULL_COVARIANCE]: 125 raise Exception( # pylint: disable=g-doc-exception 126 'programmer error: Invalid covariance type: %s' % 127 self._covariance_type) 128 # Create sharded variables for multiple shards. The following 129 # lists are indexed by shard. 130 # Probability per example in a class. 131 num_shards = len(data) 132 self._probs = [None] * num_shards 133 # Prior probability. 134 self._prior_probs = [None] * num_shards 135 # Membership weights w_{ik} where "i" is the i-th example and "k" 136 # is the k-th mixture. 137 self._w = [None] * num_shards 138 # Number of examples in a class. 139 self._points_in_k = [None] * num_shards 140 first_shard = data[0] 141 self._dimensions = array_ops.shape(first_shard)[1] 142 self._num_classes = num_classes 143 # Small value to guarantee that covariances are invertible. 144 self._min_var = array_ops.diag( 145 array_ops.ones(array_ops.stack([self._dimensions]))) * 1e-3 146 self._create_variables() 147 self._initialize_variables(data, initial_means) 148 # Operations of partial statistics for the computation of the means. 149 self._w_mul_x = [] 150 # Operations of partial statistics for the computation of the covariances. 151 self._w_mul_x2 = [] 152 self._define_graph(data) 153 154 def _create_variables(self): 155 """Initializes GMM algorithm.""" 156 init_value = array_ops.constant([], dtype=dtypes.float32) 157 self._means = variables.Variable(init_value, 158 name=self.CLUSTERS_VARIABLE, 159 validate_shape=False) 160 self._covs = variables.Variable( 161 init_value, name=self.CLUSTERS_COVS_VARIABLE, validate_shape=False) 162 # Mixture weights, representing the probability that a randomly 163 # selected unobservable data (in EM terms) was generated by component k. 164 self._alpha = variable_scope.variable( 165 array_ops.tile([1.0 / self._num_classes], [self._num_classes]), 166 name=self.CLUSTERS_WEIGHT, 167 validate_shape=False) 168 self._cluster_centers_initialized = variables.Variable(False, 169 dtype=dtypes.bool, 170 name='initialized') 171 172 def _initialize_variables(self, data, initial_means=None): 173 """Initializes variables. 174 175 Args: 176 data: a list of Tensors with data, each row is a new example. 177 initial_means: a Tensor with a matrix of means. 178 """ 179 first_shard = data[0] 180 # Initialize means: num_classes X 1 X dimensions. 181 if initial_means is not None: 182 means = array_ops.expand_dims(initial_means, 1) 183 else: 184 # Sample data randomly 185 means = array_ops.expand_dims( 186 _init_clusters_random(data, self._num_classes, self._random_seed), 1) 187 188 # Initialize covariances. 189 if self._covariance_type == FULL_COVARIANCE: 190 cov = _covariance(first_shard, False) + self._min_var 191 # A matrix per class, num_classes X dimensions X dimensions 192 covs = array_ops.tile( 193 array_ops.expand_dims(cov, 0), [self._num_classes, 1, 1]) 194 elif self._covariance_type == DIAG_COVARIANCE: 195 cov = _covariance(first_shard, True) + self._min_var 196 # A diagonal per row, num_classes X dimensions. 197 covs = array_ops.tile( 198 array_ops.expand_dims(array_ops.diag_part(cov), 0), 199 [self._num_classes, 1]) 200 201 with ops.colocate_with(self._cluster_centers_initialized): 202 initialized = control_flow_ops.with_dependencies( 203 [means, covs], 204 array_ops.identity(self._cluster_centers_initialized)) 205 self._init_ops = [] 206 with ops.colocate_with(self._means): 207 init_means = state_ops.assign(self._means, means, validate_shape=False) 208 init_means = control_flow_ops.with_dependencies( 209 [init_means], 210 state_ops.assign(self._cluster_centers_initialized, True)) 211 self._init_ops.append(control_flow_ops.cond(initialized, 212 control_flow_ops.no_op, 213 lambda: init_means).op) 214 with ops.colocate_with(self._covs): 215 init_covs = state_ops.assign(self._covs, covs, validate_shape=False) 216 init_covs = control_flow_ops.with_dependencies( 217 [init_covs], 218 state_ops.assign(self._cluster_centers_initialized, True)) 219 self._init_ops.append(control_flow_ops.cond(initialized, 220 control_flow_ops.no_op, 221 lambda: init_covs).op) 222 223 def init_ops(self): 224 """Returns the initialization operation.""" 225 return control_flow_ops.group(*self._init_ops) 226 227 def training_ops(self): 228 """Returns the training operation.""" 229 return control_flow_ops.group(*self._train_ops) 230 231 def is_initialized(self): 232 """Returns a boolean operation for initialized variables.""" 233 return self._cluster_centers_initialized 234 235 def alphas(self): 236 return self._alpha 237 238 def clusters(self): 239 """Returns the clusters with dimensions num_classes X 1 X num_dimensions.""" 240 return self._means 241 242 def covariances(self): 243 """Returns the covariances matrices.""" 244 return self._covs 245 246 def assignments(self): 247 """Returns a list of Tensors with the matrix of assignments per shard.""" 248 ret = [] 249 for w in self._w: 250 ret.append(math_ops.argmax(w, 1)) 251 return ret 252 253 def scores(self): 254 """Returns the per-sample likelihood fo the data. 255 256 Returns: 257 Log probabilities of each data point. 258 """ 259 return self._scores 260 261 def log_likelihood_op(self): 262 """Returns the log-likelihood operation.""" 263 return self._log_likelihood_op 264 265 def _define_graph(self, data): 266 """Define graph for a single iteration. 267 268 Args: 269 data: a list of Tensors defining the training data. 270 """ 271 for shard_id, shard in enumerate(data): 272 self._num_examples = array_ops.shape(shard)[0] 273 shard = array_ops.expand_dims(shard, 0) 274 self._define_log_prob_operation(shard_id, shard) 275 self._define_prior_log_prob_operation(shard_id) 276 self._define_expectation_operation(shard_id) 277 self._define_partial_maximization_operation(shard_id, shard) 278 self._define_maximization_operation(len(data)) 279 self._define_loglikelihood_operation() 280 self._define_score_samples() 281 282 def _define_full_covariance_probs(self, shard_id, shard): 283 """Defines the full covariance probabilties per example in a class. 284 285 Updates a matrix with dimension num_examples X num_classes. 286 287 Args: 288 shard_id: id of the current shard. 289 shard: current data shard, 1 X num_examples X dimensions. 290 """ 291 diff = shard - self._means 292 cholesky = linalg_ops.cholesky(self._covs + self._min_var) 293 log_det_covs = 2.0 * math_ops.reduce_sum( 294 math_ops.log(array_ops.matrix_diag_part(cholesky)), 1) 295 x_mu_cov = math_ops.square( 296 linalg_ops.matrix_triangular_solve( 297 cholesky, array_ops.transpose( 298 diff, perm=[0, 2, 1]), lower=True)) 299 diag_m = array_ops.transpose(math_ops.reduce_sum(x_mu_cov, 1)) 300 self._probs[shard_id] = -0.5 * (diag_m + math_ops.to_float(self._dimensions) 301 * math_ops.log(2 * np.pi) + log_det_covs) 302 303 def _define_diag_covariance_probs(self, shard_id, shard): 304 """Defines the diagonal covariance probabilities per example in a class. 305 306 Args: 307 shard_id: id of the current shard. 308 shard: current data shard, 1 X num_examples X dimensions. 309 310 Returns a matrix num_examples * num_classes. 311 """ 312 # num_classes X 1 313 # TODO(xavigonzalvo): look into alternatives to log for 314 # reparametrization of variance parameters. 315 det_expanded = math_ops.reduce_sum( 316 math_ops.log(self._covs + 1e-3), 1, keep_dims=True) 317 diff = shard - self._means 318 x2 = math_ops.square(diff) 319 cov_expanded = array_ops.expand_dims(1.0 / (self._covs + 1e-3), 2) 320 # num_classes X num_examples 321 x2_cov = math_ops.matmul(x2, cov_expanded) 322 x2_cov = array_ops.transpose(array_ops.squeeze(x2_cov, [2])) 323 self._probs[shard_id] = -0.5 * ( 324 math_ops.to_float(self._dimensions) * math_ops.log(2.0 * np.pi) + 325 array_ops.transpose(det_expanded) + x2_cov) 326 327 def _define_log_prob_operation(self, shard_id, shard): 328 """Probability per example in a class. 329 330 Updates a matrix with dimension num_examples X num_classes. 331 332 Args: 333 shard_id: id of the current shard. 334 shard: current data shard, 1 X num_examples X dimensions. 335 """ 336 # TODO(xavigonzalvo): Use the pdf defined in 337 # third_party/tensorflow/contrib/distributions/python/ops/gaussian.py 338 if self._covariance_type == FULL_COVARIANCE: 339 self._define_full_covariance_probs(shard_id, shard) 340 elif self._covariance_type == DIAG_COVARIANCE: 341 self._define_diag_covariance_probs(shard_id, shard) 342 self._probs[shard_id] += math_ops.log(self._alpha) 343 344 def _define_prior_log_prob_operation(self, shard_id): 345 """Computes the prior probability of all samples. 346 347 Updates a vector where each item is the prior probabibility of an 348 input example. 349 350 Args: 351 shard_id: id of current shard_id. 352 """ 353 self._prior_probs[shard_id] = math_ops.reduce_logsumexp( 354 self._probs[shard_id], axis=1, keep_dims=True) 355 356 def _define_expectation_operation(self, shard_id): 357 # Shape broadcasting. 358 probs = array_ops.expand_dims(self._probs[shard_id], 0) 359 # Membership weights are computed as: 360 # w_{ik} = \frac{\alpha_k f(\mathbf{y_i}|\mathbf{\theta}_k)} 361 # {\sum_{m=1}^{K}\alpha_mf(\mathbf{y_i}|\mathbf{\theta}_m)} 362 # where "i" is the i-th example, "k" is the k-th mixture, theta are 363 # the model parameters and y_i the observations. 364 # These are defined for each shard. 365 self._w[shard_id] = array_ops.reshape( 366 math_ops.exp(probs - self._prior_probs[shard_id]), 367 array_ops.stack([self._num_examples, self._num_classes])) 368 369 def _define_partial_maximization_operation(self, shard_id, shard): 370 """Computes the partial statistics of the means and covariances. 371 372 Args: 373 shard_id: current shard id. 374 shard: current data shard, 1 X num_examples X dimensions. 375 """ 376 # Soft assignment of each data point to each of the two clusters. 377 self._points_in_k[shard_id] = math_ops.reduce_sum( 378 self._w[shard_id], 0, keep_dims=True) 379 # Partial means. 380 w_mul_x = array_ops.expand_dims( 381 math_ops.matmul( 382 self._w[shard_id], array_ops.squeeze(shard, [0]), transpose_a=True), 383 1) 384 self._w_mul_x.append(w_mul_x) 385 # Partial covariances. 386 x = array_ops.concat([shard for _ in range(self._num_classes)], 0) 387 x_trans = array_ops.transpose(x, perm=[0, 2, 1]) 388 x_mul_w = array_ops.concat([ 389 array_ops.expand_dims(x_trans[k, :, :] * self._w[shard_id][:, k], 0) 390 for k in range(self._num_classes) 391 ], 0) 392 self._w_mul_x2.append(math_ops.matmul(x_mul_w, x)) 393 394 def _define_maximization_operation(self, num_batches): 395 """Maximization operations.""" 396 # TODO(xavigonzalvo): some of these operations could be moved to C++. 397 # Compute the effective number of data points assigned to component k. 398 with ops.control_dependencies(self._w): 399 points_in_k = array_ops.squeeze( 400 math_ops.add_n(self._points_in_k), squeeze_dims=[0]) 401 # Update alpha. 402 if 'w' in self._params: 403 final_points_in_k = points_in_k / num_batches 404 num_examples = math_ops.to_float(math_ops.reduce_sum(final_points_in_k)) 405 self._alpha_op = self._alpha.assign(final_points_in_k / 406 (num_examples + MEPS)) 407 else: 408 self._alpha_op = control_flow_ops.no_op() 409 self._train_ops = [self._alpha_op] 410 411 # Update means. 412 points_in_k_expanded = array_ops.reshape(points_in_k, 413 [self._num_classes, 1, 1]) 414 if 'm' in self._params: 415 self._means_op = self._means.assign( 416 math_ops.div( 417 math_ops.add_n(self._w_mul_x), points_in_k_expanded + MEPS)) 418 else: 419 self._means_op = control_flow_ops.no_op() 420 # means are (num_classes x 1 x dims) 421 422 # Update covariances. 423 with ops.control_dependencies([self._means_op]): 424 b = math_ops.add_n(self._w_mul_x2) / (points_in_k_expanded + MEPS) 425 new_covs = [] 426 for k in range(self._num_classes): 427 mean = self._means.value()[k, :, :] 428 square_mean = math_ops.matmul(mean, mean, transpose_a=True) 429 new_cov = b[k, :, :] - square_mean + self._min_var 430 if self._covariance_type == FULL_COVARIANCE: 431 new_covs.append(array_ops.expand_dims(new_cov, 0)) 432 elif self._covariance_type == DIAG_COVARIANCE: 433 new_covs.append( 434 array_ops.expand_dims(array_ops.diag_part(new_cov), 0)) 435 new_covs = array_ops.concat(new_covs, 0) 436 if 'c' in self._params: 437 # Train operations don't need to take care of the means 438 # because covariances already depend on it. 439 with ops.control_dependencies([self._means_op, new_covs]): 440 self._train_ops.append( 441 state_ops.assign( 442 self._covs, new_covs, validate_shape=False)) 443 444 def _define_loglikelihood_operation(self): 445 """Defines the total log-likelihood of current iteration.""" 446 op = [] 447 for prior_probs in self._prior_probs: 448 op.append(math_ops.reduce_logsumexp(prior_probs)) 449 self._log_likelihood_op = math_ops.reduce_logsumexp(op) 450 451 def _define_score_samples(self): 452 """Defines the likelihood of each data sample.""" 453 op = [] 454 for shard_id, prior_probs in enumerate(self._prior_probs): 455 op.append(prior_probs + math_ops.log(self._w[shard_id])) 456 self._scores = array_ops.squeeze( 457 math_ops.reduce_logsumexp(op, axis=2, keep_dims=True), axis=0) 458 459 460def gmm(inp, 461 initial_clusters, 462 num_clusters, 463 random_seed, 464 covariance_type=FULL_COVARIANCE, 465 params='wmc'): 466 """Creates the graph for Gaussian mixture model (GMM) clustering. 467 468 Args: 469 inp: An input tensor or list of input tensors 470 initial_clusters: Specifies the clusters used during 471 initialization. Can be a tensor or numpy array, or a function 472 that generates the clusters. Can also be "random" to specify 473 that clusters should be chosen randomly from input data. Note: type 474 is diverse to be consistent with skflow. 475 num_clusters: number of clusters. 476 random_seed: Python integer. Seed for PRNG used to initialize centers. 477 covariance_type: one of "diag", "full". 478 params: Controls which parameters are updated in the training 479 process. Can contain any combination of "w" for weights, "m" for 480 means, and "c" for covars. 481 482 Returns: 483 Note: tuple of lists returned to be consistent with skflow 484 A tuple consisting of: 485 assignments: A vector (or list of vectors). Each element in the vector 486 corresponds to an input row in 'inp' and specifies the cluster id 487 corresponding to the input. 488 training_op: an op that runs an iteration of training. 489 init_op: an op that runs the initialization. 490 """ 491 initial_means = None 492 if initial_clusters != 'random' and not isinstance(initial_clusters, 493 ops.Tensor): 494 initial_means = constant_op.constant(initial_clusters, dtype=dtypes.float32) 495 496 # Implementation of GMM. 497 inp = inp if isinstance(inp, list) else [inp] 498 gmm_tool = GmmAlgorithm(inp, num_clusters, initial_means, params, 499 covariance_type, random_seed) 500 assignments = gmm_tool.assignments() 501 scores = gmm_tool.scores() 502 loss = gmm_tool.log_likelihood_op() 503 return (loss, scores, [assignments], gmm_tool.training_ops(), 504 gmm_tool.init_ops(), gmm_tool.is_initialized()) 505