1# Copyright 2019 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Local CPU benchmarks for collective ops.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21import time 22 23import numpy as np 24 25from tensorflow.core.protobuf import config_pb2 26from tensorflow.python.client import session 27from tensorflow.python.framework import constant_op 28from tensorflow.python.framework import ops 29from tensorflow.python.ops import collective_ops 30from tensorflow.python.platform import test 31 32 33class CollectiveOpBenchmark(test.Benchmark): 34 """Benchmarks for local CPU collective op execution.""" 35 36 def benchmark_collective(self): 37 """Measures the performance of local CPU collective execution.""" 38 shapes = [(10,), (1000,), (1000000,)] 39 devices = [2, 4, 8] 40 collective_key_counter = 0 41 42 for group_size in devices: 43 group_key = collective_key_counter 44 instance_key = collective_key_counter 45 collective_key_counter += 1 46 47 for shape in shapes: 48 config = config_pb2.ConfigProto(device_count={"CPU": group_size}) 49 with session.Session(config=config) as sess: 50 # Use a C++ callable to minimize the Python overhead in the benchmark. 51 callable_opts = config_pb2.CallableOptions() 52 reduce_ops = [] 53 for device in range(group_size): 54 with ops.device("CPU:{}".format(device)): 55 t = constant_op.constant(np.multiply(range(shape[0]), 1.0)) 56 r = collective_ops.all_reduce(t, group_size, group_key, 57 instance_key, "Add", "Div") 58 reduce_ops.append(r) 59 callable_opts.target.append(r.name) 60 op_callable = sess._make_callable_from_options(callable_opts) # pylint: disable=protected-access 61 62 # Run five steps to warm up the session caches and do collective param 63 # resolution before taking the first measurement. 64 for _ in range(5): 65 op_callable() 66 deltas = [] 67 overall_start = time.time() 68 # Run at least five repetitions and for at least five seconds. 69 while len(deltas) < 5 or time.time() - overall_start < 5.0: 70 start = time.time() 71 for _ in range(100): 72 op_callable() 73 end = time.time() 74 deltas.append(end - start) 75 del op_callable 76 77 median_wall_time = np.median(deltas) / 100.0 78 iters = len(deltas) * 100 79 80 self.report_benchmark( 81 iters=iters, wall_time=median_wall_time, 82 name="num_elements_{}_num_devices_{}".format(np.prod(shape), 83 group_size)) 84 85 86if __name__ == "__main__": 87 test.main() 88