# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for nccl ops. See also the cc test for nccl_communicator."""

from functools import partial

import numpy as np

from tensorflow.python.framework import errors
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gradients
from tensorflow.python.ops import nccl_ops
from tensorflow.python.platform import test


def _DeviceTensors(tensors, devices):
  res = []
  for t, d in zip(tensors, devices):
    with ops.device(d):
      res.append(array_ops.identity(t))
  return res


def _NcclAllReduce(nccl_fun, tensors, devices):
  return nccl_fun(_DeviceTensors(tensors, devices))


def _NcclReduce(nccl_fun, tensors, devices):
  receiver = np.random.randint(0, len(devices))
  with ops.device(devices[receiver]):
    return [nccl_fun(_DeviceTensors(tensors, devices))]


def _NcclBroadcast(tensors, devices):
  sender = np.random.randint(0, len(devices))
  with ops.device(devices[sender]):
    tensor = array_ops.identity(tensors[0])
    broadcast = nccl_ops.broadcast(tensor)
  return _DeviceTensors([broadcast] * len(devices), devices)


class NcclTestCase(test.TestCase):

  def _Test(self,
            nccl_reduce,
            numpy_fn,
            device_sets=(['/device:GPU:1', '/device:GPU:2', '/device:GPU:0'],
                         ['/device:GPU:1', '/device:GPU:0'])):
    """Tests that nccl_reduce computes the same result as reducing with numpy_fn.

    Args:
      nccl_reduce: A function taking a list of tensors and a list of devices,
        and returning a list of reduced tensors and a list of ops to perform
        the reduction.
      numpy_fn: A function taking two tensors and returning the reduction of
        the two.
      device_sets: Tuple of device lists to run the test on.
    """
    for dtype in [np.float16, np.float32, np.int32, np.int64, np.float64]:
      # Create the session inside the outer loop to test the use of the
      # same communicator across multiple sessions.
      with self.test_session():

        for devices in device_sets:
          shape = (3, 4)
          random = (np.random.random_sample(shape) - .5) * 1024
          tensors = []
          for _ in devices:
            tensors.append(random.astype(dtype))
          np_ans = tensors[0]
          for t in tensors[1:]:
            np_ans = numpy_fn(np_ans, t)

          reduce_tensors = nccl_reduce(tensors, devices)
          self.assertNotEmpty(reduce_tensors)

          # Test shape inference.
          for r in reduce_tensors:
            self.assertEqual(shape, r.get_shape())

          result_tensors = [array_ops.identity(t) for t in reduce_tensors]

          # Check GPU availability *after* creating the session, see b/68975239.
          if not test.is_gpu_available():
            # If no GPU is available, only test graph construction.
            continue

          # Test execution and results.
          for t in self.evaluate(result_tensors):
            self.assertAllClose(t, np_ans)

  def _TestGradient(self, nccl_reduce, numpy_fn):
    """Tests the gradient of nccl_reduce.

    Args:
      nccl_reduce: A function taking a list of tensors and a list of devices,
        and returning a list of reduced tensors and a list of ops to perform
        the reduction.
      numpy_fn: A function taking two tensors and returning the gradient of
        the reduction of the two.
    """

    def _Gradient(tensors, devices):
      inputs = [array_ops.placeholder(t.dtype, t.shape) for t in tensors]
      reduce_tensors = nccl_reduce(inputs, devices)
      losses = _DeviceTensors(tensors, [t.device for t in reduce_tensors])
      grads = gradients.gradients(
          reduce_tensors, inputs, losses, colocate_gradients_with_ops=True)
      return [g for g in grads if g is not None]

    self._Test(_Gradient, numpy_fn)


class AllReduceTest(NcclTestCase):

  def testAllReduce(self):
    self._Test(partial(_NcclAllReduce, nccl_ops.all_sum), lambda x, y: x + y)
    self._Test(partial(_NcclAllReduce, nccl_ops.all_prod), lambda x, y: x * y)
    self._Test(partial(_NcclAllReduce, nccl_ops.all_min), np.minimum)
    self._Test(partial(_NcclAllReduce, nccl_ops.all_max), np.maximum)

  def testAllSumGrad(self):
    self._TestGradient(
        partial(_NcclAllReduce, nccl_ops.all_sum), lambda x, y: x + y)

  def testErrors(self):
    with self.assertRaisesRegex(ValueError, 'Device assignment .* required'):
      nccl_ops.all_sum([array_ops.identity(np.random.random_sample((3, 4)))])
    with self.assertRaisesRegex(ValueError, 'Must pass >0 tensors'):
      nccl_ops.all_sum([])


class SingleReduceTest(NcclTestCase):

  def testSum(self):
    self._Test(partial(_NcclReduce, nccl_ops.reduce_sum), lambda x, y: x + y)

  def testSumGrad(self):
    self._TestGradient(partial(_NcclReduce, nccl_ops.reduce_sum),
                       lambda x, y: x)


class BroadcastTest(NcclTestCase):

  def testBroadcast(self):
    self._Test(_NcclBroadcast, lambda x, y: x)

  def testBroadcastSingleDevice(self):
    # Broadcasts on a single device are removed completely during rewrite.
    self._Test(_NcclBroadcast, lambda x, y: x,
               (['/device:GPU:0', '/device:GPU:0'],))

  def testBroadcastToCpuError(self):
    try:
      # Broadcast to CPU is not supported.
      self._Test(_NcclBroadcast, lambda x, y: x,
                 (['/device:GPU:0', '/device:CPU:0'],))
    except errors.NotFoundError as e:
      self.assertRegex(
          str(e),
          "No registered '_NcclBroadcastRecv' OpKernel for CPU devices")
    else:
      # The session isn't executed when no GPU is available.
      if test.is_gpu_available():
        self.fail("Didn't raise NotFoundError trying to broadcast to CPU")


class CombinedTest(NcclTestCase):
  """Test all-reduce vs. single-reduce plus broadcast in one session.run."""

  def _Combined(self, tensors, devices):
    all_reduce_tensors = _NcclAllReduce(nccl_ops.all_sum, tensors, devices)
    single_reduce_tensors = _NcclReduce(nccl_ops.reduce_sum, tensors, devices)
    broadcast_tensors = _NcclBroadcast(single_reduce_tensors, devices)
    return all_reduce_tensors + broadcast_tensors

  def testCombined(self):
    self._Test(self._Combined, lambda x, y: x + y)


if __name__ == '__main__':
  test.main()
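# A minimal usage sketch (an illustration only, not executed by this test):
# the helpers above wrap patterns like the following, assuming two visible
# GPUs. Each input must carry an explicit device assignment, and all_sum
# returns one reduced tensor per input device.
#
#   with ops.device('/device:GPU:0'):
#     t0 = array_ops.identity(np.array([1., 2.], dtype=np.float32))
#   with ops.device('/device:GPU:1'):
#     t1 = array_ops.identity(np.array([3., 4.], dtype=np.float32))
#   summed = nccl_ops.all_sum([t0, t1])  # two tensors, each holding [4., 6.]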