1# Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Tests for multiple virtual GPU support.""" 16 17from __future__ import absolute_import 18from __future__ import division 19from __future__ import print_function 20 21import random 22 23import numpy as np 24 25from google.protobuf import text_format 26from tensorflow.core.protobuf import config_pb2 27from tensorflow.python.framework import dtypes 28from tensorflow.python.framework import ops 29from tensorflow.python.framework import test_util 30from tensorflow.python.ops import array_ops 31from tensorflow.python.ops import math_ops 32from tensorflow.python.ops import random_ops 33from tensorflow.python.ops import variables 34from tensorflow.python.platform import test 35from tensorflow.python.platform import tf_logging as logging 36 37 38class VirtualGpuTestUtil(object): 39 40 def __init__(self, 41 dim=1000, 42 num_ops=100, 43 virtual_devices_per_gpu=None, 44 device_probabilities=None): 45 self._dim = dim 46 self._num_ops = num_ops 47 if virtual_devices_per_gpu is None: 48 self._virtual_devices_per_gpu = [3] 49 else: 50 self._virtual_devices_per_gpu = virtual_devices_per_gpu 51 self._visible_device_list = [ 52 i for i in range(len(self._virtual_devices_per_gpu)) 53 ] 54 gpu_devices = [ 55 ('/gpu:' + str(i)) for i in range(sum(self._virtual_devices_per_gpu)) 56 ] 57 self.devices = ['/cpu:0'] + gpu_devices 58 self._num_devices = len(self.devices) 59 # Each virtual device gets 2GB memory. 60 self._mem_limits_mb = [ 61 ([1 << 11] * i) for i in self._virtual_devices_per_gpu 62 ] 63 self.config = self._GetSessionConfig() 64 65 if device_probabilities is not None: 66 self._device_probabilities = list(device_probabilities) # Deep copy 67 for i in range(1, self._num_devices): 68 self._device_probabilities[i] += self._device_probabilities[i - 1] 69 else: 70 # Each device gets same probability to be assigned an operation. 71 step = 1.0 / self._num_devices 72 self._device_probabilities = [ 73 (x + 1) * step for x in range(self._num_devices) 74 ] 75 # To prevent rounding error causing problems. 76 self._device_probabilities[self._num_devices - 1] = 1.1 77 78 logging.info('dim: %d', self._dim) 79 logging.info('num_ops: %d', self._num_ops) 80 logging.info('visible_device_list: %s', str(self._visible_device_list)) 81 logging.info('virtual_devices_per_gpu: %s', 82 str(self._virtual_devices_per_gpu)) 83 logging.info('mem_limits: %s', str(self._mem_limits_mb)) 84 logging.info('devices: %s', str(self.devices)) 85 logging.info('config: %s', text_format.MessageToString(self.config)) 86 logging.info('device_probabilities: %s', str(self._device_probabilities)) 87 88 # Creates virtual GPU devices 89 def _GetSessionConfig(self): 90 virtual_device_gpu_options = config_pb2.GPUOptions( 91 visible_device_list=','.join(str(d) for d in self._visible_device_list), 92 experimental=config_pb2.GPUOptions.Experimental(virtual_devices=[ 93 config_pb2.GPUOptions.Experimental.VirtualDevices( 94 memory_limit_mb=i) for i in self._mem_limits_mb 95 ])) 96 return config_pb2.ConfigProto(gpu_options=virtual_device_gpu_options) 97 98 # Generates a list of 3-tuples, each tuple contains the source and destination 99 # device index for a binary operation like 'add', like: 100 # (src_device_1, src_device_2, dst_device) 101 def _GenerateOperationPlacement(self): 102 result = [] 103 for unused_i in range(self._num_ops): 104 op_device = () 105 for unused_j in range(3): 106 random_num = random.random() 107 for device_index in range(self._num_devices): 108 if self._device_probabilities[device_index] > random_num: 109 op_device += (device_index,) 110 break 111 result.append(op_device) 112 return result 113 114 # Logs part of the matrix for debugging purposes. 115 def _LogMatrix(self, mat, dim): 116 logging.info('---- printing the first 10*10 submatrix ----') 117 for i in range(min(10, dim)): 118 row = '' 119 for j in range(min(10, dim)): 120 row += ' ' + str(mat[i][j]) 121 logging.info(row) 122 123 # Runs a list of 'add' operations where each operation satisfies the device 124 # placement constraints in `op_placement`, and returns the result. 125 def _TestRandomGraphWithDevices(self, 126 sess, 127 seed, 128 op_placement, 129 devices, 130 debug_mode=False): 131 data = [] 132 shape = (self._dim, self._dim) 133 feed_dict = {} 134 # Initialize the matrices 135 for i in range(len(devices)): 136 with ops.device(devices[i]): 137 var = array_ops.placeholder(dtypes.float32, shape=shape) 138 np.random.seed(seed + i) 139 feed_dict[var] = np.random.uniform( 140 low=0, high=0.1, size=shape).astype(np.float32) 141 data.append(var) 142 # Run the 'add' operations on those matrices 143 for op in op_placement: 144 with ops.device(devices[op[2]]): 145 data[op[2]] = math_ops.add(data[op[0]], data[op[1]]) 146 with ops.device('/cpu:0'): 147 s = data[0] 148 for i in range(1, len(data)): 149 s = math_ops.add(s, data[i]) 150 if debug_mode: 151 logging.info(ops.get_default_graph().as_graph_def()) 152 result = sess.run(s, feed_dict=feed_dict) 153 self._LogMatrix(result, self._dim) 154 return result 155 156 # Generates a random graph with `self._num_ops` 'add' operations with each 157 # operation placed on different virtual device, test that the result is 158 # identical to the result obtained by running the same graph on cpu only. 159 def TestRandomGraph(self, sess, op_placement=None, random_seed=None): 160 debug_mode = False 161 if op_placement is None: 162 op_placement = self._GenerateOperationPlacement() 163 else: 164 debug_mode = True 165 if random_seed is None: 166 random_seed = random.randint(0, 1 << 31) 167 else: 168 debug_mode = True 169 logging.info('Virtual gpu functional test for random graph...') 170 logging.info('operation placement: %s', str(op_placement)) 171 logging.info('random seed: %d', random_seed) 172 173 # Run with multiple virtual gpus. 174 result_vgd = self._TestRandomGraphWithDevices( 175 sess, random_seed, op_placement, self.devices, debug_mode=debug_mode) 176 # Run with single cpu. 177 result_cpu = self._TestRandomGraphWithDevices( 178 sess, 179 random_seed, 180 op_placement, ['/cpu:0'] * self._num_devices, 181 debug_mode=debug_mode) 182 # Test the result 183 for i in range(self._dim): 184 for j in range(self._dim): 185 if result_vgd[i][j] != result_cpu[i][j]: 186 logging.error( 187 'Result mismatch at row %d column %d: expected %f, actual %f', i, 188 j, result_cpu[i][j], result_vgd[i][j]) 189 logging.error('Devices: %s', self.devices) 190 logging.error('Memory limits (in MB): %s', self._mem_limits_mb) 191 return False 192 return True 193 194 195class VirtualGpuTest(test_util.TensorFlowTestCase): 196 197 def __init__(self, method_name): 198 super(VirtualGpuTest, self).__init__(method_name) 199 self._util = VirtualGpuTestUtil() 200 201 @test_util.deprecated_graph_mode_only 202 def testStatsContainAllDeviceNames(self): 203 with self.session(config=self._util.config) as sess: 204 # TODO(laigd): b/70811538. The is_gpu_available() call will invoke 205 # DeviceFactory::AddDevices() with a default SessionOption, which prevents 206 # adding virtual devices in the future, thus must be called within a 207 # context of a session within which virtual devices are created. Same in 208 # the following test case. 209 if not test.is_gpu_available(cuda_only=True): 210 self.skipTest('No GPU available') 211 run_options = config_pb2.RunOptions( 212 trace_level=config_pb2.RunOptions.FULL_TRACE) 213 run_metadata = config_pb2.RunMetadata() 214 215 mat_shape = [10, 10] 216 data = [] 217 for d in self._util.devices: 218 with ops.device(d): 219 var = variables.Variable(random_ops.random_uniform(mat_shape)) 220 self.evaluate(var.initializer) 221 data.append(var) 222 s = data[0] 223 for i in range(1, len(data)): 224 s = math_ops.add(s, data[i]) 225 sess.run(s, options=run_options, run_metadata=run_metadata) 226 227 self.assertTrue(run_metadata.HasField('step_stats')) 228 step_stats = run_metadata.step_stats 229 devices = [d.device for d in step_stats.dev_stats] 230 self.assertTrue('/job:localhost/replica:0/task:0/device:CPU:0' in devices) 231 self.assertTrue('/job:localhost/replica:0/task:0/device:GPU:0' in devices) 232 self.assertTrue('/job:localhost/replica:0/task:0/device:GPU:1' in devices) 233 self.assertTrue('/job:localhost/replica:0/task:0/device:GPU:2' in devices) 234 235 @test_util.deprecated_graph_mode_only 236 def testLargeRandomGraph(self): 237 with self.session(config=self._util.config) as sess: 238 if not test.is_gpu_available(cuda_only=True): 239 self.skipTest('No GPU available') 240 for _ in range(5): 241 if not self._util.TestRandomGraph(sess): 242 return 243 244 245if __name__ == '__main__': 246 test.main() 247