# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for nccl ops. See also the cc test for nccl_communicator."""

from functools import partial

import numpy as np

from tensorflow.python.framework import errors
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gradients
from tensorflow.python.ops import nccl_ops
from tensorflow.python.platform import test


def _DeviceTensors(tensors, devices):
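  """Returns an identity of each tensor, placed on the corresponding device."""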
  res = []
  for t, d in zip(tensors, devices):
    with ops.device(d):
      res.append(array_ops.identity(t))
  return res


def _NcclAllReduce(nccl_fun, tensors, devices):
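  """Applies all-reduce `nccl_fun` to copies of `tensors` placed on `devices`."""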
  return nccl_fun(_DeviceTensors(tensors, devices))


def _NcclReduce(nccl_fun, tensors, devices):
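  """Reduces `tensors` onto a randomly chosen device from `devices`."""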
  receiver = np.random.randint(0, len(devices))
  with ops.device(devices[receiver]):
    return [nccl_fun(_DeviceTensors(tensors, devices))]


def _NcclBroadcast(tensors, devices):
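  """Broadcasts `tensors[0]` from a randomly chosen device to all `devices`."""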
  sender = np.random.randint(0, len(devices))
  with ops.device(devices[sender]):
    tensor = array_ops.identity(tensors[0])
    broadcast = nccl_ops.broadcast(tensor)
  return _DeviceTensors([broadcast] * len(devices), devices)
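
# A minimal sketch of how the helpers above compose (illustrative only; it
# assumes at least two visible GPUs and is not executed by the tests):
#
#   inputs = [np.ones((3, 4), np.float32)] * 2
#   devices = ['/device:GPU:0', '/device:GPU:1']
#   summed = _NcclAllReduce(nccl_ops.all_sum, inputs, devices)
#   # `summed` holds one reduced tensor per device, each equal to the
#   # elementwise sum of `inputs`.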


class NcclTestCase(test.TestCase):

  def _Test(self,
            nccl_reduce,
            numpy_fn,
            device_sets=(['/device:GPU:1', '/device:GPU:2', '/device:GPU:0'],
                         ['/device:GPU:1', '/device:GPU:0'])):
    """Tests that nccl_reduce does the same as reduction with numpy_fn.

    Args:
      nccl_reduce: A function taking a list of tensors and a list of devices,
          and returning a list of reduced tensors.
      numpy_fn: A function taking two tensors and returning the reduction of
          the two.
      device_sets: Tuple of lists of device names to run the test on.
    """
    for dtype in [np.float16, np.float32, np.int32, np.int64, np.float64]:
      # Create session inside outer loop to test use of
      # same communicator across multiple sessions.
      with self.test_session():

        for devices in device_sets:
          shape = (3, 4)
          random = (np.random.random_sample(shape) - .5) * 1024
          tensors = []
          for _ in devices:
            tensors.append(random.astype(dtype))
          np_ans = tensors[0]
          for t in tensors[1:]:
            np_ans = numpy_fn(np_ans, t)

          reduce_tensors = nccl_reduce(tensors, devices)
          self.assertNotEmpty(reduce_tensors)

          # Test shape inference.
          for r in reduce_tensors:
            self.assertEqual(shape, r.get_shape())

          result_tensors = [array_ops.identity(t) for t in reduce_tensors]

          # Check GPU availability *after* creating session, see b/68975239.
          if not test.is_gpu_available():
            # If no GPU is available, only test graph construction.
            continue

          # Test execution and results.
          for t in self.evaluate(result_tensors):
            self.assertAllClose(t, np_ans)
  def _TestGradient(self, nccl_reduce, numpy_fn):
    """Tests the gradient of nccl_reduce.

    Args:
      nccl_reduce: A function taking a list of tensors and a list of devices,
          and returning a list of reduced tensors.
      numpy_fn: A function taking two tensors and returning the gradient of
          the reduction of the two.
    """

    def _Gradient(tensors, devices):
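      # Build the reduction over placeholders, feed `tensors` as the upstream
      # gradients (colocated with the reduce outputs), and return the
      # gradients with respect to the placeholder inputs.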
      inputs = [array_ops.placeholder(t.dtype, t.shape) for t in tensors]
      reduce_tensors = nccl_reduce(inputs, devices)
      losses = _DeviceTensors(tensors, [t.device for t in reduce_tensors])
      grads = gradients.gradients(
          reduce_tensors, inputs, losses, colocate_gradients_with_ops=True)
      return [g for g in grads if g is not None]

    self._Test(_Gradient, numpy_fn)


class AllReduceTest(NcclTestCase):

  def testAllReduce(self):
    self._Test(partial(_NcclAllReduce, nccl_ops.all_sum), lambda x, y: x + y)
    self._Test(partial(_NcclAllReduce, nccl_ops.all_prod), lambda x, y: x * y)
    self._Test(partial(_NcclAllReduce, nccl_ops.all_min), np.minimum)
    self._Test(partial(_NcclAllReduce, nccl_ops.all_max), np.maximum)

  def testAllSumGrad(self):
    self._TestGradient(
        partial(_NcclAllReduce, nccl_ops.all_sum), lambda x, y: x + y)

  def testErrors(self):
    with self.assertRaisesRegex(ValueError, 'Device assignment .* required'):
      nccl_ops.all_sum([array_ops.identity(np.random.random_sample((3, 4)))])
    with self.assertRaisesRegex(ValueError, 'Must pass >0 tensors'):
      nccl_ops.all_sum([])


class SingleReduceTest(NcclTestCase):

  def testSum(self):
    self._Test(partial(_NcclReduce, nccl_ops.reduce_sum), lambda x, y: x + y)

  def testSumGrad(self):
    self._TestGradient(partial(_NcclReduce, nccl_ops.reduce_sum),
                       lambda x, y: x)


class BroadcastTest(NcclTestCase):

  def testBroadcast(self):
    self._Test(_NcclBroadcast, lambda x, y: x)

  def testBroadcastSingleDevice(self):
    # Broadcasts on a single device are removed completely during rewrite.
    self._Test(_NcclBroadcast, lambda x, y: x,
               (['/device:GPU:0', '/device:GPU:0'],))

  def testBroadcastToCpuError(self):
    try:
      # Broadcasting to CPU is not supported.
      self._Test(_NcclBroadcast, lambda x, y: x,
                 (['/device:GPU:0', '/device:CPU:0'],))
    except errors.NotFoundError as e:
      self.assertRegex(
          str(e), "No registered '_NcclBroadcastRecv' OpKernel for CPU devices")
    else:
      # The session isn't executed when no GPU is available.
      if test.is_gpu_available():
        self.fail("Didn't raise NotFoundError trying to broadcast to CPU")


class CombinedTest(NcclTestCase):
  """Test all-reduce vs. single-reduce plus broadcast in one session.run."""

  def _Combined(self, tensors, devices):
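    # Build both reduction flavors over the same inputs so one session.run
    # exercises them together: an all_sum across all devices, plus a
    # reduce_sum onto one device followed by a broadcast back to every device.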
    all_reduce_tensors = _NcclAllReduce(nccl_ops.all_sum, tensors, devices)
    single_reduce_tensors = _NcclReduce(nccl_ops.reduce_sum, tensors, devices)
    broadcast_tensors = _NcclBroadcast(single_reduce_tensors, devices)
    return all_reduce_tensors + broadcast_tensors

  def testCombined(self):
    self._Test(self._Combined, lambda x, y: x + y)


if __name__ == '__main__':
  test.main()