1 #include <gtest/gtest.h>
2
3 #include <ATen/cuda/CUDAContext.h>
4 #include <ATen/cuda/CUDAEvent.h>
5 #include <c10/core/Event.h>
6 #include <c10/core/impl/InlineEvent.h>
7 #include <c10/cuda/CUDAGuard.h>
8 #include <c10/cuda/impl/CUDAGuardImpl.h>
9 #include <c10/util/irange.h>
10
11 #include <cuda_runtime.h>
12
13 #include <functional>
14 #include <future>
15 #include <thread>
16 #include <unordered_set>
17
// Asserts that X == Y. The comparison is evaluated into a bool first, so
// ASSERT_TRUE only ever sees that bool and not the operands themselves.
// Both arguments are parenthesised so that compound expressions passed as
// X or Y cannot re-associate with the == operator.
#define ASSERT_EQ_CUDA(X, Y)  \
  {                           \
    bool isTRUE = (X) == (Y); \
    ASSERT_TRUE(isTRUE);      \
  }
23
// Asserts that X == Y is false. The comparison is evaluated into a bool
// first, so ASSERT_FALSE only ever sees that bool and not the operands.
// Both arguments are parenthesised so that compound expressions passed as
// X or Y cannot re-associate with the == operator.
#define ASSERT_NE_CUDA(X, Y)   \
  {                            \
    bool isFALSE = (X) == (Y); \
    ASSERT_FALSE(isFALSE);     \
  }
29
30 /*
31 Tests related to ATen streams.
32 */
33 // Verifies streams are live through copying and moving
TEST(TestStream, CopyAndMoveTest) {
  // A CUDAStream that was copy- or move-assigned from a scoped stream must
  // keep reporting the same device index and raw cudaStream_t, both inside
  // the scope and after the source object is gone.
  if (!at::cuda::is_available()) {
    return;
  }

  int32_t expected_device = -1;
  cudaStream_t expected_raw;

  // --- copy assignment ---
  at::cuda::CUDAStream copied = at::cuda::getStreamFromPool();
  {
    auto source = at::cuda::getStreamFromPool();
    expected_device = source.device_index();
    expected_raw = source.stream();

    copied = source;

    ASSERT_EQ_CUDA(copied.device_index(), expected_device);
    ASSERT_EQ_CUDA(copied.stream(), expected_raw);
  }

  // `source` is out of scope now; the copy must be unaffected.
  ASSERT_EQ_CUDA(copied.device_index(), expected_device);
  ASSERT_EQ_CUDA(copied.stream(), expected_raw);

  // --- move assignment ---
  at::cuda::CUDAStream moved = at::cuda::getStreamFromPool();
  {
    auto source = at::cuda::getStreamFromPool();
    expected_device = source.device_index();
    expected_raw = source.stream();

    moved = std::move(source);

    ASSERT_EQ_CUDA(moved.device_index(), expected_device);
    ASSERT_EQ_CUDA(moved.stream(), expected_raw);
  }

  // `source` is out of scope now; the move target must be unaffected.
  ASSERT_EQ_CUDA(moved.device_index(), expected_device);
  ASSERT_EQ_CUDA(moved.stream(), expected_raw);
}
71
72 // Verifies streams are set properly
TEST(TestStream, GetAndSetTest) {
  // setCurrentCUDAStream followed by getCurrentCUDAStream must round-trip,
  // first for a pooled stream and then for the default stream.
  if (!at::cuda::is_available()) {
    return;
  }

  const at::cuda::CUDAStream pooled = at::cuda::getStreamFromPool();

  // Install the pooled stream and read it back.
  at::cuda::setCurrentCUDAStream(pooled);
  at::cuda::CUDAStream observed = at::cuda::getCurrentCUDAStream();
  ASSERT_EQ_CUDA(pooled, observed);

  // Install the default stream and read it back.
  const at::cuda::CUDAStream fallback = at::cuda::getDefaultCUDAStream();
  at::cuda::setCurrentCUDAStream(fallback);
  observed = at::cuda::getCurrentCUDAStream();

  // The pooled stream is expected to be distinct from the default one.
  ASSERT_NE_CUDA(fallback, pooled);
  ASSERT_EQ_CUDA(observed, fallback);
}
91
thread_fun(std::optional<at::cuda::CUDAStream> & cur_thread_stream)92 void thread_fun(std::optional<at::cuda::CUDAStream>& cur_thread_stream) {
93 auto new_stream = at::cuda::getStreamFromPool();
94 at::cuda::setCurrentCUDAStream(new_stream);
95 cur_thread_stream = {at::cuda::getCurrentCUDAStream()};
96 ASSERT_EQ_CUDA(*cur_thread_stream, new_stream);
97 }
98
99 // Ensures streams are thread local
TEST(TestStream, MultithreadGetAndSetTest) {
  // Two worker threads each install their own pooled stream as current.
  // Afterwards this thread must still see the default stream, and the two
  // workers must have seen streams different from it and from each other.
  if (!at::cuda::is_available()) {
    return;
  }

  std::optional<at::cuda::CUDAStream> s0;
  std::optional<at::cuda::CUDAStream> s1;

  std::thread worker0(thread_fun, std::ref(s0));
  std::thread worker1(thread_fun, std::ref(s1));
  worker0.join();
  worker1.join();

  const at::cuda::CUDAStream here = at::cuda::getCurrentCUDAStream();
  const at::cuda::CUDAStream fallback = at::cuda::getDefaultCUDAStream();

  ASSERT_EQ_CUDA(here, fallback);
  ASSERT_NE_CUDA(here, *s0);
  ASSERT_NE_CUDA(here, *s1);
  ASSERT_NE_CUDA(s0, s1);
}
117
118 // CUDA Guard
TEST(TestStream, CUDAGuardTest) {
  // Exercises CUDAStreamGuard and CUDAGuard across two devices: a stream
  // guard switches both device and current stream, a device guard switches
  // only the device, and both undo their changes on scope exit.
  if (!at::cuda::is_available() || at::cuda::getNumGPUs() < 2) {
    return;
  }

  // -- begin setup

  ASSERT_EQ_CUDA(at::cuda::current_device(), 0);

  // Default and pooled stream obtained while device 0 is current.
  const at::cuda::CUDAStream dev0_default = at::cuda::getDefaultCUDAStream();
  const at::cuda::CUDAStream dev0_pooled = at::cuda::getStreamFromPool();
  ASSERT_EQ_CUDA(dev0_default.device_index(), 0);
  ASSERT_EQ_CUDA(dev0_pooled.device_index(), 0);
  at::cuda::setCurrentCUDAStream(dev0_default);

  // Default and pooled stream obtained while a guard makes device 1 current.
  const std::vector<at::cuda::CUDAStream> dev1_streams = [] {
    at::cuda::CUDAGuard device_guard(1);
    return std::vector<at::cuda::CUDAStream>{
        at::cuda::getDefaultCUDAStream(), at::cuda::getStreamFromPool()};
  }();
  const at::cuda::CUDAStream& dev1_default = dev1_streams[0];
  const at::cuda::CUDAStream& dev1_pooled = dev1_streams[1];
  ASSERT_EQ_CUDA(dev1_default.device_index(), 1);
  ASSERT_EQ_CUDA(dev1_pooled.device_index(), 1);
  at::cuda::setCurrentCUDAStream(dev1_default);

  // Setting device 1's current stream must not change the current device.
  ASSERT_EQ_CUDA(at::cuda::current_device(), 0);

  // -- end setup

  const at::Device cuda1(at::kCUDA, 1);

  // A stream guard changes the current device and that device's stream.
  {
    at::cuda::CUDAStreamGuard stream_guard(dev1_pooled);
    ASSERT_EQ_CUDA(stream_guard.current_device(), cuda1);
    ASSERT_EQ_CUDA(at::cuda::current_device(), 1);
    ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(1), dev1_pooled);
  }

  // Leaving the scope resets both device and stream.
  ASSERT_EQ_CUDA(at::cuda::current_device(), 0);
  ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(1), dev1_default);

  // A device guard changes only the current device, not the stream.
  {
    at::cuda::CUDAGuard only_device(/*device=*/1);
    ASSERT_EQ_CUDA(only_device.current_device(), cuda1);
    ASSERT_EQ_CUDA(at::cuda::current_device(), 1);
    ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(1), dev1_default);
  }

  ASSERT_EQ_CUDA(at::cuda::current_device(), 0);
  ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(0), dev0_default);
}
171
172 // Streampool Round Robin
TEST(TestStream, StreamPoolTest) {
  // Requests 200 streams from the pool and expects at least one raw
  // cudaStream_t handle to be handed out more than once.
  if (!at::cuda::is_available()) {
    return;
  }

  std::vector<at::cuda::CUDAStream> pooled;
  for (int request = 0; request < 200; ++request) {
    pooled.emplace_back(at::cuda::getStreamFromPool());
  }

  std::unordered_set<cudaStream_t> seen;
  bool hasDuplicates = false;
  for (const auto& entry : pooled) {
    // CUDAStream converts implicitly to its raw cudaStream_t handle.
    const cudaStream_t raw = entry;
    const bool inserted = seen.insert(raw).second;
    if (!inserted) {
      hasDuplicates = true;
    }
  }

  ASSERT_TRUE(hasDuplicates);
}
192
193 // Multi-GPU
TEST(TestStream, MultiGPUTest) {
  // Installs one pooled stream for device 0 and one for device 1, then
  // checks that each is reported as current while its device is current.
  if (!at::cuda::is_available() || at::cuda::getNumGPUs() < 2) {
    return;
  }

  const at::cuda::CUDAStream on_dev0 = at::cuda::getStreamFromPool(true, 0);
  const at::cuda::CUDAStream on_dev1 = at::cuda::getStreamFromPool(false, 1);

  at::cuda::setCurrentCUDAStream(on_dev0);
  at::cuda::setCurrentCUDAStream(on_dev1);

  // Installing the device-1 stream must not displace the device-0 stream.
  ASSERT_EQ_CUDA(on_dev0, at::cuda::getCurrentCUDAStream());

  // With device 1 made current, its stream is the one reported.
  at::cuda::CUDAGuard switch_to_dev1{1};
  ASSERT_EQ_CUDA(on_dev1, at::cuda::getCurrentCUDAStream());
}
210
211 // CUDAEvent Syncs
TEST(TestStream, CUDAEventSyncTest) {
  // Records a CUDAEvent once on a pooled stream, makes two other pooled
  // streams block on it, synchronizes one of the waiting streams, and then
  // expects the event to report completion.
  if (!at::cuda::is_available()) return;
  const auto stream = at::cuda::getStreamFromPool();
  at::cuda::CUDAEvent event;

  // Before anything is recorded the event is expected to query as complete.
  ASSERT_TRUE(event.query());

  event.recordOnce(stream);

  const auto wait_stream0 = at::cuda::getStreamFromPool();
  const auto wait_stream1 = at::cuda::getStreamFromPool();

  event.block(wait_stream0);
  event.block(wait_stream1);

  // Check the runtime's return code: a failed synchronize would otherwise
  // go unnoticed and surface only as a confusing failure of the query below.
  ASSERT_EQ(cudaStreamSynchronize(wait_stream0), cudaSuccess);
  ASSERT_TRUE(event.query());
}
230
231 // Cross-Device Events
TEST(TestStream, CrossDeviceTest) {
  // Records one event on a stream obtained before switching devices and one
  // on a stream obtained with device 1 current, move-assigns the second
  // event over the first, and checks that the surviving event reports
  // device 1 and can still be waited on from the first stream.
  if (!at::cuda::is_available()) return;
  if (at::cuda::getNumGPUs() < 2)
    return;

  const auto stream0 = at::cuda::getStreamFromPool();
  at::cuda::CUDAEvent event0;

  // Use a scoped guard rather than a bare set_device(1): the guard puts the
  // previous device back when the test returns (including an early return
  // from a failed ASSERT), so the device switch does not leak into the
  // tests that run afterwards.
  at::cuda::CUDAGuard device_guard(1);
  const auto stream1 = at::cuda::getStreamFromPool();
  at::cuda::CUDAEvent event1;

  event0.record(stream0);
  event1.record(stream1);

  event0 = std::move(event1);

  ASSERT_EQ_CUDA(event0.device(), at::Device(at::kCUDA, 1));

  event0.block(stream0);

  // Check the runtime's return code instead of silently discarding it.
  ASSERT_EQ(cudaStreamSynchronize(stream0), cudaSuccess);
  ASSERT_TRUE(event0.query());
}
256
257 // Generic Events
TEST(TestStream, GenericInlineCUDAEventTest) {
  // Same record / block / synchronize / query sequence as the CUDAEvent
  // test, driven through c10::impl::InlineEvent<CUDAGuardImpl> and the
  // device-generic c10::Stream type.
  if (!at::cuda::is_available()) return;

  c10::impl::InlineEvent<c10::cuda::impl::CUDAGuardImpl> event{c10::DeviceType::CUDA};
  c10::Stream stream = at::cuda::getStreamFromPool();

  event.record(stream);

  const c10::Stream wait_stream0 = at::cuda::getStreamFromPool();
  const c10::Stream wait_stream1 = at::cuda::getStreamFromPool();

  event.block(wait_stream0);
  event.block(wait_stream1);

  // Convert back to a CUDAStream to obtain the raw handle for the runtime
  // call, and check the return code instead of silently discarding it.
  const at::cuda::CUDAStream cuda_stream{wait_stream0};
  ASSERT_EQ(cudaStreamSynchronize(cuda_stream), cudaSuccess);

  ASSERT_TRUE(event.query());
}
277
TEST(TestStream, GenericVirtualCUDAEventTest) {
  // Same sequence as the inline-event test, driven through the generic
  // c10::Event type and c10::Stream::wait, plus a check that the event
  // reports the PYTORCH_DEFAULT flag.
  if (!at::cuda::is_available()) return;

  c10::Event event{c10::DeviceType::CUDA};
  c10::Stream stream = at::cuda::getStreamFromPool();

  event.recordOnce(stream);

  const c10::Stream wait_stream0 = at::cuda::getStreamFromPool();
  const c10::Stream wait_stream1 = at::cuda::getStreamFromPool();

  wait_stream0.wait(event);
  wait_stream1.wait(event);

  // Convert back to a CUDAStream to obtain the raw handle for the runtime
  // call, and check the return code instead of silently discarding it.
  const at::cuda::CUDAStream cuda_stream{wait_stream0};
  ASSERT_EQ(cudaStreamSynchronize(cuda_stream), cudaSuccess);

  ASSERT_TRUE(event.query());
  ASSERT_TRUE(event.flag() == c10::EventFlag::PYTORCH_DEFAULT);
}
298
299 // Verifies external streams can be created and used
TEST(TestStream, ExternalTest) {
  // Wraps a stream created directly through the CUDA runtime with
  // getStreamFromExternal, installs it as the current stream, and checks
  // that the current stream compares equal to the wrapper and exposes the
  // same raw handle.
  if (!at::cuda::is_available())
    return;
  at::cuda::CUDAGuard device_guard(0);

  cudaStream_t cuda_stream = nullptr;
  // A failed creation would leave the handle unusable, so stop right here.
  ASSERT_EQ(
      cudaStreamCreateWithPriority(&cuda_stream, cudaStreamNonBlocking, -1),
      cudaSuccess);

  at::cuda::CUDAStream myStream =
      at::cuda::getStreamFromExternal(cuda_stream, 0);

  // Remember what was current so it can be put back before the external
  // stream is destroyed.
  const at::cuda::CUDAStream previousStream = at::cuda::getCurrentCUDAStream();

  at::cuda::setCurrentCUDAStream(myStream);
  at::cuda::CUDAStream curStream = at::cuda::getCurrentCUDAStream();

  ASSERT_EQ_CUDA(curStream, myStream);
  ASSERT_EQ_CUDA(curStream.stream(), cuda_stream);

  // Do not leave a destroyed handle installed as this thread's current
  // stream for whatever test runs next.
  at::cuda::setCurrentCUDAStream(previousStream);
  EXPECT_EQ(cudaStreamDestroy(cuda_stream), cudaSuccess);
}
319
320 // Verifies different external streams can be used for different devices at the
321 // same time
TEST(TestStream, ExternalMultiDeviceTest) {
  // Creates one runtime stream per device, wraps both with
  // getStreamFromExternal, and checks that installing the device-1 stream
  // leaves the device-0 current stream untouched.
  if (!at::cuda::is_available())
    return;
  if (at::cuda::getNumGPUs() < 2)
    return;
  cudaStream_t cuda_stream_0 = nullptr;
  cudaStream_t cuda_stream_1 = nullptr;
  {
    at::cuda::CUDAGuard device_guard(0);
    ASSERT_EQ(
        cudaStreamCreateWithPriority(&cuda_stream_0, cudaStreamNonBlocking, -1),
        cudaSuccess);
  }
  {
    at::cuda::CUDAGuard device_guard(1);
    ASSERT_EQ(
        cudaStreamCreateWithPriority(&cuda_stream_1, cudaStreamNonBlocking, -1),
        cudaSuccess);
  }
  at::cuda::CUDAStream myStream0 =
      at::cuda::getStreamFromExternal(cuda_stream_0, 0);
  at::cuda::CUDAStream myStream1 =
      at::cuda::getStreamFromExternal(cuda_stream_1, 1);

  // Remember what was current on each device so it can be put back before
  // the external streams are destroyed.
  const at::cuda::CUDAStream previousStream0 =
      at::cuda::getCurrentCUDAStream(0);
  const at::cuda::CUDAStream previousStream1 =
      at::cuda::getCurrentCUDAStream(1);

  at::cuda::setCurrentCUDAStream(myStream0);
  ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(0), myStream0);
  at::cuda::setCurrentCUDAStream(myStream1);
  ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(0), myStream0);
  ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(1), myStream1);

  // Do not leave destroyed handles installed as this thread's current
  // streams for whatever test runs next.
  at::cuda::setCurrentCUDAStream(previousStream0);
  at::cuda::setCurrentCUDAStream(previousStream1);

  // Non-fatal checks so the second destroy still runs if the first fails.
  EXPECT_EQ(cudaStreamDestroy(cuda_stream_0), cudaSuccess);
  EXPECT_EQ(cudaStreamDestroy(cuda_stream_1), cudaSuccess);
}
351
352 // Verifies external streams work with guards, even nested ones
TEST(TestStream, ExternalGuardTest) {
  // Wraps two runtime-created streams and drives them through nested
  // CUDAStreamGuards plus reset_stream, checking original_stream(),
  // current_stream() and the global current stream at every step, and that
  // the starting stream is current again once the outer guard is gone.
  if (!at::cuda::is_available())
    return;
  at::cuda::CUDAGuard device_guard(0);

  cudaStream_t a_cuda_stream = nullptr;
  cudaStream_t another_cuda_stream = nullptr;
  // A failed creation would leave the handles unusable, so stop right here.
  ASSERT_EQ(
      cudaStreamCreateWithPriority(&a_cuda_stream, cudaStreamNonBlocking, -1),
      cudaSuccess);
  ASSERT_EQ(
      cudaStreamCreateWithPriority(
          &another_cuda_stream, cudaStreamNonBlocking, -1),
      cudaSuccess);
  at::cuda::CUDAStream myFirstStream =
      at::cuda::getStreamFromExternal(a_cuda_stream, 0);
  at::cuda::CUDAStream mySecondStream =
      at::cuda::getStreamFromExternal(another_cuda_stream, 0);

  at::cuda::CUDAStream originalStream = at::cuda::getCurrentCUDAStream();
  {
    at::cuda::CUDAStreamGuard outerGuard(myFirstStream);
    ASSERT_EQ_CUDA(outerGuard.original_stream(), originalStream);
    ASSERT_EQ_CUDA(outerGuard.current_stream(), myFirstStream);
    ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(), myFirstStream);
    {
      // The inner guard's "original" is whatever the outer guard installed.
      at::cuda::CUDAStreamGuard innerGuard(mySecondStream);
      ASSERT_EQ_CUDA(innerGuard.original_stream(), myFirstStream);
      ASSERT_EQ_CUDA(innerGuard.current_stream(), mySecondStream);
      ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(), mySecondStream);
    }
    // Leaving the inner scope puts the outer guard's stream back.
    ASSERT_EQ_CUDA(outerGuard.original_stream(), originalStream);
    ASSERT_EQ_CUDA(outerGuard.current_stream(), myFirstStream);
    ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(), myFirstStream);
    // reset_stream swaps the guarded stream but keeps the same original.
    outerGuard.reset_stream(mySecondStream);
    ASSERT_EQ_CUDA(outerGuard.original_stream(), originalStream);
    ASSERT_EQ_CUDA(outerGuard.current_stream(), mySecondStream);
    ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(), mySecondStream);
  }
  ASSERT_EQ_CUDA(at::cuda::getCurrentCUDAStream(), originalStream);

  // Non-fatal checks so the second destroy still runs if the first fails.
  EXPECT_EQ(cudaStreamDestroy(a_cuda_stream), cudaSuccess);
  EXPECT_EQ(cudaStreamDestroy(another_cuda_stream), cudaSuccess);
}
392
393 // Verifies that different threads stage their external streams to different
394 // places in memory and thus don't interfere
TEST(TestStream, ExternalMultiThreadTest) {
  // Thread A installs external stream A, then waits until thread B has
  // installed external stream B, and only then reads its own current stream
  // back. The value thread A reads must still be stream A.
  if (!at::cuda::is_available())
    return;
  at::cuda::CUDAGuard device_guard(0);

  cudaStream_t cuda_stream_a = nullptr;
  cudaStream_t cuda_stream_b = nullptr;
  // A failed creation would leave the handles unusable, so stop right here.
  ASSERT_EQ(
      cudaStreamCreateWithPriority(&cuda_stream_a, cudaStreamNonBlocking, -1),
      cudaSuccess);
  ASSERT_EQ(
      cudaStreamCreateWithPriority(&cuda_stream_b, cudaStreamNonBlocking, -1),
      cudaSuccess);
  at::cuda::CUDAStream myStreamA =
      at::cuda::getStreamFromExternal(cuda_stream_a, 0);
  at::cuda::CUDAStream myStreamB =
      at::cuda::getStreamFromExternal(cuda_stream_b, 0);

  // The two promises enforce the ordering: A sets -> B sets -> A reads.
  std::promise<void> aToBProm;
  std::promise<void> bToAProm;
  std::optional<at::cuda::CUDAStream> foundStream;

  std::thread threadA([&]() {
    at::cuda::CUDAGuard device_guard(0);
    at::cuda::setCurrentCUDAStream(myStreamA);
    aToBProm.set_value();
    bToAProm.get_future().wait();
    foundStream = at::cuda::getCurrentCUDAStream();
  });

  std::thread threadB([&]() {
    at::cuda::CUDAGuard device_guard(0);
    aToBProm.get_future().wait();
    at::cuda::setCurrentCUDAStream(myStreamB);
    bToAProm.set_value();
  });

  threadA.join();
  threadB.join();

  // Make sure thread A actually stored a value before dereferencing it.
  ASSERT_TRUE(foundStream.has_value());
  ASSERT_EQ_CUDA(*foundStream, myStreamA);

  // Non-fatal checks so the second destroy still runs if the first fails.
  EXPECT_EQ(cudaStreamDestroy(cuda_stream_a), cudaSuccess);
  EXPECT_EQ(cudaStreamDestroy(cuda_stream_b), cudaSuccess);
}
436