# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Clustering Operations."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import random_seed as random_seed_ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import check_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import gen_clustering_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_impl
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import variable_scope
from tensorflow.python.ops.embedding_ops import embedding_lookup
# go/tf-wildcard-import
# pylint: disable=wildcard-import
from tensorflow.python.ops.gen_clustering_ops import *
# pylint: enable=wildcard-import

# Euclidean distance between vectors U and V is defined as \\(||U - V||_F\\)
# which is the square root of the sum of the absolute squares of the
# element-wise differences. The metric named below is the square of that
# quantity.
SQUARED_EUCLIDEAN_DISTANCE = 'squared_euclidean'
# Cosine distance between vectors U and V is defined as
# \\(1 - (U \dot V) / (||U||_F ||V||_F)\\)
COSINE_DISTANCE = 'cosine'

RANDOM_INIT = 'random'
KMEANS_PLUS_PLUS_INIT = 'kmeans_plus_plus'
KMC2_INIT = 'kmc2'

# The name of the variable holding the cluster centers. Used by the Estimator.
CLUSTERS_VAR_NAME = 'clusters'


class KMeans(object):
  """Creates the graph for k-means clustering."""

  def __init__(self,
               inputs,
               num_clusters,
               initial_clusters=RANDOM_INIT,
               distance_metric=SQUARED_EUCLIDEAN_DISTANCE,
               use_mini_batch=False,
               mini_batch_steps_per_iteration=1,
               random_seed=0,
               kmeans_plus_plus_num_retries=2,
               kmc2_chain_length=200):
    """Creates an object for generating KMeans clustering graph.

    This class implements the following variants of K-means algorithm:

    If use_mini_batch is False, it runs standard full batch K-means. Each step
    runs a single iteration of K-Means. This step can be run sharded across
    multiple workers by passing a list of sharded inputs to this class. Note
    however that a single step needs to process the full input at once.

    If use_mini_batch is True, it runs a generalization of the mini-batch
    K-means algorithm. It runs multiple iterations, where each iteration is
    composed of mini_batch_steps_per_iteration steps. Two copies of cluster
    centers are maintained: one that is updated at the end of each iteration,
    and one that is updated every step. The first copy is used to compute
    cluster allocations for each step, and for inference, while the second copy
    is the one updated each step using the mini-batch update rule. After each
    iteration is complete, this second copy is copied back to the first copy.

    Note that for use_mini_batch=True, when mini_batch_steps_per_iteration=1,
    the algorithm reduces to the standard mini-batch algorithm. Also by setting
    mini_batch_steps_per_iteration = num_inputs / batch_size, the algorithm
    becomes an asynchronous version of the full-batch algorithm. Note however
    that there is no guarantee by this implementation that each input is seen
    exactly once per iteration. Also, different updates are applied
    asynchronously without locking. So this asynchronous version may not behave
    exactly like a full-batch version.

    Args:
      inputs: An input tensor or list of input tensors. It is assumed that the
        data points have been previously randomly permuted.
      num_clusters: An integer tensor specifying the number of clusters. This
        argument is ignored if initial_clusters is a tensor or numpy array.
      initial_clusters: Specifies the clusters used during initialization. One
        of the following:
        - a tensor or numpy array with the initial cluster centers.
        - a function f(inputs, k) that returns up to k centers from `inputs`.
        - "random": Choose centers randomly from `inputs`.
        - "kmeans_plus_plus": Use kmeans++ to choose centers from `inputs`.
        - "kmc2": Use the fast k-MC2 algorithm to choose centers from `inputs`.
        In the last three cases, one batch of `inputs` may not yield
        `num_clusters` centers, in which case initialization will require
        multiple batches until enough centers are chosen. In the case of
        "random" or "kmeans_plus_plus", if the input size is <= `num_clusters`
        then the entire batch is chosen to be cluster centers.
      distance_metric: Distance metric used for clustering. Supported options:
        "squared_euclidean", "cosine".
      use_mini_batch: If true, use the mini-batch k-means algorithm. Else assume
        full batch.
      mini_batch_steps_per_iteration: Number of steps after which the updated
        cluster centers are synced back to a master copy.
      random_seed: Seed for PRNG used to initialize seeds.
      kmeans_plus_plus_num_retries: For each point that is sampled during
        kmeans++ initialization, this parameter specifies the number of
        additional points to draw from the current distribution before selecting
        the best. If a negative value is specified, a heuristic is used to
        sample O(log(num_to_sample)) additional points.
      kmc2_chain_length: Determines how many candidate points are used by the
        k-MC2 algorithm to produce one new cluster center. If a (mini-)batch
        contains fewer points, one new cluster center is generated from the
        (mini-)batch.

    Raises:
      ValueError: An invalid argument was passed to initial_clusters or
        distance_metric.
    """
    if isinstance(initial_clusters, str) and initial_clusters not in [
        RANDOM_INIT, KMEANS_PLUS_PLUS_INIT, KMC2_INIT
    ]:
      raise ValueError(
          "Unsupported initialization algorithm '%s'" % initial_clusters)
    if distance_metric not in [SQUARED_EUCLIDEAN_DISTANCE, COSINE_DISTANCE]:
      raise ValueError("Unsupported distance metric '%s'" % distance_metric)
    # Inputs are always handled as a list of shards internally.
    self._inputs = inputs if isinstance(inputs, list) else [inputs]
    self._num_clusters = num_clusters
    self._initial_clusters = initial_clusters
    self._distance_metric = distance_metric
    self._use_mini_batch = use_mini_batch
    self._mini_batch_steps_per_iteration = int(mini_batch_steps_per_iteration)
    self._seed = random_seed_ops.get_seed(random_seed)[0]
    self._kmeans_plus_plus_num_retries = kmeans_plus_plus_num_retries
    self._kmc2_chain_length = kmc2_chain_length

  @classmethod
  def _distance_graph(cls, inputs, clusters, distance_metric):
    """Computes distance between each input and each cluster center.

    Args:
      inputs: list of input Tensors.
      clusters: cluster Tensor.
      distance_metric: distance metric used for clustering

    Returns:
      list of Tensors, where each element corresponds to each element in inputs.
      The value is the distance of each row to all the cluster centers.
      Currently only Euclidean distance and cosine distance are supported.
    """
    assert isinstance(inputs, list)
    if distance_metric == SQUARED_EUCLIDEAN_DISTANCE:
      return cls._compute_euclidean_distance(inputs, clusters)
    elif distance_metric == COSINE_DISTANCE:
      # Callers are expected to pass already-normalized inputs and clusters.
      return cls._compute_cosine_distance(
          inputs, clusters, inputs_normalized=True)
    else:
      assert False, str(distance_metric)

  @classmethod
  def _compute_euclidean_distance(cls, inputs, clusters):
    """Computes Euclidean distance between each input and each cluster center.

    The value computed is the squared distance, expanded as
    ||x||^2 - 2 x.c + ||c||^2; no square root is taken.

    Args:
      inputs: list of input Tensors.
      clusters: cluster Tensor.

    Returns:
      list of Tensors, where each element corresponds to each element in inputs.
      The value is the distance of each row to all the cluster centers.
    """
    output = []
    for inp in inputs:
      with ops.colocate_with(inp, ignore_existing=True):
        # Computes Euclidean distance. Note the first and third terms are
        # broadcast additions.
        squared_distance = (
            math_ops.reduce_sum(math_ops.square(inp), 1, keepdims=True) -
            2 * math_ops.matmul(inp, clusters, transpose_b=True) +
            array_ops.transpose(
                math_ops.reduce_sum(
                    math_ops.square(clusters), 1, keepdims=True)))
        output.append(squared_distance)

    return output

  @classmethod
  def _compute_cosine_distance(cls, inputs, clusters, inputs_normalized=True):
    """Computes cosine distance between each input and each cluster center.

    Args:
      inputs: list of input Tensor.
      clusters: cluster Tensor
      inputs_normalized: if True, it assumes that inp and clusters are
        normalized and uses one minus their dot product as the cosine
        distance. Else it L2 normalizes the inputs and clusters first.

    Returns:
      list of Tensors, where each element corresponds to each element in inp.
      The value is the distance of each row to all the cluster centers.
    """
    output = []
    if not inputs_normalized:
      with ops.colocate_with(clusters, ignore_existing=True):
        clusters = nn_impl.l2_normalize(clusters, axis=1)
    for inp in inputs:
      with ops.colocate_with(inp, ignore_existing=True):
        if not inputs_normalized:
          inp = nn_impl.l2_normalize(inp, axis=1)
        output.append(1 - math_ops.matmul(inp, clusters, transpose_b=True))
    return output

  def _infer_graph(self, inputs, clusters):
    """Maps input to closest cluster and the score.

    Args:
      inputs: list of input Tensors.
      clusters: Tensor of cluster centers.

    Returns:
      An iterable of three sequences, each holding one entry per element of
      `inputs`:
        all_scores: distance of each input to each cluster center.
        score: distance of each input to closest cluster center.
        cluster_idx: index of cluster center closest to the corresponding input.
    """
    assert isinstance(inputs, list)
    # Pairwise distances are used only by transform(). In all other cases, this
    # sub-graph is not evaluated.
    scores = self._distance_graph(inputs, clusters, self._distance_metric)
    output = []
    if (self._distance_metric == COSINE_DISTANCE and
        not self._clusters_l2_normalized()):
      # For L2-normalized vectors x and y the squared Euclidean distance is
      # twice the cosine distance (||x - y||^2 = 2 - 2 * x.y). We use this
      # fact to reuse the nearest_neighbors op (relied on here to report
      # squared Euclidean distances) and halve its distances below.
      # TODO(ands): Support COSINE distance in nearest_neighbors and remove
      # this.
      with ops.colocate_with(clusters, ignore_existing=True):
        clusters = nn_impl.l2_normalize(clusters, axis=1)
    for inp, score in zip(inputs, scores):
      with ops.colocate_with(inp, ignore_existing=True):
        (indices, distances) = gen_clustering_ops.nearest_neighbors(
            inp, clusters, 1)
        if self._distance_metric == COSINE_DISTANCE:
          distances *= 0.5
        # Only one neighbor was requested, so drop the trailing axis.
        output.append((score, array_ops.squeeze(distances, [-1]),
                       array_ops.squeeze(indices, [-1])))
    # Transpose the per-input tuples into per-field sequences.
    return zip(*output)

  def _clusters_l2_normalized(self):
    """Returns True if clusters centers are kept normalized."""
    return (self._distance_metric == COSINE_DISTANCE and
            (not self._use_mini_batch or
             self._mini_batch_steps_per_iteration > 1))

  def _create_variables(self, num_clusters):
    """Creates variables.

    Args:
      num_clusters: an integer Tensor providing the number of clusters.

    Returns:
      Tuple with following elements:
      - cluster_centers: a Tensor for storing cluster centers
      - cluster_centers_initialized: bool Variable indicating whether clusters
            are initialized.
      - cluster_counts: a Tensor for storing counts of points assigned to this
            cluster. This is used by mini-batch training. It is None when
            mini-batch training is not used.
      - cluster_centers_updated: Tensor representing copy of cluster centers
            that are updated every step. It is the same object as
            cluster_centers unless mini-batch training with more than one
            step per iteration is used.
      - update_in_steps: numbers of steps left before we sync
            cluster_centers_updated back to cluster_centers. None when no
            separate copy is kept.
    """
    # The centers start out empty; their shape is only known after init.
    init_value = array_ops.placeholder_with_default([], shape=None)
    cluster_centers = variable_scope.variable(
        init_value, name=CLUSTERS_VAR_NAME, validate_shape=False)
    cluster_centers_initialized = variable_scope.variable(
        False, dtype=dtypes.bool, name='initialized')

    if self._use_mini_batch and self._mini_batch_steps_per_iteration > 1:
      # Copy of cluster centers actively updated each step according to
      # mini-batch update rule.
      cluster_centers_updated = variable_scope.variable(
          init_value, name='clusters_updated', validate_shape=False)
      # How many steps till we copy the updated clusters to cluster_centers.
      update_in_steps = variable_scope.variable(
          self._mini_batch_steps_per_iteration,
          dtype=dtypes.int64,
          name='update_in_steps')
      # Count of points assigned to cluster_centers_updated.
      cluster_counts = variable_scope.variable(
          array_ops.zeros([num_clusters], dtype=dtypes.int64))
    else:
      cluster_centers_updated = cluster_centers
      update_in_steps = None
      cluster_counts = (
          variable_scope.variable(
              array_ops.ones([num_clusters], dtype=dtypes.int64))
          if self._use_mini_batch else None)
    return (cluster_centers, cluster_centers_initialized, cluster_counts,
            cluster_centers_updated, update_in_steps)

  @classmethod
  def _l2_normalize_data(cls, inputs):
    """Returns a list with each input Tensor L2-normalized along axis 1."""
    output = []
    for inp in inputs:
      with ops.colocate_with(inp, ignore_existing=True):
        output.append(nn_impl.l2_normalize(inp, axis=1))
    return output

  def training_graph(self):
    """Generate a training graph for kmeans algorithm.

    This returns, among other things, an op that chooses initial centers
    (init_op), a boolean variable that is set to True when the initial centers
    are chosen (cluster_centers_initialized), and an op to perform either an
    entire Lloyd iteration or a mini-batch of a Lloyd iteration (training_op).
    The caller should use these components as follows. A single worker should
    execute init_op multiple times until cluster_centers_initialized becomes
    True. Then multiple workers may execute training_op any number of times.

    Returns:
      A tuple consisting of:
      all_scores: A matrix (or list of matrices) of dimensions (num_input,
        num_clusters) where the value is the distance of an input vector and a
        cluster center.
      cluster_idx: A vector (or list of vectors). Each element in the vector
        corresponds to an input row in 'inp' and specifies the cluster id
        corresponding to the input.
      scores: Similar to cluster_idx but specifies the distance to the
        assigned cluster instead.
      cluster_centers_initialized: scalar indicating whether clusters have been
        initialized.
      init_op: an op to initialize the clusters.
      training_op: an op that runs an iteration of training.
    """
    # Implementation of kmeans.
    if (isinstance(self._initial_clusters, str) or
        callable(self._initial_clusters)):
      initial_clusters = self._initial_clusters
      num_clusters = ops.convert_to_tensor(self._num_clusters)
    else:
      # Explicit centers were given; their row count overrides num_clusters.
      initial_clusters = ops.convert_to_tensor(self._initial_clusters)
      num_clusters = array_ops.shape(initial_clusters)[0]

    inputs = self._inputs
    (cluster_centers_var, cluster_centers_initialized, total_counts,
     cluster_centers_updated,
     update_in_steps) = self._create_variables(num_clusters)
    init_op = _InitializeClustersOpFactory(
        self._inputs, num_clusters, initial_clusters, self._distance_metric,
        self._seed, self._kmeans_plus_plus_num_retries, self._kmc2_chain_length,
        cluster_centers_var, cluster_centers_updated,
        cluster_centers_initialized).op()
    cluster_centers = cluster_centers_var

    if self._distance_metric == COSINE_DISTANCE:
      inputs = self._l2_normalize_data(inputs)
      if not self._clusters_l2_normalized():
        cluster_centers = nn_impl.l2_normalize(cluster_centers, axis=1)

    all_scores, scores, cluster_idx = self._infer_graph(inputs, cluster_centers)
    if self._use_mini_batch:
      sync_updates_op = self._mini_batch_sync_updates_op(
          update_in_steps, cluster_centers_var, cluster_centers_updated,
          total_counts)
      assert sync_updates_op is not None
      with ops.control_dependencies([sync_updates_op]):
        training_op = self._mini_batch_training_op(
            inputs, cluster_idx, cluster_centers_updated, total_counts)
    else:
      # In full-batch mode the centers are never re-normalized above, so the
      # name must still refer to the variable itself. Identity (not `==`) is
      # checked so that no element-wise comparison op can be built.
      assert cluster_centers is cluster_centers_var
      training_op = self._full_batch_training_op(
          inputs, num_clusters, cluster_idx, cluster_centers_var)

    return (all_scores, cluster_idx, scores, cluster_centers_initialized,
            init_op, training_op)

  def _mini_batch_sync_updates_op(self, update_in_steps, cluster_centers_var,
                                  cluster_centers_updated, total_counts):
    """Creates an op that syncs the per-step centers into the master copy.

    When mini-batch training uses more than one step per iteration, the
    returned op either decrements `update_in_steps`, or, once that counter is
    <= 0, resets it, assigns `cluster_centers_updated` (L2-normalized for
    cosine distance) to `cluster_centers_var` and zeroes `total_counts`.
    In every other configuration a no-op is returned.

    Args:
      update_in_steps: Variable counting the steps left before the next sync,
        or None when no separate copy of the centers is kept.
      cluster_centers_var: Variable holding the master copy of the centers.
      cluster_centers_updated: Variable holding the centers updated each step.
      total_counts: Variable holding per-cluster counts.

    Returns:
      An op (or Tensor) performing the bookkeeping described above.
    """
    if self._use_mini_batch and self._mini_batch_steps_per_iteration > 1:
      assert update_in_steps is not None
      with ops.colocate_with(update_in_steps, ignore_existing=True):

        def _f():
          # Note that there is a race condition here, so we do a best effort
          # updates here. We reset update_in_steps first so that other workers
          # don't duplicate the updates. Also we update cluster_center_vars
          # before resetting total_counts to avoid large updates to
          # cluster_centers_updated based on partially updated
          # cluster_center_vars.
          with ops.control_dependencies([
              state_ops.assign(update_in_steps,
                               self._mini_batch_steps_per_iteration - 1)
          ]):
            with ops.colocate_with(
                cluster_centers_updated, ignore_existing=True):
              if self._distance_metric == COSINE_DISTANCE:
                cluster_centers = nn_impl.l2_normalize(
                    cluster_centers_updated, axis=1)
              else:
                cluster_centers = cluster_centers_updated
            with ops.colocate_with(cluster_centers_var, ignore_existing=True):
              with ops.control_dependencies(
                  [state_ops.assign(cluster_centers_var, cluster_centers)]):
                with ops.colocate_with(None, ignore_existing=True):
                  with ops.control_dependencies([
                      state_ops.assign(total_counts,
                                       array_ops.zeros_like(total_counts))
                  ]):
                    return array_ops.identity(update_in_steps)

        return control_flow_ops.cond(
            update_in_steps <= 0, _f,
            lambda: state_ops.assign_sub(update_in_steps, 1))
    else:
      return control_flow_ops.no_op()

  def _mini_batch_training_op(self, inputs, cluster_idx_list, cluster_centers,
                              total_counts):
    """Creates an op for training for mini batch case.

    Args:
      inputs: list of input Tensors.
      cluster_idx_list: A vector (or list of vectors). Each element in the
        vector corresponds to an input row in 'inp' and specifies the cluster id
        corresponding to the input.
      cluster_centers: Tensor Ref of cluster centers.
      total_counts: Tensor Ref of cluster counts.

    Returns:
      An op for doing an update of mini-batch k-means.
    """
    update_ops = []
    for inp, cluster_idx in zip(inputs, cluster_idx_list):
      with ops.colocate_with(inp, ignore_existing=True):
        assert total_counts is not None
        cluster_idx = array_ops.reshape(cluster_idx, [-1])
        # Dedupe the unique ids of cluster_centers being updated so that updates
        # can be locally aggregated.
        unique_ids, unique_idx = array_ops.unique(cluster_idx)
        num_unique_cluster_idx = array_ops.size(unique_ids)
        # Fetch the old values of counts and cluster_centers.
        with ops.colocate_with(total_counts, ignore_existing=True):
          old_counts = array_ops.gather(total_counts, unique_ids)
        # TODO(agarwal): This colocation seems to run into problems. Fix it.
        with ops.colocate_with(cluster_centers, ignore_existing=True):
          old_cluster_centers = array_ops.gather(cluster_centers, unique_ids)
        # Locally aggregate the increment to counts.
        count_updates = math_ops.unsorted_segment_sum(
            array_ops.ones_like(unique_idx, dtype=total_counts.dtype),
            unique_idx, num_unique_cluster_idx)
        # Locally compute the sum of inputs mapped to each id.
        # For a cluster with old cluster value x, old count n, and with data
        # d_1,...d_k newly assigned to it, we recompute the new value as
        # \\(x += (sum_i(d_i) - k * x) / (n + k)\\).
        # Compute \\(sum_i(d_i)\\), see comment above.
        cluster_center_updates = math_ops.unsorted_segment_sum(
            inp, unique_idx, num_unique_cluster_idx)
        # Shape to enable broadcasting count_updates and learning_rate to inp.
        # It extends the shape with 1's to match the rank of inp.
        broadcast_shape = array_ops.concat([
            array_ops.reshape(num_unique_cluster_idx, [1]),
            array_ops.ones(
                array_ops.reshape(array_ops.rank(inp) - 1, [1]),
                dtype=dtypes.int32)
        ], 0)
        # Subtract k * x, see comment above.
        cluster_center_updates -= math_ops.cast(
            array_ops.reshape(count_updates, broadcast_shape),
            inp.dtype) * old_cluster_centers
        learning_rate = math_ops.reciprocal(
            math_ops.cast(old_counts + count_updates, inp.dtype))
        learning_rate = array_ops.reshape(learning_rate, broadcast_shape)
        # scale by 1 / (n + k), see comment above.
        cluster_center_updates *= learning_rate
        # Apply the updates.
      update_counts = state_ops.scatter_add(total_counts, unique_ids,
                                            count_updates)
      update_cluster_centers = state_ops.scatter_add(
          cluster_centers, unique_ids, cluster_center_updates)
      update_ops.extend([update_counts, update_cluster_centers])
    return control_flow_ops.group(*update_ops)

  def _full_batch_training_op(self, inputs, num_clusters, cluster_idx_list,
                              cluster_centers):
    """Creates an op for training for full batch case.

    Args:
      inputs: list of input Tensors.
      num_clusters: an integer Tensor providing the number of clusters.
      cluster_idx_list: A vector (or list of vectors). Each element in the
        vector corresponds to an input row in 'inp' and specifies the cluster id
        corresponding to the input.
      cluster_centers: Tensor Ref of cluster centers.

    Returns:
      An op for doing an update of full-batch k-means.
    """
    cluster_sums = []
    cluster_counts = []
    # Added to the counts so that clusters with no points do not divide by 0.
    epsilon = constant_op.constant(1e-6, dtype=inputs[0].dtype)
    for inp, cluster_idx in zip(inputs, cluster_idx_list):
      with ops.colocate_with(inp, ignore_existing=True):
        cluster_sums.append(
            math_ops.unsorted_segment_sum(inp, cluster_idx, num_clusters))
        # Sum a column of ones per cluster to count the assigned points.
        cluster_counts.append(
            math_ops.unsorted_segment_sum(
                array_ops.reshape(
                    array_ops.ones(
                        array_ops.reshape(array_ops.shape(inp)[0], [-1])),
                    [-1, 1]), cluster_idx, num_clusters))
    with ops.colocate_with(cluster_centers, ignore_existing=True):
      new_clusters_centers = math_ops.add_n(cluster_sums) / (
          math_ops.cast(math_ops.add_n(cluster_counts), cluster_sums[0].dtype) +
          epsilon)
      if self._clusters_l2_normalized():
        new_clusters_centers = nn_impl.l2_normalize(
            new_clusters_centers, axis=1)
    return state_ops.assign(cluster_centers, new_clusters_centers)


class _InitializeClustersOpFactory(object):
  """Internal class to create the op to initialize the clusters.

  The op performs this algorithm (see constructor args):

  num_remaining = num_clusters - length(cluster_centers)
  if num_remaining == 0:
    assert that cluster_centers_initialized is true
  else:
    assert that num_remaining > 0
    new_centers = choose up to num_remaining initial centers
    l2-normalize new_centers if using cosine distance
    all_centers = concat(cluster_centers, new_centers)
    cluster_centers := all_centers
    if there is a cluster_centers_updated variable:
      cluster_centers_updated := cluster_centers
    num_now_remaining = num_clusters - length(cluster_centers)
    if num_now_remaining == 0:
      cluster_centers_initialized := true
  """

  # TODO(ccolby): Refactor this class so that kmc2 isn't so much a special case.

  def __init__(self, inputs, num_clusters, initial_clusters, distance_metric,
               random_seed, kmeans_plus_plus_num_retries, kmc2_chain_length,
               cluster_centers, cluster_centers_updated,
               cluster_centers_initialized):
    """Creates an op factory.

    Args:
      inputs: See KMeans constructor.
      num_clusters: An integer Tensor providing the number of clusters.
      initial_clusters: See KMeans constructor.
      distance_metric: See KMeans constructor.
      random_seed: See KMeans constructor.
      kmeans_plus_plus_num_retries: See KMeans constructor.
      kmc2_chain_length: See KMeans constructor.
      cluster_centers: The TF variable holding the initial centers. It may
        already contain some centers when the op is executed.
      cluster_centers_updated: A second TF variable to hold a copy of the
        initial centers, used for mini-batch mode with more than one step per
        iteration. Otherwise cluster_centers_updated is the same variable as
        cluster_centers.
      cluster_centers_initialized: A boolean TF variable that will be set
        to true when all the initial centers have been chosen.
    """
    # All of these instance variables are constants.
    self._inputs = inputs
    self._num_clusters = num_clusters
    self._initial_clusters = initial_clusters
    self._distance_metric = distance_metric
    self._seed = random_seed
    self._kmeans_plus_plus_num_retries = kmeans_plus_plus_num_retries
    self._kmc2_chain_length = kmc2_chain_length
    self._cluster_centers = cluster_centers
    self._cluster_centers_updated = cluster_centers_updated
    self._cluster_centers_initialized = cluster_centers_initialized

    # Number of centers already stored, still missing, and of input rows
    # summed over all shards.
    self._num_selected = array_ops.shape(self._cluster_centers)[0]
    self._num_remaining = self._num_clusters - self._num_selected
    self._num_data = math_ops.add_n(
        [array_ops.shape(i)[0] for i in self._inputs])

  def _random(self):
    """Picks `num_remaining` input rows at uniformly random indices."""
    indices = random_ops.random_uniform(
        array_ops.reshape(self._num_remaining, [-1]),
        minval=0,
        maxval=math_ops.cast(self._num_data, dtypes.int64),
        seed=self._seed,
        dtype=dtypes.int64)
    return embedding_lookup(self._inputs, indices, partition_strategy='div')

  def _kmeans_plus_plus(self):
    """Chooses `num_remaining` centers with the kmeans++ initialization op."""
    # Points from only the first shard are used for initializing centers.
    # TODO(ands): Use all points.
    inp = self._inputs[0]
    if self._distance_metric == COSINE_DISTANCE:
      inp = nn_impl.l2_normalize(inp, axis=1)
    return gen_clustering_ops.kmeans_plus_plus_initialization(
        inp, math_ops.cast(self._num_remaining, dtypes.int64), self._seed,
        self._kmeans_plus_plus_num_retries)

  def _kmc2_multiple_centers(self):
    """Adds new initial cluster centers using the k-MC2 algorithm.

    In each call to the op, the provided batch is split into subsets based on
    the specified `kmc2_chain_length`. On each subset, a single Markov chain of
    the k-MC2 algorithm is used to add *one* new cluster center. If there
    are fewer than `kmc2_chain_length` points in the subset, a single center is
    added using one Markov chain on the full input. It is assumed that the
    provided batch has previously been randomly permuted. Otherwise, k-MC2 may
    return suboptimal centers.

    Returns:
      An op that adds new cluster centers. Its value is the number of centers
      still remaining to be chosen.
    """
    # The op only operates on the first shard of data.
    first_shard = self._inputs[0]
    # Number of points in the input that can be used.
    batch_size = array_ops.shape(first_shard)[0]
    # Maximum number of subsets such that the size of each subset is at least
    # `kmc2_chain_length`. Final subsets may be larger.
    max_to_sample = math_ops.cast(
        batch_size / self._kmc2_chain_length, dtype=dtypes.int32)
    # We sample at least one new center and at most all remaining centers.
    num_to_sample = math_ops.maximum(
        math_ops.minimum(self._num_remaining, max_to_sample), 1)

    def _cond(i, _):
      """Stopping condition for the while loop."""
      return math_ops.less(i, num_to_sample)

    def _body(i, _):
      """Body that adds a single new center based on a subset."""

      def _sample_random():
        """Returns a random point as a cluster center."""
        # By assumption the batch is reshuffled and _sample_random is always
        # called for i=0. Hence, we simply return the first point.
        new_center = array_ops.reshape(first_shard[0], [1, -1])
        if self._distance_metric == COSINE_DISTANCE:
          new_center = nn_impl.l2_normalize(new_center, axis=1)
        return new_center

      def _sample_kmc2_chain():
        """Returns previous centers plus a new center sampled using k-MC2."""
        # Extract the subset from the underlying batch.
        start = i * self._kmc2_chain_length
        end = start + self._kmc2_chain_length
        subset = first_shard[start:end]
        # Compute the distances from points in the subset to previous centers.
        _, distances = gen_clustering_ops.nearest_neighbors(
            subset, self._cluster_centers, 1)
        # Sample index of new center using k-MC2 Markov chain. Only the
        # neighbor axis is squeezed so that a single-point subset still yields
        # a vector rather than a scalar.
        new_center_index = gen_clustering_ops.kmc2_chain_initialization(
            array_ops.squeeze(distances, [-1]), self._seed)
        # Extract actual new center.
        newly_sampled_center = array_ops.reshape(subset[new_center_index],
                                                 [1, -1])
        # Return concatenation with previously sampled centers.
        if self._distance_metric == COSINE_DISTANCE:
          newly_sampled_center = nn_impl.l2_normalize(
              newly_sampled_center, axis=1)
        return array_ops.concat([self._cluster_centers, newly_sampled_center],
                                0)

      # Obtain a random point if there are no previously sampled centers.
      # Otherwise, construct a k-MC2 Markov chain.
      new_centers = control_flow_ops.cond(
          math_ops.equal(self._num_selected, 0), _sample_random,
          _sample_kmc2_chain)
      # Assign new cluster centers to underlying variable.
      assigned_centers = state_ops.assign(
          self._cluster_centers, new_centers, validate_shape=False)
      if self._cluster_centers_updated is not self._cluster_centers:
        assigned_centers = state_ops.assign(
            self._cluster_centers_updated,
            assigned_centers,
            validate_shape=False)
      return i + 1, self._num_clusters - array_ops.shape(assigned_centers)[0]

    # Add num_to_sample new data points.
    _, num_remaining = control_flow_ops.while_loop(_cond, _body, [0, 0])
    return num_remaining

  def _greedy_batch_sampler(self, sampler):
    """Uses the whole input as centers if it is small, else calls `sampler`."""
    # If the input dataset size is smaller than the number of centers
    # remaining, choose the entire input dataset as centers. This can happen
    # with mini-batch. Otherwise, sample the batch according to the provided
    # sampler.
    return control_flow_ops.cond(self._num_data <= self._num_remaining,
                                 lambda: array_ops.concat(self._inputs, 0),
                                 sampler)

  def _single_batch_sampler(self, sampler):
    """Calls `sampler` after asserting the batch can supply all centers."""
    # Enforce that there are at least as many data points as centers
    # remaining. This gives the provided sampler the chance to select all
    # remaining centers from a single batch.
    with ops.control_dependencies(
        [check_ops.assert_greater_equal(self._num_data, self._num_remaining)]):
      return sampler()

  def _choose_initial_centers(self):
    """Returns new centers according to the `initial_clusters` setting."""
    if isinstance(self._initial_clusters, str):
      if self._initial_clusters == RANDOM_INIT:
        return self._greedy_batch_sampler(self._random)
      else:  # self._initial_clusters == KMEANS_PLUS_PLUS_INIT
        return self._single_batch_sampler(self._kmeans_plus_plus)
    elif callable(self._initial_clusters):
      return self._initial_clusters(self._inputs, self._num_remaining)
    else:
      # Explicit centers must account for every center still missing.
      with ops.control_dependencies([
          check_ops.assert_equal(self._num_remaining,
                                 array_ops.shape(self._initial_clusters)[0])
      ]):
        return self._initial_clusters

  def _add_new_centers(self):
    """Adds some centers and returns the number of centers remaining."""
    new_centers = self._choose_initial_centers()
    if self._distance_metric == COSINE_DISTANCE:
      new_centers = nn_impl.l2_normalize(new_centers, axis=1)
    # If cluster_centers is empty, it doesn't have the right shape for concat.
    all_centers = control_flow_ops.cond(
        math_ops.equal(self._num_selected, 0), lambda: new_centers,
        lambda: array_ops.concat([self._cluster_centers, new_centers], 0))
    # TODO(ccolby): De-dupe all_centers?
    a = state_ops.assign(
        self._cluster_centers, all_centers, validate_shape=False)
    if self._cluster_centers_updated is not self._cluster_centers:
      a = state_ops.assign(
          self._cluster_centers_updated, a, validate_shape=False)
    return self._num_clusters - array_ops.shape(a)[0]

  def _initialize(self):
    """Adds centers and marks initialization done once none remain."""
    with ops.control_dependencies([
        check_ops.assert_positive(self._num_remaining),
    ]):
      # initial_clusters may be a Tensor or a callable here; only compare it
      # against the KMC2 name when it is actually a string.
      if (isinstance(self._initial_clusters, str) and
          self._initial_clusters == KMC2_INIT):
        num_now_remaining = self._kmc2_multiple_centers()
      else:
        num_now_remaining = self._add_new_centers()
      return control_flow_ops.cond(
          math_ops.equal(num_now_remaining, 0),
          lambda: state_ops.assign(self._cluster_centers_initialized, True),
          control_flow_ops.no_op)

  def op(self):
    """Returns the cluster initializer op."""
    return control_flow_ops.cond(
        math_ops.equal(self._num_remaining, 0),
        lambda: check_ops.assert_equal(self._cluster_centers_initialized, True),
        self._initialize)