# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========================================================================
"""Tensor Tracer report generation utilities."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import os

from tensorflow.python.platform import gfile
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.tpu import tensor_tracer_pb2

_TRACER_LOG_PREFIX = ' [>>>TT>>>]'
_MARKER_SECTION_BEGIN = '!!!!!!! section-begin:'
_MARKER_SECTION_END = '!!!!!!! section-end:'

_SECTION_NAME_CONFIG = 'configuration'
_SECTION_NAME_REASON = 'reason'
_SECTION_NAME_OP_LIST = 'op-list'
_SECTION_NAME_TENSOR_LIST = 'tensor-list'
_SECTION_NAME_CACHE_INDEX_MAP = 'cache-index-map'
_SECTION_NAME_GRAPH = 'graph'
_SECTION_NAME_TENSOR_TRACER_CHECKPOINT = 'tensor_tracer_checkpoint'

_FIELD_NAME_VERSION = 'version:'
_FIELD_NAME_DEVICE = 'device:'
_FIELD_NAME_TRACE_MODE = 'trace-mode:'
_FIELD_NAME_SUBMODE = 'submode:'
_FIELD_NAME_NUM_REPLICAS = 'num-replicas:'
_FIELD_NAME_NUM_REPLICAS_PER_HOST = 'num-replicas-per-host:'
_FIELD_NAME_NUM_HOSTS = 'num-hosts:'
_FIELD_NAME_NUM_OPS = 'number-of-ops:'
_FIELD_NAME_NUM_TENSORS = 'number-of-tensors:'
_FIELD_NAME_NUM_CACHE_INDICES = 'number-of-indices:'
_FIELD_NAME_TOPOLOGICAL_SORT_SUCCEED = 'topological-sort-succeed:'

_CURRENT_VERSION = 'use-outside-compilation'
_TT_REPORT_PROTO = 'tensor_tracer_report.report_pb'


def topological_sort(g):
  """Performs topological sort on the given graph.

  Args:
    g: the graph.

  Returns:
    A pair where the first element indicates whether a cycle was found
    (True if the graph contains a cycle; False if the topological sort
    succeeded) and the second element is either the set of ops that could
    not be ordered (ops on or behind a cycle) or the topologically sorted
    list of ops.
  """
  def _is_loop_edge(op):
    """Returns true if the op is the end of a while-loop creating a cycle."""
    return op.type in ['NextIteration']

  def _in_op_degree(op):
    """Returns the number of incoming edges to the given op.

    The edge calculation skips the edges that come from 'NextIteration' ops.
    NextIteration creates a cycle in the graph. We break cycles by treating
    this op as a 'sink' and ignoring all outgoing edges from it.

    Args:
      op: tf.Operation
    Returns:
      the number of incoming edges.
    """
    count = 0
    for in_op in op.control_inputs + [in_tensor.op for in_tensor in op.inputs]:
      if not _is_loop_edge(in_op):
        count += 1
    return count

  sorted_ops = []
  op_in_degree = {op: _in_op_degree(op) for op in g.get_operations()}

  frontier = [op for (op, degree) in op_in_degree.items() if degree == 0]
  frontier.sort(key=lambda op: op.name)
  while frontier:
    op = frontier.pop()
    # Remove the op from graph, and remove its outgoing edges.
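    # Consumers whose remaining in-degree drops to zero become ready and
    # join the frontier (Kahn's algorithm).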
    sorted_ops.append(op)
    if _is_loop_edge(op):
      continue
    # pylint: disable=protected-access
    consumers = list(op._control_outputs)
    # pylint: enable=protected-access
    for out_tensor in op.outputs:
      consumers += [consumer_op for consumer_op in out_tensor.consumers()]
    consumers.sort(key=lambda op: op.name)
    for consumer in consumers:
      # For each deleted edge shift the bucket of the vertex.
      op_in_degree[consumer] -= 1
      if op_in_degree[consumer] == 0:
        frontier.append(consumer)
      if op_in_degree[consumer] < 0:
        raise ValueError('consumer:%s degree mismatch'%consumer.name)

  left_ops = set(op for (op, degree) in op_in_degree.items() if degree > 0)
  if left_ops:
    return (True, left_ops)
  else:
    assert len(g.get_operations()) == len(sorted_ops)
    return (False, sorted_ops)


class TensorTracerConfig(object):
  """Tensor Tracer config object."""

  def __init__(self):
    self.version = _CURRENT_VERSION
    self.device_type = None
    self.num_replicas = None
    self.num_replicas_per_host = None
    self.num_hosts = None


class TensorTraceOrder(object):
  """Class that is responsible for storing the trace-id of the tensors."""

  def __init__(self, graph_order, traced_tensors):
    self.graph_order = graph_order
    self.traced_tensors = traced_tensors
    self._create_tensor_maps()

  def _create_tensor_maps(self):
    """Creates tensor to cache id maps."""
    self.tensorname_to_cache_idx = {}
    self.cache_idx_to_tensor_idx = []
    for out_tensor in self.traced_tensors:
      tensor_name = out_tensor.name
      if tensor_name in self.tensorname_to_cache_idx:
        raise ValueError(
            'Tensor name %s should not be already in '
            'tensorname_to_cache_idx'%tensor_name)
      if tensor_name not in self.graph_order.tensor_to_idx:
        raise ValueError(
            'Tensor name %s is not in the tensor_to_idx'%tensor_name)
      tensor_idx = self.graph_order.tensor_to_idx[tensor_name]
      cache_idx = len(self.tensorname_to_cache_idx)
      self.tensorname_to_cache_idx[tensor_name] = cache_idx
      self.cache_idx_to_tensor_idx.append(tensor_idx)
    if len(self.tensorname_to_cache_idx) != len(
        self.cache_idx_to_tensor_idx):
      raise RuntimeError('len(self.tensorname_to_cache_idx) != '
                         'len(self.cache_idx_to_tensor_idx)')


def sort_tensors_and_ops(graph):
  """Returns a wrapper that has consistent tensor and op orders."""
  graph_wrapper = collections.namedtuple('GraphWrapper',
                                         ['graph', 'operations', 'op_to_idx',
                                          'tensors', 'tensor_to_idx',
                                          'contains_cycle',
                                          'topological_order_or_cycle'])
  contains_cycle, topological_order_or_cycle = topological_sort(graph)
  if not contains_cycle:
    operations = topological_order_or_cycle
  else:
    operations = graph.get_operations()
  op_to_idx = {op.name: index for index, op
               in enumerate(operations)}
  tensors = []
  for op in operations:
    tensors.extend(op.outputs)
  tensor_to_idx = {tensor.name: index for index, tensor in
                   enumerate(tensors)}
  return graph_wrapper(graph=graph, operations=operations, op_to_idx=op_to_idx,
                       tensors=tensors, tensor_to_idx=tensor_to_idx,
                       contains_cycle=contains_cycle,
                       topological_order_or_cycle=topological_order_or_cycle)
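
# Illustrative usage sketch (not part of this module): how a caller might
# inspect the ordering produced by sort_tensors_and_ops. The tiny graph and
# names below are hypothetical and use the public TF API for brevity.
#
#   with tf.Graph().as_default() as g:
#     a = tf.constant(1.0, name='a')
#     b = tf.identity(a, name='b')
#   graph_order = sort_tensors_and_ops(g)
#   assert not graph_order.contains_cycle
#   # 'a' produces an input of 'b', so it must come earlier in the order.
#   assert graph_order.op_to_idx['a'] < graph_order.op_to_idx['b']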


class OpenReportFile(object):
  """Context manager for writing the report file."""

  def __init__(self, tt_parameters):
    if not tt_parameters.report_file_path:
      self._report_file = None
      return
    try:
      self._report_file = gfile.Open(tt_parameters.report_file_path, 'w')
    except IOError as e:
      raise e

  def __enter__(self):
    return self._report_file

  def __exit__(self, unused_type, unused_value, unused_traceback):
    if self._report_file:
      self._report_file.close()


class TTReportHandle(object):
  """Utility class responsible for creating a tensor tracer report."""

  def __init__(self):
    self.instrument_records = {}
    self._report_file = None

  def instrument(self, name, explanation):
    self.instrument_records[name] = explanation

  def instrument_op(self, op, explanation):
    self.instrument(op.name, explanation)

  def instrument_tensor(self, tensor, explanation):
    self.instrument(tensor.name, explanation)

  def create_report_proto(self, tt_config, tt_parameters, tensor_trace_order,
                          tensor_trace_points, collected_signature_types):
    """Creates and returns a proto that stores tensor tracer configuration.

    Args:
      tt_config: TensorTracerConfig object holding information about the run
        environment (device, # cores, # hosts), and tensor tracer version
        information.
      tt_parameters: TTParameters object storing the user provided parameters
        for tensor tracer.
      tensor_trace_order: TensorTraceOrder object storing a topological order
        of the graph.
      tensor_trace_points: Programmatically added trace_points/checkpoints.
      collected_signature_types: The signature types collected, e.g., norm,
        max, min, mean...
    Returns:
      TensorTracerReport proto.
    """
    report = tensor_tracer_pb2.TensorTracerReport()
    report.config.version = tt_config.version
    report.config.device = tt_config.device_type
    report.config.num_cores = tt_config.num_replicas
    report.config.num_hosts = tt_config.num_hosts
    report.config.num_cores_per_host = tt_config.num_replicas_per_host
    for core in tt_parameters.included_cores:
      report.config.included_cores.append(core)
    report.config.submode = tt_parameters.submode
    report.config.trace_mode = tt_parameters.trace_mode

    for signature_name, _ in sorted(collected_signature_types.items(),
                                    key=lambda x: x[1]):
      report.config.signatures.append(signature_name)

    tf_graph = tensor_trace_order.graph_order.graph
    report.graphdef.CopyFrom(tf_graph.as_graph_def())
    for tensor in tensor_trace_order.graph_order.tensors:
      tensor_def = tensor_tracer_pb2.TensorTracerReport.TracedTensorDef()
      tensor_def.name = tensor.name
      if tensor.name in tensor_trace_order.tensorname_to_cache_idx:
        tensor_def.is_traced = True
        tensor_def.cache_index = (
            tensor_trace_order.tensorname_to_cache_idx[tensor.name])
      else:
        tensor_def.is_traced = False

      if tensor.name in tensor_trace_points:
        tensor_def.trace_point_name = tensor_trace_points[tensor.name]
      if tensor.name in self.instrument_records:
        tensor_def.explanation = self.instrument_records[tensor.name]
      elif tensor.op.name in self.instrument_records:
        tensor_def.explanation = self.instrument_records[tensor.op.name]
      report.tensordef[tensor.name].CopyFrom(tensor_def)
    return report

  def write_report_proto(self, report_proto, tt_parameters):
    """Writes the given report proto under trace_dir."""
    gfile.MakeDirs(tt_parameters.trace_dir)
    report_path = os.path.join(tt_parameters.trace_dir, _TT_REPORT_PROTO)
    with gfile.GFile(report_path, 'wb') as f:
      f.write(report_proto.SerializeToString())
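
  # Illustrative only (no such reader exists in this module): the serialized
  # report written above can be loaded back for offline analysis with, e.g.,
  #
  #   report = tensor_tracer_pb2.TensorTracerReport()
  #   with gfile.GFile(report_path, 'rb') as f:
  #     report.ParseFromString(f.read())
  #
  # where report_path is os.path.join(trace_dir, _TT_REPORT_PROTO).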

  def create_report(self, tt_config, tt_parameters,
                    tensor_trace_order, tensor_trace_points):
    """Creates a report file and writes the trace information."""
    with OpenReportFile(tt_parameters) as self._report_file:
      self._write_config_section(tt_config, tt_parameters)
      self._write_op_list_section(tensor_trace_order.graph_order)
      self._write_tensor_list_section(tensor_trace_order.graph_order)
      self._write_trace_points(tensor_trace_points)
      self._write_cache_index_map_section(tensor_trace_order)
      self._write_reason_section()
      self._write_graph_section(tensor_trace_order.graph_order)

  def _write_trace_points(self, tensor_trace_points):
    """Writes the list of checkpoints."""
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_TENSOR_TRACER_CHECKPOINT))
    for (tensor, checkpoint_name) in tensor_trace_points:
      self._write_report('%s %s\n'%(tensor.name, checkpoint_name))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_TENSOR_TRACER_CHECKPOINT))

  def _write_report(self, content):
    """Writes the given content to the report."""

    line = '%s %s'%(_TRACER_LOG_PREFIX, content)
    if self._report_file:
      self._report_file.write(line)
    else:
      logging.info(line)

  def _write_config_section(self, tt_config, tt_parameters):
    """Writes the config section of the report."""

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_CONFIG))
    self._write_report('%s %s\n'%(_FIELD_NAME_VERSION, tt_config.version))
    self._write_report('%s %s\n'%(_FIELD_NAME_DEVICE, tt_config.device_type))
    self._write_report('%s %s\n'%(_FIELD_NAME_TRACE_MODE,
                                  tt_parameters.trace_mode))
    self._write_report('%s %s\n'%(_FIELD_NAME_SUBMODE,
                                  tt_parameters.submode))
    if tt_parameters.included_cores:
      self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS,
                                    len(tt_parameters.included_cores)))
    else:
      self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS,
                                    tt_config.num_replicas))
    self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS_PER_HOST,
                                  tt_config.num_replicas_per_host))
    self._write_report('%s %s\n'%(_FIELD_NAME_NUM_HOSTS, tt_config.num_hosts))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_CONFIG))

  def _write_reason_section(self):
    """Writes the reason section of the report."""

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_REASON))
    for key in sorted(self.instrument_records):
      self._write_report('"%s" %s\n'%(key, self.instrument_records[key]))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_REASON))

  def _write_op_list_section(self, graph_order):
    """Writes the Op-list section of the report."""

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_OP_LIST))
    self._write_report('%s %d\n'%(_FIELD_NAME_NUM_OPS,
                                  len(graph_order.operations)))
    for i in range(0, len(graph_order.operations)):
      op = graph_order.operations[i]
      line = '%d "%s" %s'%(i, op.name, op.type)
      for out_tensor in op.outputs:
        if out_tensor.name not in graph_order.tensor_to_idx:
          raise ValueError(
              'out_tensor %s is not in tensor_to_idx'%out_tensor.name)
        line += ' %d'%graph_order.tensor_to_idx[out_tensor.name]
      line += '\n'
      self._write_report(line)
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_OP_LIST))

  def _write_tensor_list_section(self, graph_order):
    """Writes the tensor-list section of the report."""

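    # Each emitted line has the form
    #   <tensor-index> "<tensor-name>" <consumer-op-index> ...
    # where the consumer indices refer to entries in the op-list section.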
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_TENSOR_LIST))
    self._write_report('%s %d\n'%(_FIELD_NAME_NUM_TENSORS,
                                  len(graph_order.tensors)))
    for i in range(0, len(graph_order.tensors)):
      tensor = graph_order.tensors[i]
      line = '%d "%s"'%(i, tensor.name)
      consumers = tensor.consumers()
      consumers.sort(key=lambda op: op.name)
      for consumer_op in consumers:
        if consumer_op.name not in graph_order.op_to_idx:
          raise ValueError(
              'consumer_op %s is not in op_to_idx'%consumer_op.name)
        line += ' %d'%graph_order.op_to_idx[consumer_op.name]
      line += '\n'
      self._write_report(line)
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_TENSOR_LIST))

  def _write_cache_index_map_section(self, tensor_trace_order):
    """Writes the mapping from cache index to tensor index to the report."""
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_CACHE_INDEX_MAP))
    self._write_report('%s %d\n'%(
        _FIELD_NAME_NUM_CACHE_INDICES,
        len(tensor_trace_order.cache_idx_to_tensor_idx)))
    for cache_idx in range(0, len(tensor_trace_order.cache_idx_to_tensor_idx)):
      tensor_idx = tensor_trace_order.cache_idx_to_tensor_idx[cache_idx]
      line = '%d %d\n'%(cache_idx, tensor_idx)
      self._write_report(line)
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_CACHE_INDEX_MAP))

  def _write_graph_section(self, graph_order):
    """Writes the graph section of the report."""

    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_GRAPH))
    self._write_report('%s %s\n'%(_FIELD_NAME_TOPOLOGICAL_SORT_SUCCEED,
                                  not graph_order.contains_cycle))
    l = list(graph_order.topological_order_or_cycle)
    for i in range(0, len(l)):
      self._write_report('%d "%s"\n'%(i, l[i].name))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_GRAPH))
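

# Illustrative end-to-end flow (hypothetical caller, not part of this module).
# `graph`, `traced_tensors`, `trace_points`, `tt_config` and `tt_parameters`
# are assumed to be supplied by the Tensor Tracer driver:
#
#   graph_order = sort_tensors_and_ops(graph)
#   trace_order = TensorTraceOrder(graph_order, traced_tensors)
#   handle = TTReportHandle()
#   handle.instrument_op(graph.get_operations()[0], 'example explanation')
#   handle.create_report(tt_config, tt_parameters, trace_order, trace_points)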