# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Keras discretization preprocessing layer."""
# pylint: disable=g-classes-have-attributes

import numpy as np

from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_spec
from tensorflow.python.framework import tensor_util
from tensorflow.python.keras.engine import base_preprocessing_layer
from tensorflow.python.keras.utils import tf_utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gen_math_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import script_ops
from tensorflow.python.ops import sort_ops
from tensorflow.python.ops.ragged import ragged_functional_ops
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util.tf_export import keras_export


def summarize(values, epsilon):
  """Reduce a 1-D sequence of values to a summary.

  This algorithm is based on numpy.quantiles but modified to allow for
  intermediate steps between multiple data sets. It first finds the target
  number of bins as the reciprocal of epsilon and then takes the individual
  values spaced at appropriate intervals to arrive at that target.
  The final step is to return the corresponding counts between those values.
  If the target num_bins is larger than the size of values, the whole array is
  returned (with weights of 1).

  Args:
    values: 1-D `np.ndarray` to be summarized.
    epsilon: A `'float32'` that determines the approximate desired precision.

  Returns:
    A 2-D `np.ndarray` that is a summary of the inputs. First column is the
    interpolated partition values, the second is the weights (counts).
  """

  values = array_ops.reshape(values, [-1])
  values = sort_ops.sort(values)
  elements = math_ops.cast(array_ops.size(values), dtypes.float32)
  num_buckets = 1. / epsilon
  increment = math_ops.cast(elements / num_buckets, dtypes.int32)
  start = increment
  step = math_ops.maximum(increment, 1)
  boundaries = values[start::step]
  weights = array_ops.ones_like(boundaries)
  weights = weights * math_ops.cast(step, dtypes.float32)
  return array_ops.stack([boundaries, weights])
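

# Example (illustrative sketch, not part of the public API):
# `summarize([4., 1., 3., 2.], epsilon=0.25)` sorts the values to
# [1., 2., 3., 4.]; the target is 1/0.25 = 4 buckets, so
# `increment = step = 1` and every value from index 1 onward becomes a
# boundary with weight 1, yielding [[2., 3., 4.], [1., 1., 1.]].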


def compress(summary, epsilon):
  """Compress a summary to within `epsilon` accuracy.

  The compression step is needed to keep the summary sizes small after merging,
  and is also used to return the final target boundaries. It finds the new bins
  based on interpolating cumulative weight percentages from the large summary.
  Taking the difference of the cumulative weights from the previous bin's
  cumulative weight gives the new weight for that bin.

  Args:
    summary: 2-D `np.ndarray` summary to be compressed.
    epsilon: A `'float32'` that determines the approximate desired precision.

  Returns:
    A 2-D `np.ndarray` that is a compressed summary. First column is the
    interpolated partition values, the second is the weights (counts).
  """
  # TODO(b/184863356): remove the numpy escape hatch here.
  return script_ops.numpy_function(
      lambda s: _compress_summary_numpy(s, epsilon), [summary], dtypes.float32)


def _compress_summary_numpy(summary, epsilon):
  """Compress a summary with numpy."""
  if summary.shape[1] * epsilon < 1:
    return summary

  percents = epsilon + np.arange(0.0, 1.0, epsilon)
  cum_weights = summary[1].cumsum()
  cum_weight_percents = cum_weights / cum_weights[-1]
  new_bins = np.interp(percents, cum_weight_percents, summary[0])
  cum_weights = np.interp(percents, cum_weight_percents, cum_weights)
  new_weights = cum_weights - np.concatenate((np.array([0]), cum_weights[:-1]))
  summary = np.stack((new_bins, new_weights))
  return summary.astype(np.float32)


def merge_summaries(prev_summary, next_summary, epsilon):
  """Weighted merge sort of summaries.

  Given two summaries of distinct data, this function merges (and compresses)
  them to stay within `epsilon` error tolerance.

  Args:
    prev_summary: 2-D `np.ndarray` summary to be merged with `next_summary`.
    next_summary: 2-D `np.ndarray` summary to be merged with `prev_summary`.
    epsilon: A float that determines the approximate desired precision.

  Returns:
    A 2-D `np.ndarray` that is a merged summary. First column is the
    interpolated partition values, the second is the weights (counts).
  """
  merged = array_ops.concat((prev_summary, next_summary), axis=1)
  merged = array_ops.gather_v2(merged, sort_ops.argsort(merged[0]), axis=1)
  return compress(merged, epsilon)


def get_bin_boundaries(summary, num_bins):
  """Returns the `num_bins - 1` interior bin boundaries from a summary."""
  return compress(summary, 1.0 / num_bins)[0, :-1]
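

# Taken together, the helpers above form the streaming quantile pipeline that
# `adapt` drives. A minimal sketch of that flow (hypothetical variable names;
# the real driver is `update_state`/`finalize_state` on the layer below):
#
#   summary = [[], []]                                # empty running state
#   for batch in batches:
#     summary = merge_summaries(summarize(batch, epsilon), summary, epsilon)
#   bin_boundaries = get_bin_boundaries(summary, num_bins)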


@keras_export("keras.layers.experimental.preprocessing.Discretization")
class Discretization(base_preprocessing_layer.PreprocessingLayer):
  """Buckets data into discrete ranges.

  This layer will place each element of its input data into one of several
  contiguous ranges and output an integer index indicating which range each
  element was placed in.

  Input shape:
    Any `tf.Tensor` or `tf.RaggedTensor` of dimension 2 or higher.

  Output shape:
    Same as input shape.

  Attributes:
    bin_boundaries: A list of bin boundaries. The leftmost and rightmost bins
      will always extend to `-inf` and `inf`, so `bin_boundaries=[0., 1., 2.]`
      generates bins `(-inf, 0.)`, `[0., 1.)`, `[1., 2.)`, and `[2., +inf)`. If
      this option is set, `adapt` should not be called.
    num_bins: The integer number of bins to compute. If this option is set,
      `adapt` should be called to learn the bin boundaries.
    epsilon: Error tolerance, typically a small fraction close to zero (e.g.
      0.01). Higher values of epsilon increase the quantile approximation
      error, and hence result in more unequal buckets, but may improve
      performance and reduce resource consumption.

  Examples:

  Bucketize float values based on provided buckets.

  >>> input = np.array([[-1.5, 1.0, 3.4, .5], [0.0, 3.0, 1.3, 0.0]])
  >>> layer = tf.keras.layers.experimental.preprocessing.Discretization(
  ...     bin_boundaries=[0., 1., 2.])
  >>> layer(input)
  <tf.Tensor: shape=(2, 4), dtype=int64, numpy=
  array([[0, 2, 3, 1],
         [1, 3, 2, 1]])>

  Bucketize float values based on a number of buckets to compute.

  >>> input = np.array([[-1.5, 1.0, 3.4, .5], [0.0, 3.0, 1.3, 0.0]])
  >>> layer = tf.keras.layers.experimental.preprocessing.Discretization(
  ...     num_bins=4, epsilon=0.01)
  >>> layer.adapt(input)
  >>> layer(input)
  <tf.Tensor: shape=(2, 4), dtype=int64, numpy=
  array([[0, 2, 3, 2],
         [1, 3, 3, 1]])>
  """

  def __init__(self,
               bin_boundaries=None,
               num_bins=None,
               epsilon=0.01,
               **kwargs):
    # `bins` is a deprecated arg for setting `bin_boundaries` or `num_bins`
    # that still has some usage.
    if "bins" in kwargs:
      logging.warning(
          "bins is deprecated, please use bin_boundaries or num_bins instead.")
      if isinstance(kwargs["bins"], int) and num_bins is None:
        num_bins = kwargs["bins"]
      elif bin_boundaries is None:
        bin_boundaries = kwargs["bins"]
      del kwargs["bins"]
    super().__init__(streaming=True, **kwargs)
    if num_bins is not None and num_bins < 0:
      raise ValueError("`num_bins` must be greater than or equal to 0. "
                       "You passed `num_bins={}`".format(num_bins))
    if num_bins is not None and bin_boundaries is not None:
      raise ValueError("`num_bins` and `bin_boundaries` cannot both be set. "
                       "You passed `num_bins={}` and "
                       "`bin_boundaries={}`".format(num_bins, bin_boundaries))
    bin_boundaries = self._convert_to_list(bin_boundaries)
    self.input_bin_boundaries = bin_boundaries
    self.bin_boundaries = bin_boundaries if bin_boundaries is not None else []
    self.num_bins = num_bins
    self.epsilon = epsilon

  def build(self, input_shape):
    super().build(input_shape)

    if self.input_bin_boundaries is not None:
      return

    # Summary contains two equal-length vectors of bins at index 0 and weights
    # at index 1.
    self.summary = self.add_weight(
        name="summary",
        shape=(2, None),
        dtype=dtypes.float32,
        initializer=lambda shape, dtype: [[], []],  # pylint: disable=unused-argument
        trainable=False)

  def update_state(self, data):
    if self.input_bin_boundaries is not None:
      raise ValueError(
          "Cannot adapt a Discretization layer that has been initialized with "
          "`bin_boundaries`, use `num_bins` instead. You passed "
          "`bin_boundaries={}`.".format(self.input_bin_boundaries))

    if not self.built:
      raise RuntimeError("`build` must be called before `update_state`.")

    data = ops.convert_to_tensor_v2_with_dispatch(data)
    if data.dtype != dtypes.float32:
      data = math_ops.cast(data, dtypes.float32)
    summary = summarize(data, self.epsilon)
    self.summary.assign(merge_summaries(summary, self.summary, self.epsilon))

  def merge_state(self, layers):
    for l in layers + [self]:
      if l.input_bin_boundaries is not None:
        raise ValueError(
            "Cannot merge Discretization layer {} that has been initialized "
            "with `bin_boundaries`, use `num_bins` instead. You passed "
            "`bin_boundaries={}`.".format(l.name, l.input_bin_boundaries))
      if not l.built:
        raise ValueError(
            "Cannot merge Discretization layer {}, it has no state. You need "
You need " 249 "to call `adapt` on this layer before merging.".format(l.name)) 250 251 summary = self.summary 252 for l in layers: 253 summary = merge_summaries(summary, l.summary, self.epsilon) 254 self.summary.assign(summary) 255 self.finalize_state() 256 257 def finalize_state(self): 258 if self.input_bin_boundaries is not None or not self.built: 259 return 260 261 # The bucketize op only support list boundaries. 262 self.bin_boundaries = self._convert_to_list( 263 get_bin_boundaries(self.summary, self.num_bins)) 264 265 def reset_state(self): # pylint: disable=method-hidden 266 if self.input_bin_boundaries is not None or not self.built: 267 return 268 269 self.summary.assign([[], []]) 270 271 def get_config(self): 272 config = super().get_config() 273 config.update({ 274 "bin_boundaries": self.input_bin_boundaries, 275 "num_bins": self.num_bins, 276 "epsilon": self.epsilon, 277 }) 278 return config 279 280 def compute_output_shape(self, input_shape): 281 return input_shape 282 283 def compute_output_signature(self, input_spec): 284 output_shape = self.compute_output_shape(input_spec.shape.as_list()) 285 output_dtype = dtypes.int64 286 if isinstance(input_spec, sparse_tensor.SparseTensorSpec): 287 return sparse_tensor.SparseTensorSpec( 288 shape=output_shape, dtype=output_dtype) 289 return tensor_spec.TensorSpec(shape=output_shape, dtype=output_dtype) 290 291 def call(self, inputs): 292 def bucketize(inputs): 293 return gen_math_ops.Bucketize( 294 input=inputs, boundaries=self.bin_boundaries) 295 296 if tf_utils.is_ragged(inputs): 297 integer_buckets = ragged_functional_ops.map_flat_values(bucketize, inputs) 298 # Ragged map_flat_values doesn't touch the non-values tensors in the 299 # ragged composite tensor. If this op is the only op a Keras model, 300 # this can cause errors in Graph mode, so wrap the tensor in an identity. 301 return array_ops.identity(integer_buckets) 302 elif tf_utils.is_sparse(inputs): 303 return sparse_tensor.SparseTensor( 304 indices=array_ops.identity(inputs.indices), 305 values=bucketize(inputs.values), 306 dense_shape=array_ops.identity(inputs.dense_shape)) 307 else: 308 return bucketize(inputs) 309 310 def _convert_to_list(self, inputs): 311 if tensor_util.is_tensor(inputs): 312 inputs = inputs.numpy() 313 if isinstance(inputs, (np.ndarray)): 314 inputs = inputs.tolist() 315 inputs = list(inputs) 316 return inputs 317