• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Local CPU benchmarks for collective ops."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import time
22
23import numpy as np
24
25from tensorflow.core.protobuf import config_pb2
26from tensorflow.python.client import session
27from tensorflow.python.framework import constant_op
28from tensorflow.python.framework import ops
29from tensorflow.python.ops import collective_ops
30from tensorflow.python.platform import test
31
32
33class CollectiveOpBenchmark(test.Benchmark):
34  """Benchmarks for local CPU collective op execution."""
35
36  def benchmark_collective(self):
37    """Measures the performance of local CPU collective execution."""
38    shapes = [(10,), (1000,), (1000000,)]
39    devices = [2, 4, 8]
40    collective_key_counter = 0
41
42    for group_size in devices:
43      group_key = collective_key_counter
44      instance_key = collective_key_counter
45      collective_key_counter += 1
46
47      for shape in shapes:
48        config = config_pb2.ConfigProto(device_count={"CPU": group_size})
49        with session.Session(config=config) as sess:
50          # Use a C++ callable to minimize the Python overhead in the benchmark.
51          callable_opts = config_pb2.CallableOptions()
52          reduce_ops = []
53          for device in range(group_size):
54            with ops.device("CPU:{}".format(device)):
55              t = constant_op.constant(np.multiply(range(shape[0]), 1.0))
56              r = collective_ops.all_reduce(t, group_size, group_key,
57                                            instance_key, "Add", "Div")
58              reduce_ops.append(r)
59              callable_opts.target.append(r.name)
60          op_callable = sess._make_callable_from_options(callable_opts)  # pylint: disable=protected-access
61
62          # Run five steps to warm up the session caches and do collective param
63          # resolution before taking the first measurement.
64          for _ in range(5):
65            op_callable()
66          deltas = []
67          overall_start = time.time()
68          # Run at least five repetitions and for at least five seconds.
69          while len(deltas) < 5 or time.time() - overall_start < 5.0:
70            start = time.time()
71            for _ in range(100):
72              op_callable()
73            end = time.time()
74            deltas.append(end - start)
75          del op_callable
76
77        median_wall_time = np.median(deltas) / 100.0
78        iters = len(deltas) * 100
79
80        self.report_benchmark(
81            iters=iters, wall_time=median_wall_time,
82            name="num_elements_{}_num_devices_{}".format(np.prod(shape),
83                                                         group_size))
84
85
86if __name__ == "__main__":
87  test.main()
88