# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Timeline visualization for TensorFlow using Chrome Trace Format."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import copy
import json
import re

# The timeline target is usually imported as part of BUILD target
# "platform_test", which also includes the "platform" dependency.
# This is why the logging import here is okay.
from tensorflow.python.platform import tf_logging as logging

# Matches node timeline labels of the form: name = op(arg, arg, ...).
_OP_LABEL_RE = re.compile(r'(.*) = (.*)\((.*)\)')


class AllocationMaximum(collections.namedtuple(
    'AllocationMaximum', ('timestamp', 'num_bytes', 'tensors'))):
  """Stores the maximum allocation for a given allocator within the timeline.

  Parameters:
    timestamp: `tensorflow::Env::NowMicros()` when this maximum was reached.
    num_bytes: the total memory used at this time.
    tensors: the set of tensors allocated at this time.
  """


class StepStatsAnalysis(collections.namedtuple(
    'StepStatsAnalysis', ('chrome_trace', 'allocator_maximums'))):
  """Stores the step stats analysis output.

  Parameters:
    chrome_trace: The `_ChromeTraceFormatter` holding the generated trace
      events; call `format_to_string()` on it to obtain the JSON text.
    allocator_maximums: A dict mapping allocator names to AllocationMaximum.
  """


class _ChromeTraceFormatter(object):
  """A helper class for generating traces in Chrome Trace Format."""

  def __init__(self, show_memory=False):
    """Constructs a new Chrome Trace formatter.

    Args:
      show_memory: Stored on the formatter; not consulted by any method of
        this class.
    """
    self._show_memory = show_memory
    self._events = []
    self._metadata = []

  def _create_event(self, ph, category, name, pid, tid, timestamp):
    """Creates a new Chrome Trace event.

    For details of the file format, see:
    https://github.com/catapult-project/catapult/blob/master/tracing/README.md

    Args:
      ph:  The type of event - usually a single character.
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.

    Returns:
      A JSON compatible event object.
    """
    return {
        'ph': ph,
        'cat': category,
        'name': name,
        'pid': pid,
        'tid': tid,
        'ts': timestamp,
    }

  def emit_pid(self, name, pid):
    """Adds a process metadata event to the trace.

    Args:
      name:  The process name as a string.
      pid:  Identifier of the process as an integer.
    """
    self._metadata.append({
        'name': 'process_name',
        'ph': 'M',
        'pid': pid,
        'args': {'name': name},
    })

  def emit_tid(self, name, pid, tid):
    """Adds a thread metadata event to the trace.

    Args:
      name:  The thread name as a string.
      pid:  Identifier of the process as an integer.
      tid:  Identifier of the thread as an integer.
    """
    self._metadata.append({
        'name': 'thread_name',
        'ph': 'M',
        'pid': pid,
        'tid': tid,
        'args': {'name': name},
    })

  def emit_region(self, timestamp, duration, pid, tid, category, name, args):
    """Adds a region event to the trace.

    Args:
      timestamp:  The start timestamp of this region as a long integer.
      duration:  The duration of this region as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      category: The event category as a string.
      name:  The event name as a string.
      args:  A JSON-compatible dictionary of event arguments.
    """
    event = self._create_event('X', category, name, pid, tid, timestamp)
    event['dur'] = duration
    event['args'] = args
    self._events.append(event)

  def emit_obj_create(self, category, name, timestamp, pid, tid, object_id):
    """Adds an object creation event to the trace.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
    """
    event = self._create_event('N', category, name, pid, tid, timestamp)
    event['id'] = object_id
    self._events.append(event)

  def emit_obj_delete(self, category, name, timestamp, pid, tid, object_id):
    """Adds an object deletion event to the trace.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
    """
    event = self._create_event('D', category, name, pid, tid, timestamp)
    event['id'] = object_id
    self._events.append(event)

  def emit_obj_snapshot(self, category, name, timestamp, pid, tid, object_id,
                        snapshot):
    """Adds an object snapshot event to the trace.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
      snapshot:  A JSON-compatible representation of the object.
    """
    event = self._create_event('O', category, name, pid, tid, timestamp)
    event['id'] = object_id
    event['args'] = {'snapshot': snapshot}
    self._events.append(event)

  def emit_flow_start(self, name, timestamp, pid, tid, flow_id):
    """Adds a flow start event to the trace.

    When matched with a flow end event (with the same 'flow_id') this will
    cause the trace viewer to draw an arrow between the start and end events.

    Args:
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      flow_id: Identifier of the flow as an integer.
    """
    event = self._create_event('s', 'DataFlow', name, pid, tid, timestamp)
    event['id'] = flow_id
    self._events.append(event)

  def emit_flow_end(self, name, timestamp, pid, tid, flow_id):
    """Adds a flow end event to the trace.

    When matched with a flow start event (with the same 'flow_id') this will
    cause the trace viewer to draw an arrow between the start and end events.

    Args:
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      flow_id: Identifier of the flow as an integer.
    """
    event = self._create_event('t', 'DataFlow', name, pid, tid, timestamp)
    event['id'] = flow_id
    self._events.append(event)

  def emit_counter(self, category, name, pid, timestamp, counter, value):
    """Emits a record for a single counter.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.
      counter: Name of the counter as a string.
      value:  Value of the counter as an integer.
    """
    # Counter events are always recorded against thread id 0.
    event = self._create_event('C', category, name, pid, 0, timestamp)
    event['args'] = {counter: value}
    self._events.append(event)

  def emit_counters(self, category, name, pid, timestamp, counters):
    """Emits a counter record for the dictionary 'counters'.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.
      counters: Dictionary of counter values.
    """
    event = self._create_event('C', category, name, pid, 0, timestamp)
    # Copy so later mutation of the caller's dict cannot alter the event.
    event['args'] = counters.copy()
    self._events.append(event)

  def format_to_string(self, pretty=False):
    """Formats the chrome trace to a string.

    Args:
      pretty: (Optional.)  If True, produce human-readable JSON output.

    Returns:
      A JSON-formatted string in Chrome Trace format.
    """
    # Metadata (process/thread names) is listed ahead of the timed events.
    trace = {'traceEvents': self._metadata + self._events}
    if pretty:
      return json.dumps(trace, indent=4, separators=(',', ': '))
    return json.dumps(trace, separators=(',', ':'))


class _TensorTracker(object):
  """An internal class to track the lifetime of a Tensor."""

  def __init__(self, name, object_id, timestamp, pid, allocator, num_bytes):
    """Creates an object to track tensor references.

    This class is not thread safe and is intended only for internal use by
    the 'Timeline' class in this file.

    Args:
      name:  The name of the Tensor as a string.
      object_id:  Chrome Trace object identifier assigned for this Tensor.
      timestamp:  The creation timestamp of this event as a long integer.
      pid:  Process identifier of the associated device, as an integer.
      allocator:  Name of the allocator used to create the Tensor.
      num_bytes:  Number of bytes allocated (long integer).
    """
    self._name = name
    self._pid = pid
    self._object_id = object_id
    self._create_time = timestamp
    self._allocator = allocator
    self._num_bytes = num_bytes
    self._ref_times = []
    self._unref_times = []

  @property
  def name(self):
    """Name of this tensor."""
    return self._name

  @property
  def pid(self):
    """ID of the process which created this tensor (an integer)."""
    return self._pid

  @property
  def create_time(self):
    """Timestamp when this tensor was created (long integer)."""
    return self._create_time

  @property
  def object_id(self):
    """Returns the object identifier of this tensor (integer)."""
    return self._object_id

  @property
  def num_bytes(self):
    """Size of this tensor in bytes (long integer)."""
    return self._num_bytes

  @property
  def allocator(self):
    """Name of the allocator used to create this tensor (string)."""
    return self._allocator

  @property
  def last_unref(self):
    """Last unreference timestamp of this tensor (long integer).

    Raises:
      ValueError: If no unreference has been recorded via `add_unref`.
    """
    return max(self._unref_times)

  def add_ref(self, timestamp):
    """Adds a reference to this tensor with the specified timestamp.

    Args:
      timestamp:  Timestamp of object reference as an integer.
    """
    self._ref_times.append(timestamp)

  def add_unref(self, timestamp):
    """Adds an unref to this tensor with the specified timestamp.

    Args:
      timestamp:  Timestamp of object unreference as an integer.
    """
    self._unref_times.append(timestamp)


class Timeline(object):
  """A class for visualizing execution timelines of TensorFlow steps."""

  def __init__(self, step_stats, graph=None):
    """Constructs a new Timeline.

    A 'Timeline' is used for visualizing the execution of a TensorFlow
    computation.  It shows the timings and concurrency of execution at
    the granularity of TensorFlow Ops.
    This class is not thread safe.

    Args:
      step_stats: The 'StepStats' proto recording execution times.
      graph: (Optional) The 'Graph' that was executed.
    """
    self._step_stats = step_stats
    self._graph = graph
    self._reset_analysis_state()

  def _reset_analysis_state(self):
    """(Re)initializes all state that is accumulated by one analysis pass.

    Called from the constructor and at the start of every analysis so that
    analyzing the same 'Timeline' more than once does not duplicate trace
    events or reuse pids, flow ids and tensor object ids of an earlier pass.
    """
    self._chrome_trace = _ChromeTraceFormatter()
    self._next_pid = 0
    self._allocators_pid = None  # pid for allocator counters, set lazily.
    self._device_pids = {}  # device name -> pid for compute activity.
    self._tensor_pids = {}  # device name -> pid for tensors.
    self._tensors = {}  # tensor_name -> TensorTracker
    self._next_flow_id = 0
    self._flow_starts = {}  # tensor_name -> (timestamp, pid, tid)
    self._alloc_times = {}  # tensor_name -> ( time, allocator, size )
    self._allocator_maximums = {}  # allocator name -> AllocationMaximum

  def _alloc_pid(self):
    """Allocate a process Id."""
    pid = self._next_pid
    self._next_pid += 1
    return pid

  def _alloc_flow_id(self):
    """Allocate a flow Id."""
    flow_id = self._next_flow_id
    self._next_flow_id += 1
    return flow_id

  def _parse_op_label(self, label):
    """Parses the fields in a node timeline label.

    Args:
      label: A label string, expected to look like 'name = op(arg, arg, ...)'.

    Returns:
      A tuple (node name, op name, list of input names).  When the label does
      not have the expected form this is ('unknown', 'unknown', []).
    """
    match = _OP_LABEL_RE.match(label)
    if match is None:
      return 'unknown', 'unknown', []
    nn, op, inputs = match.groups()
    # An empty argument list must yield [], not [''].
    inputs = inputs.split(', ') if inputs else []
    return nn, op, inputs

  def _assign_lanes(self):
    """Assigns non-overlapping lanes for the activities on each device.

    The chosen lane index is written back onto each node stats entry as
    'thread_id'.
    """
    for device_stats in self._step_stats.dev_stats:
      # TODO(pbar): Genuine thread IDs in NodeExecStats might be helpful.
      # lanes[i] holds the end time of the last activity placed in lane i.
      lanes = [0]
      for ns in device_stats.node_stats:
        end_micros = ns.all_start_micros + ns.all_end_rel_micros
        lane = -1
        # Reuse the first lane that is already free when this node starts.
        for i, lane_end in enumerate(lanes):
          if ns.all_start_micros > lane_end:
            lane = i
            lanes[lane] = end_micros
            break
        if lane < 0:
          # Every existing lane is busy: open a new one.
          lane = len(lanes)
          lanes.append(end_micros)
        ns.thread_id = lane

  def _emit_op(self, nodestats, pid, is_gputrace):
    """Generates a Chrome Trace event to show Op execution.

    Args:
      nodestats: The 'NodeExecStats' proto recording op execution.
      pid: The pid assigned for the device where this op ran.
      is_gputrace: If True then this op came from the GPUTracer.
    """
    node_name = nodestats.node_name
    start = nodestats.all_start_micros
    duration = nodestats.all_end_rel_micros
    tid = nodestats.thread_id
    inputs = []
    if is_gputrace:
      # Node names should always have the form 'name:op'.  The extra
      # 'unknown' entry supplies the op when the ':' separator is missing.
      fields = node_name.split(':') + ['unknown']
      node_name, op = fields[:2]
    elif node_name == 'RecvTensor':
      # RPC tracing does not use the standard timeline_label format.
      op = 'RecvTensor'
    else:
      _, op, inputs = self._parse_op_label(nodestats.timeline_label)
    args = {'name': node_name, 'op': op}
    for i, iname in enumerate(inputs):
      args['input%d' % i] = iname
    self._chrome_trace.emit_region(start, duration, pid, tid, 'Op', op, args)

  def _emit_tensor_snapshot(self, tensor, timestamp, pid, tid, value):
    """Generate Chrome Trace snapshot event for a computed Tensor.

    Args:
      tensor: A 'TensorTracker' object.
      timestamp:  The timestamp of this snapshot as a long integer.
      pid: The pid assigned for showing the device where this op ran.
      tid: The tid of the thread computing the tensor snapshot.
      value: The node output being snapshotted.  Only its
        'tensor_description' attribute is read; it is converted to a string
        for the snapshot.
    """
    # Double quotes are stripped from the stringified description.
    desc = str(value.tensor_description).replace('"', '')
    snapshot = {'tensor_description': desc}
    self._chrome_trace.emit_obj_snapshot('Tensor', tensor.name, timestamp, pid,
                                         tid, tensor.object_id, snapshot)

  def _produce_tensor(self, name, timestamp, tensors_pid, allocator, num_bytes):
    """Creates and registers a '_TensorTracker' for a newly produced tensor.

    Args:
      name: The name of the tensor as a string.
      timestamp: The creation timestamp as a long integer.
      tensors_pid: The pid assigned for the tensors of the producing device.
      allocator: Name of the allocator used to create the tensor.
      num_bytes: Number of bytes allocated.

    Returns:
      The new '_TensorTracker', also stored in the tensor map under 'name'.
    """
    # The object id is the number of tensors registered so far.
    object_id = len(self._tensors)
    tensor = _TensorTracker(name, object_id, timestamp, tensors_pid, allocator,
                            num_bytes)
    self._tensors[name] = tensor
    return tensor

  def _is_gputrace_device(self, device_name):
    """Returns true if this device is part of the GPUTracer logging."""
    return '/stream:' in device_name or '/memcpy' in device_name

  def _allocate_pids(self):
    """Allocate fake process ids for each device in the StepStats."""
    self._allocators_pid = self._alloc_pid()
    self._chrome_trace.emit_pid('Allocators', self._allocators_pid)

    # Add processes in the Chrome trace to show compute and data activity.
    for dev_stats in self._step_stats.dev_stats:
      device_pid = self._alloc_pid()
      self._device_pids[dev_stats.device] = device_pid
      tensors_pid = self._alloc_pid()
      self._tensor_pids[dev_stats.device] = tensors_pid
      self._chrome_trace.emit_pid(dev_stats.device + ' Compute', device_pid)
      self._chrome_trace.emit_pid(dev_stats.device + ' Tensors', tensors_pid)

  def _analyze_tensors(self, show_memory):
    """Analyze tensor references to track dataflow.

    Args:
      show_memory: If True, also emit object creation and snapshot events
        for every output tensor.
    """
    for dev_stats in self._step_stats.dev_stats:
      device_pid = self._device_pids[dev_stats.device]
      tensors_pid = self._tensor_pids[dev_stats.device]
      for node_stats in dev_stats.node_stats:
        tid = node_stats.thread_id
        node_name = node_stats.node_name
        start_time = node_stats.all_start_micros
        end_time = node_stats.all_start_micros + node_stats.all_end_rel_micros
        for index, output in enumerate(node_stats.output):
          # The first output is named after the node itself; later outputs
          # carry a ':<index>' suffix.
          if index:
            output_name = '%s:%d' % (node_name, index)
          else:
            output_name = node_name

          allocation = output.tensor_description.allocation_description
          num_bytes = allocation.requested_bytes
          allocator_name = allocation.allocator_name
          tensor = self._produce_tensor(output_name, start_time, tensors_pid,
                                        allocator_name, num_bytes)
          tensor.add_ref(start_time)
          tensor.add_unref(end_time)
          self._flow_starts[output_name] = (end_time, device_pid, tid)

          if show_memory:
            self._chrome_trace.emit_obj_create('Tensor', output_name,
                                               start_time, tensors_pid, tid,
                                               tensor.object_id)
            self._emit_tensor_snapshot(tensor, end_time - 1, tensors_pid, tid,
                                       output)

  def _show_compute(self, show_dataflow):
    """Visualize the computation activity.

    Args:
      show_dataflow: If True, add flow events connecting the producer of each
        input tensor to the op consuming it.
    """
    for dev_stats in self._step_stats.dev_stats:
      device_name = dev_stats.device
      device_pid = self._device_pids[device_name]
      is_gputrace = self._is_gputrace_device(device_name)

      for node_stats in dev_stats.node_stats:
        tid = node_stats.thread_id
        start_time = node_stats.all_start_micros
        end_time = node_stats.all_start_micros + node_stats.all_end_rel_micros
        self._emit_op(node_stats, device_pid, is_gputrace)

        # These entries have no standard timeline label to parse inputs from.
        if is_gputrace or node_stats.node_name == 'RecvTensor':
          continue

        _, _, inputs = self._parse_op_label(node_stats.timeline_label)
        for input_name in inputs:
          if input_name not in self._tensors:
            # This can happen when partitioning has inserted a Send/Recv.
            # We remove the numeric suffix so that the dataflow appears to
            # come from the original node.  Ideally, the StepStats would
            # contain logging for the Send and Recv nodes.
            index = input_name.rfind('/_')
            if index > 0:
              input_name = input_name[:index]

          if input_name in self._tensors:
            tensor = self._tensors[input_name]
            tensor.add_ref(start_time)
            tensor.add_unref(end_time - 1)

            if show_dataflow:
              # We use a different flow ID for every graph edge.
              create_time, create_pid, create_tid = self._flow_starts[
                  input_name]
              # Don't add flows when producer and consumer ops are on the same
              # pid/tid since the horizontal arrows clutter the visualization.
              if create_pid != device_pid or create_tid != tid:
                flow_id = self._alloc_flow_id()
                self._chrome_trace.emit_flow_start(input_name, create_time,
                                                   create_pid, create_tid,
                                                   flow_id)
                self._chrome_trace.emit_flow_end(input_name, start_time,
                                                 device_pid, tid, flow_id)
          else:
            logging.vlog(1, 'Can\'t find tensor %s - removed by CSE?',
                         input_name)

  def _show_memory_counters(self):
    """Produce a counter series for each memory allocator."""
    # Iterate over all tensor trackers to build a list of allocations and
    # frees for each allocator. Then sort the lists and emit a cumulative
    # counter series for each allocator.
    allocations = {}
    for name, tensor in self._tensors.items():
      self._chrome_trace.emit_obj_delete('Tensor', name, tensor.last_unref,
                                         tensor.pid, 0, tensor.object_id)
      num_bytes = tensor.num_bytes
      alloc_events = allocations.setdefault(tensor.allocator, [])
      alloc_events.append((tensor.create_time, num_bytes, name))
      alloc_events.append((tensor.last_unref, -num_bytes, name))

    alloc_maxes = {}

    # Generate a counter series showing total allocations for each allocator.
    for allocator, alloc_list in allocations.items():
      # Sorting the (timestamp, num_bytes, name) tuples orders the events by
      # time; at equal timestamps frees (negative sizes) precede allocations.
      alloc_list.sort()
      total_bytes = 0
      alloc_tensor_set = set()
      alloc_maxes[allocator] = AllocationMaximum(
          timestamp=0, num_bytes=0, tensors=set())
      for timestamp, num_bytes, name in alloc_list:
        total_bytes += num_bytes
        if num_bytes < 0:
          alloc_tensor_set.discard(name)
        else:
          alloc_tensor_set.add(name)

        if total_bytes > alloc_maxes[allocator].num_bytes:
          # A shallow copy is enough: the set only holds tensor name strings.
          alloc_maxes[allocator] = AllocationMaximum(
              timestamp=timestamp,
              num_bytes=total_bytes,
              tensors=copy.copy(alloc_tensor_set))

        self._chrome_trace.emit_counter('Memory', allocator,
                                        self._allocators_pid, timestamp,
                                        allocator, total_bytes)
    self._allocator_maximums = alloc_maxes

  def analyze_step_stats(self, show_dataflow=True, show_memory=True):
    """Analyzes the step stats and builds the Chrome trace.

    Each call starts from a clean state, so calling this (or
    'generate_chrome_trace_format') repeatedly on one 'Timeline' yields an
    independent, non-duplicated trace every time.

    Args:
      show_dataflow: (Optional.) If True, add flow events to the trace
        connecting producers and consumers of tensors.
      show_memory: (Optional.) If True, add object creation, snapshot and
        deletion events for tensors and a memory counter series for each
        allocator, and compute the per-allocator maximums.

    Returns:
      A 'StepStatsAnalysis' holding the trace formatter and a dict mapping
      allocator names to 'AllocationMaximum' (empty unless 'show_memory').
    """
    self._reset_analysis_state()
    self._allocate_pids()
    self._assign_lanes()
    self._analyze_tensors(show_memory)
    self._show_compute(show_dataflow)
    if show_memory:
      self._show_memory_counters()
    return StepStatsAnalysis(
        chrome_trace=self._chrome_trace,
        allocator_maximums=self._allocator_maximums)

  def generate_chrome_trace_format(self, show_dataflow=True, show_memory=False):
    """Produces a trace in Chrome Trace Format.

    Args:
      show_dataflow: (Optional.) If True, add flow events to the trace
        connecting producers and consumers of tensors.
      show_memory: (Optional.) If True, add object snapshot events to the trace
        showing the sizes and lifetimes of tensors.

    Returns:
      A JSON formatted string in Chrome Trace format.
    """
    step_stats_analysis = self.analyze_step_stats(
        show_dataflow=show_dataflow, show_memory=show_memory)

    return step_stats_analysis.chrome_trace.format_to_string(pretty=True)