• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ========================================================================
15"""Tensor Tracer report generation utilities."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import collections
22import os
23
24from tensorflow.python.platform import gfile
25from tensorflow.python.platform import tf_logging as logging
26from tensorflow.python.tpu import tensor_tracer_pb2
27
# Prefix that TTReportHandle._write_report puts in front of every report line.
_TRACER_LOG_PREFIX = ' [>>>TT>>>]'
# Markers written, followed by a section name, to open and close a section.
_MARKER_SECTION_BEGIN = '!!!!!!! section-begin:'
_MARKER_SECTION_END = '!!!!!!! section-end:'

# Section names that follow a section-begin / section-end marker.
_SECTION_NAME_CONFIG = 'configuration'
_SECTION_NAME_REASON = 'reason'
_SECTION_NAME_OP_LIST = 'op-list'
_SECTION_NAME_TENSOR_LIST = 'tensor-list'
_SECTION_NAME_CACHE_INDEX_MAP = 'cache-index-map'
_SECTION_NAME_GRAPH = 'graph'
_SECTION_NAME_TENSOR_TRACER_CHECKPOINT = 'tensor_tracer_checkpoint'

# Field labels; each is written as a '<label> <value>' line inside a section.
_FIELD_NAME_VERSION = 'version:'
_FIELD_NAME_DEVICE = 'device:'
_FIELD_NAME_TRACE_MODE = 'trace-mode:'
_FIELD_NAME_SUBMODE = 'submode:'
_FIELD_NAME_NUM_REPLICAS = 'num-replicas:'
_FIELD_NAME_NUM_REPLICAS_PER_HOST = 'num-replicas-per-host:'
_FIELD_NAME_NUM_HOSTS = 'num-hosts:'
_FIELD_NAME_NUM_OPS = 'number-of-ops:'
_FIELD_NAME_NUM_TENSORS = 'number-of-tensors:'
_FIELD_NAME_NUM_CACHE_INDICES = 'number-of-indices:'
_FIELD_NAME_TOPOLOGICAL_SORT_SUCCEED = 'topological-sort-succeed:'

# Value that TensorTracerConfig.version is initialized to.
_CURRENT_VERSION = 'use-outside-compilation'
# File name of the serialized report proto, joined onto the trace directory.
_TT_REPORT_PROTO = 'tensor_tracer_report.report_pb'
54
55
def topological_sort(g):
  """Performs topological sort on the given graph.

  Edges that leave 'NextIteration' ops are ignored, so the back edge of a
  while-loop is not reported as a cycle.

  Args:
     g: the graph.

  Returns:
     A pair (contains_cycle, ops). When every op could be ordered,
     contains_cycle is False and ops is the list of ops in topological
     order. Otherwise contains_cycle is True and ops is the set of ops
     that were left with unprocessed incoming edges.

  Raises:
     ValueError: if more incoming edges are removed from an op than were
       counted for it.
  """
  def _is_loop_edge(op):
    """Returns true if the op is the end of a while-loop creating a cycle."""
    return op.type in ['NextIteration']

  def _in_op_degree(op):
    """Returns the number of incoming edges to the given op.

    The edge calculation skips the edges that come from 'NextIteration' ops.
    NextIteration creates a cycle in the graph. We break cycles by treating
    this op as 'sink' and ignoring all outgoing edges from it.
    Args:
      op: Tf.Operation
    Returns:
      the number of incoming edges.
    """
    producers = op.control_inputs + [in_tensor.op for in_tensor in op.inputs]
    return sum(1 for producer in producers if not _is_loop_edge(producer))

  sorted_ops = []
  op_in_degree = {op: _in_op_degree(op) for op in g.get_operations()}

  # Start from the ops without (counted) incoming edges; sorting by name keeps
  # the resulting order deterministic.
  frontier = [op for (op, degree) in op_in_degree.items() if degree == 0]
  frontier.sort(key=lambda op: op.name)
  while frontier:
    op = frontier.pop()
    # Remove the op from graph, and remove its outgoing edges.
    sorted_ops.append(op)
    if _is_loop_edge(op):
      # Outgoing edges of a loop edge were never counted, so there is nothing
      # to remove for it.
      continue
    # pylint: disable=protected-access
    consumers = list(op._control_outputs)
    # pylint: enable=protected-access
    for out_tensor in op.outputs:
      consumers.extend(out_tensor.consumers())
    consumers.sort(key=lambda consumer: consumer.name)
    for consumer in consumers:
      # For each deleted edge shift the bucket of the vertex.
      op_in_degree[consumer] -= 1
      if op_in_degree[consumer] == 0:
        frontier.append(consumer)
      if op_in_degree[consumer] < 0:
        raise ValueError('consumer:%s degree mismatch'%consumer.name)

  # Any op that still has incoming edges could not be scheduled.
  left_ops = set(op for (op, degree) in op_in_degree.items() if degree > 0)
  if left_ops:
    return (True, left_ops)
  assert len(g.get_operations()) == len(sorted_ops)
  return (False, sorted_ops)
120
121
class TensorTracerConfig(object):
  """Plain holder for the tensor tracer version and run-environment fields."""

  def __init__(self):
    # The version defaults to the module-wide current version string.
    self.version = _CURRENT_VERSION
    # The remaining fields start out unset.
    self.device_type = self.num_replicas = None
    self.num_replicas_per_host = self.num_hosts = None
131
132
class TensorTraceOrder(object):
  """Stores the trace-id (cache index) assigned to each traced tensor."""

  def __init__(self, graph_order, traced_tensors):
    self.graph_order = graph_order
    self.traced_tensors = traced_tensors
    self._create_tensor_maps()

  def _create_tensor_maps(self):
    """Builds the tensor-name -> cache index and cache index -> tensor maps.

    Cache indices are handed out in the order of self.traced_tensors.

    Raises:
      ValueError: if a traced tensor name repeats, or is missing from
        self.graph_order.tensor_to_idx.
      RuntimeError: if the two maps end up with different sizes.
    """
    name_to_cache = self.tensorname_to_cache_idx = {}
    cache_to_tensor = self.cache_idx_to_tensor_idx = []
    for traced in self.traced_tensors:
      name = traced.name
      if name in name_to_cache:
        raise ValueError(
            'Tensor name %s should not be already in '
            'tensorname_to_cache_idx'%name)
      known_tensors = self.graph_order.tensor_to_idx
      if name not in known_tensors:
        raise ValueError(
            'Tensor name %s is not in the tensor_to_idx'%name)
      # The next free cache index equals the number of names stored so far.
      name_to_cache[name] = len(name_to_cache)
      cache_to_tensor.append(known_tensors[name])
      if len(name_to_cache) != len(cache_to_tensor):
        raise RuntimeError('len(self.tensorname_to_cache_idx) != '
                           'len(self.cache_idx_to_tensor_idx')
162
163
def sort_tensors_and_ops(graph):
  """Builds a GraphWrapper with a consistent ordering of ops and tensors.

  Ops follow the topological order when the sort finds no cycle, and the
  order of graph.get_operations() otherwise. Tensors are the outputs of the
  ops, listed in op order.

  Args:
    graph: the graph whose ops and tensors are ordered.

  Returns:
    A GraphWrapper namedtuple with the fields graph, operations, op_to_idx,
    tensors, tensor_to_idx, contains_cycle and topological_order_or_cycle.
  """
  contains_cycle, topological_order_or_cycle = topological_sort(graph)
  if contains_cycle:
    operations = graph.get_operations()
  else:
    operations = topological_order_or_cycle

  op_to_idx = {}
  tensors = []
  for position, op in enumerate(operations):
    op_to_idx[op.name] = position
    tensors.extend(op.outputs)

  tensor_to_idx = {}
  for position, tensor in enumerate(tensors):
    tensor_to_idx[tensor.name] = position

  wrapper_cls = collections.namedtuple(
      'GraphWrapper',
      ['graph', 'operations', 'op_to_idx', 'tensors', 'tensor_to_idx',
       'contains_cycle', 'topological_order_or_cycle'])
  return wrapper_cls(graph, operations, op_to_idx, tensors, tensor_to_idx,
                     contains_cycle, topological_order_or_cycle)
187
188
class OpenReportFile(object):
  """Context manager for writing report file.

  Entering the context yields the opened report file, or None when
  tt_parameters.report_file_path is not set. Leaving the context closes the
  file if one was opened.
  """

  def __init__(self, tt_parameters):
    """Opens the report file for writing if a report path is configured.

    Args:
      tt_parameters: object whose report_file_path attribute is the path of
        the report file; a falsy value means no file is opened.

    Raises:
      IOError: propagated unchanged if the file cannot be opened.
    """
    self._report_file = None
    if tt_parameters.report_file_path:
      self._report_file = gfile.Open(tt_parameters.report_file_path, 'w')

  def __enter__(self):
    return self._report_file

  def __exit__(self, unused_type, unused_value, unused_traceback):
    # Exceptions raised inside the with-block are not suppressed.
    if self._report_file is not None:
      self._report_file.close()
207
208
class TTReportHandle(object):
  """Utility class responsible for creating a tensor tracer report."""

  def __init__(self):
    # Maps an op or tensor name to the explanation recorded for it.
    self.instrument_records = {}
    # Report file currently being written; when None, report lines are logged.
    self._report_file = None

  def instrument(self, name, explanation):
    """Records the explanation for the given op or tensor name."""
    self.instrument_records[name] = explanation

  def instrument_op(self, op, explanation):
    """Records the explanation under the name of the given op."""
    self.instrument(op.name, explanation)

  def instrument_tensor(self, tensor, explanation):
    """Records the explanation under the name of the given tensor."""
    self.instrument(tensor.name, explanation)

  @staticmethod
  def _trace_point_items(tensor_trace_points):
    """Returns the trace points as (tensor name, trace point name) pairs.

    Args:
      tensor_trace_points: either a dict that maps tensor names to trace point
        names, or an iterable of (tensor, trace point name) pairs.

    Returns:
      A list of (tensor name, trace point name) tuples.
    """
    if isinstance(tensor_trace_points, dict):
      return list(tensor_trace_points.items())
    return [(tensor.name, checkpoint_name)
            for (tensor, checkpoint_name) in tensor_trace_points]

  def create_report_proto(self, tt_config, tt_parameters, tensor_trace_order,
                          tensor_trace_points, collected_signature_types):
    """Creates and returns a proto that stores tensor tracer configuration.

    Args:
      tt_config: TensorTracerConfig object holding information about the run
        environment (device, # cores, # hosts), and tensor tracer version
        information.
      tt_parameters: TTParameters objects storing the user provided parameters
        for tensor tracer.
      tensor_trace_order: TensorTraceOrder object storing a topological order of
        the graph.
      tensor_trace_points: Programmatically added trace_points/checkpoints,
        given either as a dict from tensor name to trace point name or as an
        iterable of (tensor, trace point name) pairs.
      collected_signature_types: The signature types collected, e.g. norm,
        max, min, mean...
    Returns:
      TensorTracerReport proto.
    """
    report = tensor_tracer_pb2.TensorTracerReport()
    report.config.version = tt_config.version
    report.config.device = tt_config.device_type
    report.config.num_cores = tt_config.num_replicas
    report.config.num_hosts = tt_config.num_hosts
    report.config.num_cores_per_host = tt_config.num_replicas_per_host
    for core in tt_parameters.included_cores:
      report.config.included_cores.append(core)
    report.config.submode = tt_parameters.submode
    report.config.trace_mode = tt_parameters.trace_mode

    # Signature names are appended in the order of their mapped values.
    for signature_name, _ in sorted(collected_signature_types.items(),
                                    key=lambda x: x[1]):
      report.config.signatures.append(signature_name)

    # Look trace points up by tensor name regardless of the form they were
    # passed in.
    trace_point_names = dict(self._trace_point_items(tensor_trace_points))

    tf_graph = tensor_trace_order.graph_order.graph
    report.graphdef.CopyFrom(tf_graph.as_graph_def())
    for tensor in tensor_trace_order.graph_order.tensors:
      tensor_def = tensor_tracer_pb2.TensorTracerReport.TracedTensorDef()
      tensor_def.name = tensor.name
      if tensor.name in tensor_trace_order.tensorname_to_cache_idx:
        tensor_def.is_traced = True
        tensor_def.cache_index = (
            tensor_trace_order.tensorname_to_cache_idx[tensor.name])
      else:
        tensor_def.is_traced = False

      if tensor.name in trace_point_names:
        tensor_def.trace_point_name = trace_point_names[tensor.name]
      # An explanation recorded for the tensor wins over one recorded for the
      # op that produces it.
      if tensor.name in self.instrument_records:
        tensor_def.explanation = self.instrument_records[tensor.name]
      elif tensor.op.name in self.instrument_records:
        tensor_def.explanation = self.instrument_records[tensor.op.name]
      report.tensordef[tensor.name].CopyFrom(tensor_def)
    return report

  def write_report_proto(self, report_proto, tt_parameters):
    """Writes the given report proto under trace_dir."""
    gfile.MakeDirs(tt_parameters.trace_dir)
    report_path = os.path.join(tt_parameters.trace_dir, _TT_REPORT_PROTO)
    with gfile.GFile(report_path, 'wb') as f:
      f.write(report_proto.SerializeToString())

  def create_report(self, tt_config, tt_parameters,
                    tensor_trace_order, tensor_trace_points):
    """Creates a report file and writes the trace information.

    Args:
      tt_config: TensorTracerConfig object for the config section.
      tt_parameters: TTParameters object; its report_file_path selects the
        report file (when unset, the report lines are logged instead).
      tensor_trace_order: TensorTraceOrder object with the graph order and the
        cache index map.
      tensor_trace_points: trace points, as a dict from tensor name to trace
        point name or as an iterable of (tensor, trace point name) pairs.
    """
    try:
      with OpenReportFile(tt_parameters) as self._report_file:
        self._write_config_section(tt_config, tt_parameters)
        self._write_op_list_section(tensor_trace_order.graph_order)
        self._write_tensor_list_section(tensor_trace_order.graph_order)
        self._write_trace_points(tensor_trace_points)
        self._write_cache_index_map_section(tensor_trace_order)
        self._write_reason_section()
        self._write_graph_section(tensor_trace_order.graph_order)
    finally:
      # Do not keep a reference to the file once it has been closed.
      self._report_file = None

  def _write_trace_points(self, tensor_trace_points):
    """Writes the list of checkpoints."""
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_TENSOR_TRACER_CHECKPOINT))
    for (tensor_name, checkpoint_name) in self._trace_point_items(
        tensor_trace_points):
      self._write_report('%s %s\n'%(tensor_name, checkpoint_name))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_TENSOR_TRACER_CHECKPOINT))

  def _write_report(self, content):
    """Writes the given content to the report.

    The content is prefixed with the tracer log prefix and written to the open
    report file, or logged at info level when there is no report file.
    """

    line = '%s %s'%(_TRACER_LOG_PREFIX, content)
    if self._report_file:
      self._report_file.write(line)
    else:
      logging.info(line)

  def _write_config_section(self, tt_config, tt_parameters):
    """Writes the config section of the report."""

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_CONFIG))
    self._write_report('%s %s\n'%(_FIELD_NAME_VERSION, tt_config.version))
    self._write_report('%s %s\n'%(_FIELD_NAME_DEVICE, tt_config.device_type))
    self._write_report('%s %s\n'%(_FIELD_NAME_TRACE_MODE,
                                  tt_parameters.trace_mode))
    self._write_report('%s %s\n'%(_FIELD_NAME_SUBMODE,
                                  tt_parameters.submode))
    # When specific cores are selected, report how many were selected instead
    # of the configured replica count.
    if tt_parameters.included_cores:
      self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS,
                                    len(tt_parameters.included_cores)))
    else:
      self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS,
                                    tt_config.num_replicas))
    self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS_PER_HOST,
                                  tt_config.num_replicas_per_host))
    self._write_report('%s %s\n'%(_FIELD_NAME_NUM_HOSTS, tt_config.num_hosts))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_CONFIG))

  def _write_reason_section(self):
    """Writes the reason section of the report."""

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_REASON))
    for key in sorted(self.instrument_records):
      self._write_report('"%s" %s\n'%(key, self.instrument_records[key]))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_REASON))

  def _write_op_list_section(self, graph_order):
    """Writes the Op-list section of the report.

    Each line holds the op index, name and type, followed by the indices of
    the op's output tensors.

    Args:
      graph_order: wrapper providing operations and tensor_to_idx.

    Raises:
      ValueError: if an output tensor is missing from tensor_to_idx.
    """

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_OP_LIST))
    self._write_report('%s %d\n'%(_FIELD_NAME_NUM_OPS,
                                  len(graph_order.operations)))
    for i, op in enumerate(graph_order.operations):
      line = '%d "%s" %s'%(i, op.name, op.type)
      for out_tensor in op.outputs:
        if out_tensor.name not in graph_order.tensor_to_idx:
          raise ValueError(
              'out_tensor %s is not in tensor_to_idx'%out_tensor.name)
        line += ' %d'%graph_order.tensor_to_idx[out_tensor.name]
      line += '\n'
      self._write_report(line)
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_OP_LIST))

  def _write_tensor_list_section(self, graph_order):
    """Writes the tensor-list section of the report.

    Each line holds the tensor index and name, followed by the indices of the
    ops that consume the tensor, ordered by op name.

    Args:
      graph_order: wrapper providing tensors and op_to_idx.

    Raises:
      ValueError: if a consumer op is missing from op_to_idx.
    """

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_TENSOR_LIST))
    self._write_report('%s %d\n'%(_FIELD_NAME_NUM_TENSORS,
                                  len(graph_order.tensors)))
    for i, tensor in enumerate(graph_order.tensors):
      line = '%d "%s"'%(i, tensor.name)
      # sorted() leaves the list returned by consumers() untouched.
      consumers = sorted(tensor.consumers(), key=lambda op: op.name)
      for consumer_op in consumers:
        if consumer_op.name not in graph_order.op_to_idx:
          raise ValueError(
              'consumer_op %s is not in op_to_idx'%consumer_op.name)
        line += ' %d'%graph_order.op_to_idx[consumer_op.name]
      line += '\n'
      self._write_report(line)
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_TENSOR_LIST))

  def _write_cache_index_map_section(self, tensor_trace_order):
    """Writes the mapping from cache index to tensor index to the report."""
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_CACHE_INDEX_MAP))
    self._write_report('%s %d\n'%(
        _FIELD_NAME_NUM_CACHE_INDICES,
        len(tensor_trace_order.cache_idx_to_tensor_idx)))
    for cache_idx, tensor_idx in enumerate(
        tensor_trace_order.cache_idx_to_tensor_idx):
      self._write_report('%d %d\n'%(cache_idx, tensor_idx))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_CACHE_INDEX_MAP))

  def _write_graph_section(self, graph_order):
    """Writes the graph section of the report.

    Lists the ops of graph_order.topological_order_or_cycle, preceded by a
    field telling whether the topological sort succeeded.
    """

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_GRAPH))
    self._write_report('%s %s\n'%(_FIELD_NAME_TOPOLOGICAL_SORT_SUCCEED,
                                  not graph_order.contains_cycle))
    for i, op in enumerate(graph_order.topological_order_or_cycle):
      self._write_report('%d "%s"\n'%(i, op.name))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_GRAPH))
409