1# Copyright 2021 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Python TF-Lite QuantizationDebugger.""" 16import collections 17import csv 18import re 19 20from typing import (Any, Callable, Dict, IO, Iterable, List, Mapping, Optional, 21 Sequence, Tuple) 22 23import numpy as np 24import tensorflow as tf 25 26from tensorflow.lite.python import convert 27from tensorflow.python.util import tf_export 28 29# pylint: disable=g-import-not-at-top 30try: 31 from tensorflow.lite.python import metrics_portable as metrics_stub # type: ignore 32except ImportError: 33 from tensorflow.lite.python import metrics_nonportable as metrics_stub # type: ignore 34# pylint: enable=g-import-not-at-top 35 36# Returns metrics based on difference of values for quantized/float ops. 
# Default per-layer metrics applied to NumericVerify op outputs (the
# element-wise differences between float and dequantized op results).
_DEFAULT_LAYER_DEBUG_METRICS = {
    'num_elements': lambda diffs: diffs.size,
    'stddev': np.std,
    'mean_error': np.average,
    'max_abs_error': lambda diffs: np.max(np.abs(diffs)),
    'mean_squared_error': lambda diffs: np.average(diffs**2),
}

_NUMERIC_VERIFY_OP_NAME = 'NumericVerify'


def _get_quant_params(
    tensor_detail: Mapping[str, Any]) -> Optional[Tuple[float, int]]:
  """Returns first scale and zero point from tensor detail, if present."""
  quant_params = tensor_detail['quantization_parameters']
  if not quant_params:
    return None
  # `scales` and `zero_points` are numpy arrays; truth-testing a
  # multi-element numpy array raises ValueError, so check lengths instead.
  if len(quant_params['scales']) and len(quant_params['zero_points']):
    return (quant_params['scales'][0], quant_params['zero_points'][0])
  return None


@tf_export.tf_export('lite.experimental.QuantizationDebugOptions')
class QuantizationDebugOptions:
  """Debug options to set up a given QuantizationDebugger."""

  def __init__(self,
               layer_debug_metrics: Optional[Mapping[str,
                                                     Callable[[np.ndarray],
                                                              float]]] = None,
               model_debug_metrics: Optional[Mapping[
                   str, Callable[[Sequence[np.ndarray], Sequence[np.ndarray]],
                                 float]]] = None,
               layer_direct_compare_metrics: Optional[Mapping[str, Callable[
                   [Sequence[np.ndarray], Sequence[np.ndarray], float, int],
                   float]]] = None,
               denylisted_ops: Optional[List[str]] = None,
               denylisted_nodes: Optional[List[str]] = None,
               fully_quantize: bool = False) -> None:
    """Initializes debugger options.

    Args:
      layer_debug_metrics: a dict to specify layer debug functions
        {function_name_str: function} where the function accepts result of
        NumericVerify Op, which is value difference between float and
        dequantized op results. The function returns single scalar value.
      model_debug_metrics: a dict to specify model debug functions
        {function_name_str: function} where the function accepts outputs from
        two models, and returns single scalar value for a metric. (e.g.
        accuracy, IoU)
      layer_direct_compare_metrics: a dict to specify layer debug functions
        {function_name_str: function}. The signature is different from that of
        `layer_debug_metrics`, and this one gets passed (original float value,
        original quantized value, scale, zero point). The function's
        implementation is responsible for correctly dequantize the quantized
        value to compare. Use this one when comparing diff is not enough.
        (Note) quantized value is passed as int8, so cast to int32 is needed.
      denylisted_ops: a list of op names which is expected to be removed from
        quantization.
      denylisted_nodes: a list of op's output tensor names to be removed from
        quantization.
      fully_quantize: Bool indicating whether to fully quantize the model.
        Besides model body, the input/output will be quantized as well.
        Corresponding to mlir_quantize's fully_quantize parameter.

    Raises:
      ValueError: when there are duplicate keys
    """
    self.layer_debug_metrics = layer_debug_metrics
    self.model_debug_metrics = model_debug_metrics
    self.layer_direct_compare_metrics = layer_direct_compare_metrics

    # Metric names must be globally unique, since they all become columns in
    # the same statistics dicts / CSV dump.
    keys = []
    for metrics in [
        layer_debug_metrics, model_debug_metrics, layer_direct_compare_metrics
    ]:
      if metrics is not None:
        keys.extend(metrics.keys())
    if len(keys) != len(set(keys)):
      raise ValueError('Provided metrics have duplicate keys.')

    self.denylisted_ops = denylisted_ops
    self.denylisted_nodes = denylisted_nodes
    self.fully_quantize = fully_quantize


@tf_export.tf_export('lite.experimental.QuantizationDebugger')
class QuantizationDebugger:
  """Debugger for Quantized TensorFlow Lite debug mode models.

  This can run the TensorFlow Lite converted models equipped with debug ops and
  collect debug information. This debugger calculates statistics from
  user-defined post-processing functions as well as default ones.
  """

  def __init__(self,
               quant_debug_model_path: Optional[str] = None,
               quant_debug_model_content: Optional[bytes] = None,
               float_model_path: Optional[str] = None,
               float_model_content: Optional[bytes] = None,
               debug_dataset: Optional[Callable[
                   [], Iterable[Sequence[np.ndarray]]]] = None,
               debug_options: Optional[QuantizationDebugOptions] = None,
               converter: Optional[tf.lite.TFLiteConverter] = None) -> None:
    """Runs the TFLite debugging model with given debug options.

    Args:
      quant_debug_model_path: Path to the quantized debug TFLite model file.
      quant_debug_model_content: Content of the quantized debug TFLite model.
      float_model_path: Path to float TFLite model file.
      float_model_content: Content of the float TFLite model.
      debug_dataset: a factory function that returns dataset generator which is
        used to generate input samples (list of np.ndarray) for the model. The
        generated elements must have same types and shape as inputs to the
        model.
      debug_options: Debug options to debug the given model.
      converter: Optional, use converter instead of quantized model.

    Raises:
      ValueError: If the debugger was unable to be created.

    Attributes:
      layer_statistics: results of error metrics for each NumericVerify op
        results. in {layer_name: {metric_name: metric}} format.
      model_statistics: results of error metrics for difference between float
        and quantized models. in {metric_name: metric} format.
    """
    self._data_gen = debug_dataset
    self._debug_options = debug_options or QuantizationDebugOptions()
    self.converter = None
    self.calibrated_model = None
    self.float_model = None
    self._float_interpreter = None
    if converter is not None:
      # Converter path: (optionally) convert a float baseline, then produce a
      # calibrated model and instrument it in _init_from_converter.
      if self._debug_options.model_debug_metrics:
        self.converter = self._set_converter_options_for_float(converter)
        self.float_model = self.converter.convert()

      self.converter = self._set_converter_options_for_calibration(converter)
      self.calibrated_model = self.converter.convert()
      # Converter should be already set up with all options
      self._init_from_converter(
          self._debug_options,
          self.converter,
          self.calibrated_model,
          float_model=self.float_model)
    else:
      # Pre-built model path: load interpreters directly from path/content.
      self._quant_interpreter = tf.lite.Interpreter(
          quant_debug_model_path,
          quant_debug_model_content,
          experimental_preserve_all_tensors=(
              self._debug_options.layer_direct_compare_metrics is not None))
      if self._debug_options.model_debug_metrics:
        self._float_interpreter = tf.lite.Interpreter(float_model_path,
                                                      float_model_content)
    self._initialize_stats()

  @property
  def options(self) -> QuantizationDebugOptions:
    return self._debug_options

  @options.setter
  def options(self, options: QuantizationDebugOptions) -> None:
    self._debug_options = options
    if not self.converter or not self.calibrated_model:
      return
    # Re-instrument the model so new denylists/metrics take effect.
    self._init_from_converter(
        self._debug_options,
        self.converter,
        self.calibrated_model,
        float_model=self.float_model)
    self._initialize_stats()

  def _initialize_stats(self) -> None:
    """Helper function initializes stats."""
    # TODO(b/177749613) : Fix the dependency on tf.lite._get_ops_details()
    # Following code is needed to get op's name from the output tensor index,
    # since NumericVerify op only provides its quantized input tensor index.
    self._defining_op = dict()
    for op_info in self._quant_interpreter._get_ops_details():  # pylint: disable=protected-access
      self._defining_op.update(
          {tensor_idx: op_info['index'] for tensor_idx in op_info['outputs']})

    self._numeric_verify_tensor_details = None
    self._numeric_verify_op_details = None
    if not self._get_numeric_verify_tensor_details():
      raise ValueError('Please check if the quantized model is in debug mode')

    self._layer_debug_metrics = _DEFAULT_LAYER_DEBUG_METRICS.copy()
    if self._debug_options.layer_debug_metrics:
      self._layer_debug_metrics.update(self._debug_options.layer_debug_metrics)

    self.layer_statistics = None
    self.model_statistics = None

    self._metrics = metrics_stub.TFLiteMetrics()
    self._metrics.increase_counter_debugger_creation()

  def _get_quantized_model(self, is_debug: bool) -> bytes:
    """Quantizes the calibrated model, optionally with NumericVerify ops."""
    if not self.converter:
      raise ValueError('No converter found, use this function with the '
                       'converter option in the constructor.')

    return convert.mlir_quantize(
        self.calibrated_model,
        disable_per_channel=self.converter._experimental_disable_per_channel,  # pylint: disable=protected-access
        fully_quantize=self._debug_options.fully_quantize,
        enable_numeric_verify=is_debug,
        denylisted_ops=self._debug_options.denylisted_ops,
        denylisted_nodes=self._debug_options.denylisted_nodes)

  def get_nondebug_quantized_model(self) -> bytes:
    """Returns a non-instrumented quantized model.

    Convert the quantized model with the initialized converter and
    return bytes for nondebug model. The model will not be instrumented with
    numeric verification operations.

    Returns:
      Model bytes corresponding to the model.
    Raises:
      ValueError: if converter is not passed to the debugger.
    """
    return self._get_quantized_model(is_debug=False)

  def get_debug_quantized_model(self) -> bytes:
    """Returns an instrumented quantized model.

    Convert the quantized model with the initialized converter and
    return bytes for model. The model will be instrumented with numeric
    verification operations and should only be used for debugging.

    Returns:
      Model bytes corresponding to the model.
    Raises:
      ValueError: if converter is not passed to the debugger.
    """
    return self._get_quantized_model(is_debug=True)

  def _init_from_converter(self,
                           options: QuantizationDebugOptions,
                           converter: tf.lite.TFLiteConverter,
                           calibrated_model: Optional[bytes] = None,
                           float_model: Optional[bytes] = None) -> None:
    """Convert the model and apply options.

    Converts the quantized model and initializes a quantized model interpreter
    with the quantized model. Returns a float model interpreter if float model
    is provided.

    Args:
      options: a QuantizationDebugOptions object.
      converter: an initialized tf.lite.TFLiteConverter.
      calibrated_model: Calibrated model bytes.
      float_model: Float model bytes.
    """
    self.quant_model = convert.mlir_quantize(
        calibrated_model,
        disable_per_channel=converter._experimental_disable_per_channel,  # pylint: disable=protected-access
        fully_quantize=options.fully_quantize,
        enable_numeric_verify=True,
        denylisted_ops=options.denylisted_ops,
        denylisted_nodes=options.denylisted_nodes)
    self._quant_interpreter = tf.lite.Interpreter(
        model_content=self.quant_model)
    self._float_interpreter = None
    if float_model is not None:
      self._float_interpreter = tf.lite.Interpreter(model_content=float_model)

  def _set_converter_options_for_float(
      self, converter: tf.lite.TFLiteConverter) -> tf.lite.TFLiteConverter:
    """Verify converter options and set required experimental options."""
    # A float baseline must be converted without any optimization.
    if converter.optimizations:
      converter.optimizations = []
    return converter

  def _set_converter_options_for_calibration(
      self, converter: tf.lite.TFLiteConverter) -> tf.lite.TFLiteConverter:
    """Verify converter options and set required experimental options."""
    if not converter.optimizations:
      converter.optimizations = [tf.lite.Optimize.DEFAULT]
    if not converter.representative_dataset:
      raise ValueError('converter object must set representative_dataset')

    converter.experimental_mlir_quantizer = True
    converter._experimental_calibrate_only = True  # pylint: disable=protected-access
    return converter

  def run(self) -> None:
    """Runs models and gets metrics."""
    self.layer_statistics = self._collect_layer_statistics()
    if self._debug_options.model_debug_metrics:
      self.model_statistics = self._collect_model_statistics()

  def _collect_layer_statistics(self) -> Dict[str, Dict[str, float]]:
    """Collects layer statistics by applying layer debug metrics.

    For all data from the given RepresentativeDataset, collect statistics per
    example by getting the NumericVerify op results in _quant_interpreter
    and calculating layer debug metrics on the results.

    Returns:
      aggregated per-layer statistics of NumericVerify results.
      {layer_name: {metric_name: metric}}
    """
    layer_statistics = collections.defaultdict(
        lambda: collections.defaultdict(list))

    initialize = True
    for tensor_data in self._data_gen():
      self._set_input_tensors(self._quant_interpreter, tensor_data, initialize)
      initialize = False

      # Run the model.
      self._quant_interpreter.invoke()

      # Collect the statistics of this invoke result.
      for tensor_detail in self._get_numeric_verify_tensor_details():
        tensor_name = tensor_detail['name']
        diffs = self._quant_interpreter.get_tensor(tensor_detail['index'])
        for metric_name, metric_fn in self._layer_debug_metrics.items():
          layer_statistics[tensor_name][metric_name].append(metric_fn(diffs))

      if self._debug_options.layer_direct_compare_metrics is not None:
        for tensor_detail in self._get_numeric_verify_tensor_details():
          tensor_name = tensor_detail['name']
          op_idx = self._defining_op[tensor_detail['index']]
          op_detail = self._quant_interpreter._get_op_details(op_idx)  # pylint: disable=protected-access
          # NumericVerify inputs are (quantized tensor, float tensor).
          q_idx, f_idx = op_detail['inputs']
          quant_input_detail = self._quant_interpreter._get_tensor_details(  # pylint: disable=protected-access
              q_idx)
          for (metric_name, metric_fn
              ) in self._debug_options.layer_direct_compare_metrics.items():
            layer_statistics[tensor_name][metric_name].append(
                metric_fn(
                    self._quant_interpreter.get_tensor(f_idx),
                    self._quant_interpreter.get_tensor(q_idx),
                    quant_input_detail['quantization_parameters']['scales'][0],
                    quant_input_detail['quantization_parameters']['zero_points']
                    [0]))

    # Calculate final aggregated metrics for each layer.
    for metrics in layer_statistics.values():
      for metric_name in metrics:
        metrics[metric_name] = np.nanmean(metrics[metric_name])

    return layer_statistics

  def _collect_model_statistics(self) -> Dict[str, float]:
    """Collects model output metrics.

    For all data from the given RepresentativeDataset, collect all model output
    results from float model & quantized debug model, and calculate metrics
    by using model output functions. As a result, self.model_results is filled,

    where self.model_results[model_output_function_name] = `aggregated model
    output function value` (a scalar).

    Returns:
      aggregated per-model output discrepancy metrics.
      {metric_name: aggregated_metric}
    """

    model_statistics = collections.defaultdict(list)

    initialize = True
    for tensor_data in self._data_gen():
      self._set_input_tensors(self._quant_interpreter, tensor_data, initialize)
      self._set_input_tensors(self._float_interpreter, tensor_data, initialize)
      initialize = False

      # Run the models.
      self._quant_interpreter.invoke()
      self._float_interpreter.invoke()

      # Collect the output results from both models.
      float_tensor_data = self._get_output_tensors(self._float_interpreter)
      quant_tensor_data = self._get_output_tensors(self._quant_interpreter)

      # Calculate the metrics.
      for (metric_name,
           metric_fn) in self._debug_options.model_debug_metrics.items():
        model_statistics[metric_name].append(
            metric_fn(float_tensor_data, quant_tensor_data))

    # Calculate final aggregated metrics for each outputs.
    return {
        metric_name: np.mean(metric)
        for metric_name, metric in model_statistics.items()
    }

  def _set_input_tensors(self, interpreter: tf.lite.Interpreter,
                         tensor_data: Sequence[np.ndarray],
                         initialize: bool) -> None:
    """Sets input tensors into TFLite model Interpreter.

    Args:
      interpreter: a tf.lite.Interpreter object with allocated tensors.
      tensor_data: a list of Numpy array data.
      initialize: set to true when input is first set for the interpreter, to
        set input shapes and allocate tensors.

    Raises:
      ValueError: when inputs can't be set, or size of provided inputs does not
        match size of model inputs.
    """
    input_details = interpreter.get_input_details()
    if len(input_details) != len(tensor_data):
      raise ValueError(
          'Number of inputs provided ({}) does not match number of inputs to '
          'the model ({})'.format(len(tensor_data), len(input_details)))

    if initialize:
      for input_detail, tensor in zip(input_details, tensor_data):
        interpreter.resize_tensor_input(input_detail['index'], tensor.shape)
      interpreter.allocate_tensors()

    for input_detail, tensor in zip(input_details, tensor_data):
      # Quantize float inputs on the fly when the model expects int8.
      if tensor.dtype == np.float32 and input_detail['dtype'] == np.int8:
        quant_params = _get_quant_params(input_detail)
        if quant_params:
          scale, zero_point = quant_params
          tensor = np.round((tensor / scale) + zero_point).astype(np.int8)
      interpreter.set_tensor(input_detail['index'], tensor)

  def _get_output_tensors(self,
                          interpreter: tf.lite.Interpreter) -> List[np.ndarray]:
    """Returns output tensors of given TFLite model Interpreter.

    Args:
      interpreter: a tf.lite.Interpreter object with allocated tensors.

    Returns:
      a list of numpy arrays representing output tensor results.
    """

    outputs = []
    for output_detail in interpreter.get_output_details():
      tensor = interpreter.get_tensor(output_detail['index'])
      # Dequantize int8 outputs so float and quantized models are comparable.
      if output_detail['dtype'] == np.int8:
        quant_params = _get_quant_params(output_detail)
        if quant_params:
          scale, zero_point = quant_params
          tensor = ((tensor.astype(np.float32) - zero_point) * scale).astype(
              np.float32)
      outputs.append(tensor)

    return outputs

  def _get_numeric_verify_tensor_details(self) -> List[Dict[str, Any]]:
    """Returns tensor details for the output tensor of every NumericVerify op.

    Note: the previous annotation `List[str]` was wrong; this returns the
    tensor-detail dicts produced by the interpreter, cached after first call.
    """
    # pylint: disable=protected-access
    if not self._numeric_verify_tensor_details:
      self._numeric_verify_tensor_details = []
      self._numeric_verify_op_details = {}
      for op_info in self._quant_interpreter._get_ops_details():
        if op_info['op_name'] == _NUMERIC_VERIFY_OP_NAME:
          self._numeric_verify_tensor_details.append(
              self._quant_interpreter._get_tensor_details(
                  op_info['outputs'][0]))
          tensor_name = self._numeric_verify_tensor_details[-1]['name']
          self._numeric_verify_op_details[tensor_name] = op_info
    # pylint: enable=protected-access
    return self._numeric_verify_tensor_details

  def _get_operand_name_and_index(
      self, numeric_verify_name: str) -> Tuple[str, int]:
    """Gets the index and name of NumericVerify Op's quantized input tensor.

    Args:
      numeric_verify_name: name of the NumericVerify op's output tensor. It has
        format of `NumericVerify/{quantized_tensor_name}:{quantized_tensor_idx}`

    Returns:
      Tuple of (tensor_name, tensor_idx) for quantized op's output tensor.
    """
    tensor_name, tensor_idx = numeric_verify_name.rsplit(':', 1)
    float_tensor_name = tensor_name[len(_NUMERIC_VERIFY_OP_NAME) + 1:]
    # Strip a single trailing digit disambiguator, if present; guard against
    # an empty name to avoid IndexError on `[-1]`.
    if float_tensor_name and re.match(r'\d', float_tensor_name[-1]):
      float_tensor_name = float_tensor_name[:-1]

    return (float_tensor_name, int(tensor_idx))

  def layer_statistics_dump(self, file: IO[str]) -> None:
    """Dumps layer statistics into file, in csv format.

    Args:
      file: file, or file-like object to write.
    """
    # order of `fields` is the order of fields in csv.
    fields = ['op_name', 'tensor_idx'] + list(self._layer_debug_metrics.keys())
    if self._debug_options.layer_direct_compare_metrics is not None:
      fields += list(self._debug_options.layer_direct_compare_metrics.keys())
    fields += ['scale', 'zero_point', 'tensor_name']
    writer = csv.DictWriter(file, fields)
    writer.writeheader()
    for name, metrics in self.layer_statistics.items():
      data = metrics.copy()
      (data['tensor_name'], _) = self._get_operand_name_and_index(name)
      data['tensor_idx'] = self._numeric_verify_op_details[name]['inputs'][0]
      data['op_name'] = self._quant_interpreter._get_op_details(  # pylint: disable=protected-access
          self._defining_op[data['tensor_idx']])['op_name']
      details = self._quant_interpreter._get_tensor_details(data['tensor_idx'])  # pylint: disable=protected-access
      data['scale'], data['zero_point'] = (
          details['quantization_parameters']['scales'][0],
          details['quantization_parameters']['zero_points'][0])
      writer.writerow(data)