• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Python TF-Lite QuantizationDebugger."""
16import collections
17import csv
18import re
19
20from typing import (Any, Callable, Dict, IO, Iterable, List, Mapping, Optional,
21                    Sequence, Tuple)
22
23import numpy as np
24import tensorflow as tf
25
26from tensorflow.lite.python import convert
27from tensorflow.python.util import tf_export
28
29# pylint: disable=g-import-not-at-top
30try:
31  from tensorflow.lite.python import metrics_portable as metrics_stub  # type: ignore
32except ImportError:
33  from tensorflow.lite.python import metrics_nonportable as metrics_stub  # type: ignore
34# pylint: enable=g-import-not-at-top
35
# Returns metrics based on difference of values for quantized/float ops.
# Each function receives the NumericVerify op's output tensor: the
# element-wise difference between the float op result and the dequantized
# quantized op result.
_DEFAULT_LAYER_DEBUG_METRICS = {
    # Number of elements compared for this layer.
    'num_elements': lambda diffs: diffs.size,
    # Spread of the float-vs-dequantized differences.
    'stddev': np.std,
    # Mean signed error (bias of the quantized op).
    'mean_error': np.average,
    # Worst-case absolute error.
    'max_abs_error': lambda diffs: np.max(np.abs(diffs)),
    # MSE of the differences.
    'mean_squared_error': lambda diffs: np.average(diffs**2),
}

# Op name the MLIR quantizer emits for float-vs-quantized verification ops.
_NUMERIC_VERIFY_OP_NAME = 'NumericVerify'
46
47
48def _get_quant_params(
49    tensor_detail: Mapping[str, Any]) -> Optional[Tuple[float, int]]:
50  """Returns first scale and zero point from tensor detail, if present."""
51  quant_params = tensor_detail['quantization_parameters']
52  if not quant_params:
53    return None
54  if quant_params['scales'] and quant_params['zero_points']:
55    return (quant_params['scales'][0], quant_params['zero_points'][0])
56  return None
57
58
@tf_export.tf_export('lite.experimental.QuantizationDebugOptions')
class QuantizationDebugOptions:
  """Debug options to set up a given QuantizationDebugger."""

  def __init__(self,
               layer_debug_metrics: Optional[Mapping[str,
                                                     Callable[[np.ndarray],
                                                              float]]] = None,
               model_debug_metrics: Optional[Mapping[
                   str, Callable[[Sequence[np.ndarray], Sequence[np.ndarray]],
                                 float]]] = None,
               layer_direct_compare_metrics: Optional[Mapping[str, Callable[
                   [Sequence[np.ndarray], Sequence[np.ndarray], float, int],
                   float]]] = None,
               denylisted_ops: Optional[List[str]] = None,
               denylisted_nodes: Optional[List[str]] = None,
               fully_quantize: bool = False) -> None:
    """Initializes debugger options.

    Args:
      layer_debug_metrics: a dict to specify layer debug functions
        {function_name_str: function} where the function accepts result of
          NumericVerify Op, which is value difference between float and
          dequantized op results. The function returns single scalar value.
      model_debug_metrics: a dict to specify model debug functions
        {function_name_str: function} where the function accepts outputs from
          two models, and returns single scalar value for a metric. (e.g.
          accuracy, IoU)
      layer_direct_compare_metrics: a dict to specify layer debug functions
        {function_name_str: function}. The signature is different from that of
          `layer_debug_metrics`, and this one gets passed (original float value,
          original quantized value, scale, zero point). The function's
          implementation is responsible for correctly dequantize the quantized
          value to compare. Use this one when comparing diff is not enough.
          (Note) quantized value is passed as int8, so cast to int32 is needed.
      denylisted_ops: a list of op names which is expected to be removed from
        quantization.
      denylisted_nodes: a list of op's output tensor names to be removed from
        quantization.
      fully_quantize: Bool indicating whether to fully quantize the model.
        Besides model body, the input/output will be quantized as well.
        Corresponding to mlir_quantize's fully_quantize parameter.

    Raises:
      ValueError: when there are duplicate keys
    """
    self.layer_debug_metrics = layer_debug_metrics
    self.model_debug_metrics = model_debug_metrics
    self.layer_direct_compare_metrics = layer_direct_compare_metrics

    # Metric names must be unique across all three metric dicts, since they
    # all end up as columns/keys in the combined statistics output.
    all_keys = [
        key
        for metric_map in (layer_debug_metrics, model_debug_metrics,
                           layer_direct_compare_metrics)
        if metric_map is not None
        for key in metric_map
    ]
    if len(all_keys) != len(set(all_keys)):
      raise ValueError('Provided metrics have duplicate keys.')

    self.denylisted_ops = denylisted_ops
    self.denylisted_nodes = denylisted_nodes
    self.fully_quantize = fully_quantize
120
121
@tf_export.tf_export('lite.experimental.QuantizationDebugger')
class QuantizationDebugger:
  """Debugger for Quantized TensorFlow Lite debug mode models.

  This can run the TensorFlow Lite converted models equipped with debug ops and
  collect debug information. This debugger calculates statistics from
  user-defined post-processing functions as well as default ones.
  """

  def __init__(
      self,
      quant_debug_model_path: Optional[str] = None,
      quant_debug_model_content: Optional[bytes] = None,
      float_model_path: Optional[str] = None,
      float_model_content: Optional[bytes] = None,
      debug_dataset: Optional[Callable[[],
                                       Iterable[Sequence[np.ndarray]]]] = None,
      debug_options: Optional[QuantizationDebugOptions] = None,
      converter: Optional[tf.lite.TFLiteConverter] = None) -> None:
    """Runs the TFLite debugging model with given debug options.

    Args:
      quant_debug_model_path: Path to the quantized debug TFLite model file.
      quant_debug_model_content: Content of the quantized debug TFLite model.
      float_model_path: Path to float TFLite model file.
      float_model_content: Content of the float TFLite model.
      debug_dataset: a factory function that returns dataset generator which is
        used to generate input samples (list of np.ndarray) for the model. The
        generated elements must have same types and shape as inputs to the
        model.
      debug_options: Debug options to debug the given model.
      converter: Optional, use converter instead of quantized model.

    Raises:
      ValueError: If the debugger was unable to be created.

    Attributes:
      layer_statistics: results of error metrics for each NumericVerify op
        results. in {layer_name: {metric_name: metric}} format.
      model_statistics: results of error metrics for difference between float
        and quantized models. in {metric_name: metric} format.
    """
    self._data_gen = debug_dataset
    self._debug_options = debug_options or QuantizationDebugOptions()
    self.converter = None
    self.calibrated_model = None
    self.float_model = None
    self._float_interpreter = None
    if converter is not None:
      # Converter path: calibrate and quantize inside the debugger instead of
      # consuming pre-built model files/bytes. NOTE(review): both branches
      # below mutate the same converter object in place.
      if self._debug_options.model_debug_metrics:
        # A float reference model is only needed for model-level metrics.
        self.converter = self._set_converter_options_for_float(converter)
        self.float_model = self.converter.convert()

      self.converter = self._set_converter_options_for_calibration(
          converter)
      self.calibrated_model = self.converter.convert()
      # Converter should be already set up with all options
      self._init_from_converter(self._debug_options, self.converter,
                                self.calibrated_model,
                                float_model=self.float_model)
    else:
      # Model path: build interpreters directly from the provided debug model.
      # Preserving all tensors is only required when direct-compare metrics
      # need to read intermediate (non-output) tensors after invoke().
      self._quant_interpreter = tf.lite.Interpreter(
          quant_debug_model_path,
          quant_debug_model_content,
          experimental_preserve_all_tensors=(
              self._debug_options.layer_direct_compare_metrics is not None))
      if self._debug_options.model_debug_metrics:
        self._float_interpreter = tf.lite.Interpreter(float_model_path,
                                                      float_model_content)
    self._initialize_stats()

  @property
  def options(self) -> QuantizationDebugOptions:
    """Debug options currently applied to this debugger."""
    return self._debug_options

  @options.setter
  def options(self, options: QuantizationDebugOptions) -> None:
    # Re-applying options requires re-quantizing from the calibrated model,
    # which is only possible when the debugger was created with a converter.
    self._debug_options = options
    if not self.converter or not self.calibrated_model:
      return
    self._init_from_converter(self._debug_options, self.converter,
                              self.calibrated_model,
                              float_model=self.float_model)
    self._initialize_stats()

  def _initialize_stats(self):
    """Helper function initializes stats."""
    # TODO(b/177749613) : Fix the dependency on tf.lite._get_ops_details()
    # Following code is needed to get op's name from the output tensor index,
    # since NumericVerify op only provides its quantized input tensor index.
    self._defining_op = dict()
    for op_info in self._quant_interpreter._get_ops_details():  # pylint: disable=protected-access
      self._defining_op.update(
          {tensor_idx: op_info['index'] for tensor_idx in op_info['outputs']})

    self._numeric_verify_tensor_details = None
    self._numeric_verify_op_details = None
    # A model with no NumericVerify ops was not converted in debug mode and
    # cannot be debugged.
    if not self._get_numeric_verify_tensor_details():
      raise ValueError('Please check if the quantized model is in debug mode')

    # User-supplied layer metrics extend (and may override) the defaults.
    self._layer_debug_metrics = _DEFAULT_LAYER_DEBUG_METRICS.copy()
    if self._debug_options.layer_debug_metrics:
      self._layer_debug_metrics.update(self._debug_options.layer_debug_metrics)

    self.layer_statistics = None
    self.model_statistics = None

    # Usage telemetry: count debugger creations.
    self._metrics = metrics_stub.TFLiteMetrics()
    self._metrics.increase_counter_debugger_creation()

  def _get_quantized_model(self, is_debug: bool) -> bytes:
    """Quantizes the calibrated model, optionally with NumericVerify ops.

    Args:
      is_debug: when True, instrument the model with numeric verification ops.

    Returns:
      Quantized model bytes.

    Raises:
      ValueError: if the debugger was not created with a converter.
    """
    if not self.converter:
      raise ValueError('No converter found, use this function with the '
                       'converter option in the constructor.')

    return convert.mlir_quantize(
        self.calibrated_model,
        disable_per_channel=self.converter._experimental_disable_per_channel,  # pylint: disable=protected-access
        fully_quantize=self._debug_options.fully_quantize,
        enable_numeric_verify=is_debug,
        denylisted_ops=self._debug_options.denylisted_ops,
        denylisted_nodes=self._debug_options.denylisted_nodes)

  def get_nondebug_quantized_model(self) -> bytes:
    """Returns a non-instrumented quantized model.

    Convert the quantized model with the initialized converter and
    return bytes for nondebug model. The model will not be instrumented with
    numeric verification operations.

    Returns:
      Model bytes corresponding to the model.
    Raises:
      ValueError: if converter is not passed to the debugger.
    """
    return self._get_quantized_model(is_debug=False)

  def get_debug_quantized_model(self) -> bytes:
    """Returns an instrumented quantized model.

    Convert the quantized model with the initialized converter and
    return bytes for model. The model will be instrumented with numeric
    verification operations and should only be used for debugging.

    Returns:
      Model bytes corresponding to the model.
    Raises:
      ValueError: if converter is not passed to the debugger.
    """
    return self._get_quantized_model(is_debug=True)

  def _init_from_converter(self, options: QuantizationDebugOptions,
                           converter: tf.lite.TFLiteConverter,
                           calibrated_model: Optional[bytes] = None,
                           float_model: Optional[bytes] = None) -> None:
    """Convert the model and apply options.

    Converts the quantized model and initializes a quantized model interpreter
    with the quantized model. Returns a float model interpreter if float model
    is provided.

    Args:
      options: a QuantizationDebugOptions object.
      converter: an initialized tf.lite.TFLiteConverter.
      calibrated_model: Calibrated model bytes.
      float_model: Float model bytes.
    """
    # Always instrument with NumericVerify here: this interpreter is the one
    # the debugger collects layer statistics from.
    self.quant_model = convert.mlir_quantize(
        calibrated_model,
        disable_per_channel=converter._experimental_disable_per_channel,  # pylint: disable=protected-access
        fully_quantize=options.fully_quantize,
        enable_numeric_verify=True,
        denylisted_ops=options.denylisted_ops,
        denylisted_nodes=options.denylisted_nodes)
    self._quant_interpreter = tf.lite.Interpreter(
        model_content=self.quant_model)
    self._float_interpreter = None
    if float_model is not None:
      self._float_interpreter = tf.lite.Interpreter(model_content=float_model)

  def _set_converter_options_for_float(
      self, converter: tf.lite.TFLiteConverter) -> tf.lite.TFLiteConverter:
    """Verify converter options and set required experimental options."""
    # Clear optimizations so the converter emits a float (non-quantized)
    # reference model.
    if converter.optimizations:
      converter.optimizations = []
    return converter

  def _set_converter_options_for_calibration(
      self, converter: tf.lite.TFLiteConverter) -> tf.lite.TFLiteConverter:
    """Verify converter options and set required experimental options."""
    if not converter.optimizations:
      converter.optimizations = [tf.lite.Optimize.DEFAULT]
    if not converter.representative_dataset:
      raise ValueError('converter object must set representative_dataset')

    # Stop after calibration; quantization is done later via mlir_quantize so
    # the debugger controls instrumentation and denylists.
    converter.experimental_mlir_quantizer = True
    converter._experimental_calibrate_only = True  # pylint: disable=protected-access
    return converter

  def run(self) -> None:
    """Runs models and gets metrics."""
    self.layer_statistics = self._collect_layer_statistics()
    if self._debug_options.model_debug_metrics:
      self.model_statistics = self._collect_model_statistics()

  def _collect_layer_statistics(self) -> Dict[str, Dict[str, float]]:
    """Collects layer statistics by applying layer debug metrics.

    For all data from the given RepresentativeDataset, collect statistics per
    example by getting the NumericVerify op results in _quant_interpreter
    and calculating layer debug metrics on the results.

    Returns:
      aggregated per-layer statistics of NumericVerify results.
      {layer_name: {metric_name: metric}}
    """
    # {layer_name: {metric_name: [per-example values]}}
    layer_statistics = collections.defaultdict(
        lambda: collections.defaultdict(list))

    initialize = True
    for tensor_data in self._data_gen():
      # Tensors are only resized/allocated on the first example.
      self._set_input_tensors(self._quant_interpreter, tensor_data, initialize)
      initialize = False

      # Run the model.
      self._quant_interpreter.invoke()

      # Collect the statistics of this invoke result.
      for tensor_detail in self._get_numeric_verify_tensor_details():
        tensor_name = tensor_detail['name']
        # NumericVerify output: element-wise float-vs-dequantized differences.
        diffs = self._quant_interpreter.get_tensor(tensor_detail['index'])
        for metric_name, metric_fn in self._layer_debug_metrics.items():
          layer_statistics[tensor_name][metric_name].append(metric_fn(diffs))

      if self._debug_options.layer_direct_compare_metrics is not None:
        for tensor_detail in self._get_numeric_verify_tensor_details():
          tensor_name = tensor_detail['name']
          # Walk from the NumericVerify output back to its defining op to get
          # the op's (quantized, float) input tensor indices.
          op_idx = self._defining_op[tensor_detail['index']]
          op_detail = self._quant_interpreter._get_op_details(op_idx)  # pylint: disable=protected-access
          q_idx, f_idx = op_detail['inputs']
          quant_input_detail = self._quant_interpreter._get_tensor_details(  # pylint: disable=protected-access
              q_idx)
          for (metric_name, metric_fn
              ) in self._debug_options.layer_direct_compare_metrics.items():
            # Direct-compare metrics get raw float/quantized values plus the
            # first scale and zero point of the quantized input tensor.
            layer_statistics[tensor_name][metric_name].append(
                metric_fn(
                    self._quant_interpreter.get_tensor(f_idx),
                    self._quant_interpreter.get_tensor(q_idx),
                    quant_input_detail['quantization_parameters']['scales'][0],
                    quant_input_detail['quantization_parameters']['zero_points']
                    [0]))

    # Calculate final aggregated metrics for each layer.
    for metrics in layer_statistics.values():
      for metric_name in metrics:
        # nanmean: ignore examples where a metric produced NaN.
        metrics[metric_name] = np.nanmean(metrics[metric_name])

    return layer_statistics

  def _collect_model_statistics(self) -> Dict[str, float]:
    """Collects model output metrics.

    For all data from the given RepresentativeDataset, collect all model output
    results from float model & quantized debug model, and calculate metrics
    by using model output functions. As a result, self.model_results is filled,

    where self.model_results[model_output_function_name] = `aggregated model
    output function value` (a scalar).

    Returns:
      aggregated per-model output discrepancy metrics.
      {metric_name: aggregated_metric}
    """

    model_statistics = collections.defaultdict(list)

    initialize = True
    for tensor_data in self._data_gen():
      # Feed the same example to both interpreters so outputs are comparable.
      self._set_input_tensors(self._quant_interpreter, tensor_data, initialize)
      self._set_input_tensors(self._float_interpreter, tensor_data, initialize)
      initialize = False

      # Run the models.
      self._quant_interpreter.invoke()
      self._float_interpreter.invoke()

      # Collect the output results from both models.
      float_tensor_data = self._get_output_tensors(self._float_interpreter)
      quant_tensor_data = self._get_output_tensors(self._quant_interpreter)

      # Calculate the metrics.
      for (metric_name,
           metric_fn) in self._debug_options.model_debug_metrics.items():
        model_statistics[metric_name].append(
            metric_fn(float_tensor_data, quant_tensor_data))

    # Calculate final aggregated metrics for each outputs.
    return {
        metric_name: np.mean(metric)
        for metric_name, metric in model_statistics.items()
    }

  def _set_input_tensors(self, interpreter: tf.lite.Interpreter,
                         tensor_data: Sequence[np.ndarray],
                         initialize: bool) -> None:
    """Sets input tensors into TFLite model Interpreter.

    Args:
      interpreter: a tf.lite.Interpreter object with allocated tensors.
      tensor_data: a list of Numpy array data.
      initialize: set to true when input is first set for the interpreter, to
        set input shapes and allocate tensors.

    Raises:
      ValueError: when inputs can't be set, or size of provided inputs does not
      match size of model inputs.
    """
    input_details = interpreter.get_input_details()
    if len(input_details) != len(tensor_data):
      raise ValueError(
          'Number of inputs provided ({}) does not match number of inputs to '
          'the model ({})'.format(len(tensor_data), len(input_details)))

    if initialize:
      for input_detail, tensor in zip(input_details, tensor_data):
        interpreter.resize_tensor_input(input_detail['index'], tensor.shape)
      interpreter.allocate_tensors()

    for input_detail, tensor in zip(input_details, tensor_data):
      # When the model expects int8 input (fully_quantize) but float data is
      # given, quantize the data with the input tensor's scale/zero point.
      if tensor.dtype == np.float32 and input_detail['dtype'] == np.int8:
        quant_params = _get_quant_params(input_detail)
        if quant_params:
          scale, zero_point = quant_params
          tensor = np.round((tensor / scale) + zero_point).astype(np.int8)
      interpreter.set_tensor(input_detail['index'], tensor)

  def _get_output_tensors(self,
                          interpreter: tf.lite.Interpreter) -> List[np.ndarray]:
    """Returns output tensors of given TFLite model Interpreter.

    Args:
      interpreter: a tf.lite.Interpreter object with allocated tensors.

    Returns:
      a list of numpy arrays representing output tensor results.
    """

    outputs = []
    for output_detail in interpreter.get_output_details():
      tensor = interpreter.get_tensor(output_detail['index'])
      # Dequantize int8 outputs back to float so outputs of the quantized and
      # float models are directly comparable.
      if output_detail['dtype'] == np.int8:
        quant_params = _get_quant_params(output_detail)
        if quant_params:
          scale, zero_point = quant_params
          tensor = ((tensor.astype(np.float32) - zero_point) * scale).astype(
              np.float32)
      outputs.append(tensor)

    return outputs

  def _get_numeric_verify_tensor_details(self) -> List[Dict[str, Any]]:
    """Returns tensor details of all NumericVerify ops' output tensors.

    The result is computed once and cached; it also populates
    _numeric_verify_op_details, mapping output tensor name -> op info.
    """
    # pylint: disable=protected-access
    if not self._numeric_verify_tensor_details:
      self._numeric_verify_tensor_details = []
      self._numeric_verify_op_details = {}
      for op_info in self._quant_interpreter._get_ops_details():
        if op_info['op_name'] == _NUMERIC_VERIFY_OP_NAME:
          self._numeric_verify_tensor_details.append(
              self._quant_interpreter._get_tensor_details(
                  op_info['outputs'][0]))
          tensor_name = self._numeric_verify_tensor_details[-1]['name']
          self._numeric_verify_op_details[tensor_name] = op_info
    # pylint: enable=protected-access
    return self._numeric_verify_tensor_details

  def _get_operand_name_and_index(
      self, numeric_verify_name: str) -> Tuple[str, int]:
    """Gets the index and name of NumericVerify Op's quantized input tensor.

    Args:
      numeric_verify_name: name of the NumericVerify op's output tensor. It has
        format of `NumericVerify/{quantized_tensor_name}:{quantized_tensor_idx}`

    Returns:
      Tuple of (tensor_name, tensor_idx) for quantized op's output tensor.
    """
    tensor_name, tensor_idx = numeric_verify_name.rsplit(':', 1)
    # Drop the 'NumericVerify/' prefix.
    float_tensor_name = tensor_name[len(_NUMERIC_VERIFY_OP_NAME) + 1:]
    # NOTE(review): this strips a single trailing digit from the name —
    # presumably a tensor-index suffix appended during conversion; confirm
    # against the converter's naming scheme.
    if re.match(r'\d', float_tensor_name[-1]):
      float_tensor_name = float_tensor_name[:-1]

    return (float_tensor_name, int(tensor_idx))

  def layer_statistics_dump(self, file: IO[str]) -> None:
    """Dumps layer statistics into file, in csv format.

    Args:
      file: file, or file-like object to write.
    """
    # order of `fields` is the order of fields in csv.
    fields = ['op_name', 'tensor_idx'] + list(self._layer_debug_metrics.keys())
    if self._debug_options.layer_direct_compare_metrics is not None:
      fields += list(self._debug_options.layer_direct_compare_metrics.keys())
    fields += ['scale', 'zero_point', 'tensor_name']
    writer = csv.DictWriter(file, fields)
    writer.writeheader()
    for name, metrics in self.layer_statistics.items():
      data = metrics.copy()
      (data['tensor_name'], _) = self._get_operand_name_and_index(name)
      # The NumericVerify op's first input is the quantized tensor under test.
      data['tensor_idx'] = self._numeric_verify_op_details[name]['inputs'][0]
      data['op_name'] = self._quant_interpreter._get_op_details(  # pylint: disable=protected-access
          self._defining_op[data['tensor_idx']])['op_name']
      details = self._quant_interpreter._get_tensor_details(data['tensor_idx'])  # pylint: disable=protected-access
      data['scale'], data['zero_point'] = (
          details['quantization_parameters']['scales'][0],
          details['quantization_parameters']['zero_points'][0])
      writer.writerow(data)
540