• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Tests for multiple virtual GPU support."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import random
22
23import numpy as np
24
25from google.protobuf import text_format
26from tensorflow.core.protobuf import config_pb2
27from tensorflow.python.framework import dtypes
28from tensorflow.python.framework import ops
29from tensorflow.python.framework import test_util
30from tensorflow.python.ops import array_ops
31from tensorflow.python.ops import math_ops
32from tensorflow.python.ops import random_ops
33from tensorflow.python.ops import variables
34from tensorflow.python.platform import test
35from tensorflow.python.platform import tf_logging as logging
36
37
class VirtualGpuTestUtil(object):
  """Builds a virtual-GPU session config and random 'add' graphs for testing.

  Every visible GPU is split into several virtual devices through
  `GPUOptions.Experimental.VirtualDevices`. The helper can then build the same
  randomly placed graph twice -- once spread over the virtual devices and once
  entirely on '/cpu:0' -- and compare the two results element by element.

  Attributes:
    devices: Device names used for placement: '/cpu:0' followed by one
      '/gpu:N' entry per virtual device.
    config: `ConfigProto` whose GPU options create the virtual devices.
  """

  def __init__(self,
               dim=1000,
               num_ops=100,
               virtual_devices_per_gpu=None,
               device_probabilities=None):
    """Sets up the device list, the session config and the placement weights.

    Args:
      dim: Side length of the square matrices fed through the graph.
      num_ops: Number of random 'add' operations per generated graph.
      virtual_devices_per_gpu: List whose i-th entry is the number of virtual
        devices to create on the i-th visible GPU. Defaults to `[3]`.
      device_probabilities: Optional per-device probabilities, ordered like
        `self.devices`, of a device being picked as an operand or result
        location. Defaults to a uniform distribution.
    """
    self._dim = dim
    self._num_ops = num_ops
    if virtual_devices_per_gpu is None:
      virtual_devices_per_gpu = [3]
    self._virtual_devices_per_gpu = virtual_devices_per_gpu
    self._visible_device_list = list(range(len(self._virtual_devices_per_gpu)))
    gpu_devices = [
        '/gpu:%d' % i for i in range(sum(self._virtual_devices_per_gpu))
    ]
    self.devices = ['/cpu:0'] + gpu_devices
    self._num_devices = len(self.devices)
    # Each virtual device gets 2GB (1 << 11 MB) of memory.
    self._mem_limits_mb = [
        [1 << 11] * count for count in self._virtual_devices_per_gpu
    ]
    self.config = self._GetSessionConfig()

    # Probabilities are stored in cumulative form so that a device can be
    # sampled by finding the first entry greater than a uniform random number.
    if device_probabilities is not None:
      # Shallow copy, so the running sum below does not modify the caller's
      # sequence.
      self._device_probabilities = list(device_probabilities)
      for i in range(1, self._num_devices):
        self._device_probabilities[i] += self._device_probabilities[i - 1]
    else:
      # Each device gets same probability to be assigned an operation.
      step = 1.0 / self._num_devices
      self._device_probabilities = [
          (x + 1) * step for x in range(self._num_devices)
      ]
    # Push the last threshold above anything random.random() can return, so
    # rounding error can never leave an operation without a device.
    self._device_probabilities[self._num_devices - 1] = 1.1

    logging.info('dim: %d', self._dim)
    logging.info('num_ops: %d', self._num_ops)
    logging.info('visible_device_list: %s', str(self._visible_device_list))
    logging.info('virtual_devices_per_gpu: %s',
                 str(self._virtual_devices_per_gpu))
    logging.info('mem_limits: %s', str(self._mem_limits_mb))
    logging.info('devices: %s', str(self.devices))
    logging.info('config: %s', text_format.MessageToString(self.config))
    logging.info('device_probabilities: %s', str(self._device_probabilities))

  def _GetSessionConfig(self):
    """Returns a `ConfigProto` that creates the virtual GPU devices.

    One `VirtualDevices` entry is emitted per visible GPU, carrying the list
    of per-virtual-device memory limits (in MB) for that GPU.
    """
    virtual_device_gpu_options = config_pb2.GPUOptions(
        visible_device_list=','.join(str(d) for d in self._visible_device_list),
        experimental=config_pb2.GPUOptions.Experimental(virtual_devices=[
            config_pb2.GPUOptions.Experimental.VirtualDevices(
                memory_limit_mb=limits) for limits in self._mem_limits_mb
        ]))
    return config_pb2.ConfigProto(gpu_options=virtual_device_gpu_options)

  def _GenerateOperationPlacement(self):
    """Randomly picks operand and result devices for every operation.

    Returns:
      A list with `self._num_ops` tuples of device indices, each of the form
      `(src_device_1, src_device_2, dst_device)` for one binary operation
      like 'add'.
    """
    result = []
    for _ in range(self._num_ops):
      op_device = ()
      for _ in range(3):
        random_num = random.random()
        # Pick the first device whose cumulative probability exceeds the draw.
        for device_index in range(self._num_devices):
          if self._device_probabilities[device_index] > random_num:
            op_device += (device_index,)
            break
      result.append(op_device)
    return result

  def _LogMatrix(self, mat, dim):
    """Logs the top-left (at most 10x10) corner of `mat` for debugging.

    Args:
      mat: Two-dimensional, indexable matrix with at least `min(10, dim)` rows
        and columns.
      dim: Side length of `mat`.
    """
    logging.info('---- printing the first 10*10 submatrix ----')
    limit = min(10, dim)
    for i in range(limit):
      # Every value is preceded by a single space, including the first one.
      logging.info(''.join(' ' + str(mat[i][j]) for j in range(limit)))

  def _TestRandomGraphWithDevices(self,
                                  sess,
                                  seed,
                                  op_placement,
                                  devices,
                                  debug_mode=False):
    """Builds and runs the 'add' graph described by `op_placement`.

    One float32 placeholder of shape `(dim, dim)` is created per entry of
    `devices` and fed with uniform random values in [0, 0.1) drawn from a
    numpy generator seeded with `seed + index`. The operations are then applied
    in order, and finally all matrices are summed on '/cpu:0'.

    Args:
      sess: Session used to run the graph.
      seed: Base seed for the random input matrices.
      op_placement: List of `(src_1, src_2, dst)` index tuples; each entry
        replaces matrix `dst` with `matrix[src_1] + matrix[src_2]`, computed
        on `devices[dst]`.
      devices: Device name for each matrix slot.
      debug_mode: If True, the default graph's GraphDef is logged before
        running.

    Returns:
      The evaluated sum of all matrices.
    """
    data = []
    shape = (self._dim, self._dim)
    feed_dict = {}
    # Initialize the matrices
    for i, device in enumerate(devices):
      with ops.device(device):
        var = array_ops.placeholder(dtypes.float32, shape=shape)
        np.random.seed(seed + i)
        feed_dict[var] = np.random.uniform(
            low=0, high=0.1, size=shape).astype(np.float32)
        data.append(var)
    # Run the 'add' operations on those matrices
    for op in op_placement:
      with ops.device(devices[op[2]]):
        data[op[2]] = math_ops.add(data[op[0]], data[op[1]])
    with ops.device('/cpu:0'):
      s = data[0]
      for tensor in data[1:]:
        s = math_ops.add(s, tensor)
    if debug_mode:
      logging.info(ops.get_default_graph().as_graph_def())
    result = sess.run(s, feed_dict=feed_dict)
    self._LogMatrix(result, self._dim)
    return result

  @staticmethod
  def _FirstMismatch(expected, actual):
    """Returns the first `(row, column)` where two matrices differ, or None.

    The comparison is exact (no tolerance) and runs as one vectorized numpy
    operation instead of a Python loop over every element. Positions are
    searched in row-major order.

    Args:
      expected: Two-dimensional reference matrix.
      actual: Two-dimensional matrix of the same shape as `expected`.
    """
    mismatches = np.argwhere(np.asarray(actual) != np.asarray(expected))
    if mismatches.shape[0] == 0:
      return None
    row, col = mismatches[0]
    return int(row), int(col)

  def TestRandomGraph(self, sess, op_placement=None, random_seed=None):
    """Checks that a random graph gives identical results on vGPUs and CPU.

    A graph with `self._num_ops` 'add' operations is run once with operations
    placed on `self.devices` and once with every operation on '/cpu:0'.

    Args:
      sess: Session used to run both graphs.
      op_placement: Optional explicit list of `(src_1, src_2, dst)` device
        index tuples. A random placement is generated when omitted.
      random_seed: Optional seed for the input matrices. A random seed is
        drawn when omitted.

    Returns:
      True if both results are element-wise identical; otherwise False, after
      logging the first mismatching element.
    """
    # Passing either argument explicitly means the caller is reproducing a
    # specific run, so the graph is logged as well.
    debug_mode = op_placement is not None or random_seed is not None
    if op_placement is None:
      op_placement = self._GenerateOperationPlacement()
    if random_seed is None:
      random_seed = random.randint(0, 1 << 31)
    logging.info('Virtual gpu functional test for random graph...')
    logging.info('operation placement: %s', str(op_placement))
    logging.info('random seed: %d', random_seed)

    # Run with multiple virtual gpus.
    result_vgd = self._TestRandomGraphWithDevices(
        sess, random_seed, op_placement, self.devices, debug_mode=debug_mode)
    # Run with single cpu.
    result_cpu = self._TestRandomGraphWithDevices(
        sess,
        random_seed,
        op_placement, ['/cpu:0'] * self._num_devices,
        debug_mode=debug_mode)
    # Test the result
    mismatch = self._FirstMismatch(result_cpu, result_vgd)
    if mismatch is None:
      return True
    i, j = mismatch
    logging.error(
        'Result mismatch at row %d column %d: expected %f, actual %f', i, j,
        result_cpu[i][j], result_vgd[i][j])
    logging.error('Devices: %s', self.devices)
    logging.error('Memory limits (in MB): %s', self._mem_limits_mb)
    return False
193
194
class VirtualGpuTest(test_util.TensorFlowTestCase):
  """Functional tests that run graphs across multiple virtual GPU devices."""

  def __init__(self, method_name):
    super(VirtualGpuTest, self).__init__(method_name)
    # Default util: '/cpu:0' plus three virtual devices on the first GPU.
    self._util = VirtualGpuTestUtil()

  @test_util.deprecated_graph_mode_only
  def testStatsContainAllDeviceNames(self):
    """Checks that step stats list the CPU and every virtual GPU device."""
    with self.session(config=self._util.config) as sess:
      # TODO(laigd): b/70811538. The is_gpu_available() call will invoke
      # DeviceFactory::AddDevices() with a default SessionOption, which prevents
      # adding virtual devices in the future, thus must be called within a
      # context of a session within which virtual devices are created. Same in
      # the following test case.
      if not test.is_gpu_available(cuda_only=True):
        self.skipTest('No GPU available')
      run_options = config_pb2.RunOptions(
          trace_level=config_pb2.RunOptions.FULL_TRACE)
      run_metadata = config_pb2.RunMetadata()

      # Place one variable on each device and sum them, so that every device
      # takes part in the traced step.
      mat_shape = [10, 10]
      data = []
      for d in self._util.devices:
        with ops.device(d):
          var = variables.Variable(random_ops.random_uniform(mat_shape))
          self.evaluate(var.initializer)
          data.append(var)
      s = data[0]
      for tensor in data[1:]:
        s = math_ops.add(s, tensor)
      sess.run(s, options=run_options, run_metadata=run_metadata)

    self.assertTrue(run_metadata.HasField('step_stats'))
    step_stats = run_metadata.step_stats
    devices = [d.device for d in step_stats.dev_stats]
    # assertIn reports the actual device list when an expected name is missing.
    self.assertIn('/job:localhost/replica:0/task:0/device:CPU:0', devices)
    self.assertIn('/job:localhost/replica:0/task:0/device:GPU:0', devices)
    self.assertIn('/job:localhost/replica:0/task:0/device:GPU:1', devices)
    self.assertIn('/job:localhost/replica:0/task:0/device:GPU:2', devices)

  @test_util.deprecated_graph_mode_only
  def testLargeRandomGraph(self):
    """Runs five random graphs and requires vGPU and CPU results to match."""
    with self.session(config=self._util.config) as sess:
      if not test.is_gpu_available(cuda_only=True):
        self.skipTest('No GPU available')
      for _ in range(5):
        # Fail on a mismatch; returning quietly here would let a wrong result
        # pass the test.
        self.assertTrue(
            self._util.TestRandomGraph(sess),
            'Virtual GPU result differs from the CPU-only result; see the '
            'error log for the mismatching element.')
243
244
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
  test.main()
247