/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
15 
16 #include "tensorflow/core/data/unbounded_thread_pool.h"
17 
18 #include "tensorflow/core/lib/core/blocking_counter.h"
19 #include "tensorflow/core/lib/random/random.h"
20 #include "tensorflow/core/platform/test.h"
21 
22 namespace tensorflow {
23 namespace data {
24 namespace {
25 
// Checks that threads started through the pool's thread factory can
// themselves start further threads concurrently, and that every nested thread
// body has run by the time all thread handles have been destroyed.
TEST(UnboundedThreadPool, ConcurrentThreadCreation) {
  UnboundedThreadPool pool(Env::Default(), "test");
  auto thread_factory = pool.get_thread_factory();

  // Create ten threads that each create ten threads that update a variable, and
  // ensure that they all run to completion.
  std::vector<std::unique_ptr<Thread>> threads;
  const int kNumThreadsToCreate = 10;
  // Incremented exactly once by each nested thread.
  std::atomic<int> counter(0);
  for (int j = 0; j < kNumThreadsToCreate; ++j) {
    threads.push_back(thread_factory->StartThread("", [=, &counter,
                                                       &thread_factory]() {
      std::vector<std::unique_ptr<Thread>> nested_threads;
      for (int k = 0; k < kNumThreadsToCreate; ++k) {
        nested_threads.push_back(
            thread_factory->StartThread("", [&counter]() { ++counter; }));
      }
      // This test relies on destroying a `Thread` handle blocking until the
      // thread's function has finished, so that every increment has happened
      // before the outer thread returns.
      nested_threads.clear();
    }));
  }
  // Likewise, wait for all outer threads (and transitively their nested
  // threads) before inspecting the counter.
  threads.clear();

  // Compare the loaded value rather than the atomic object itself so that a
  // failure message shows the actual count.
  EXPECT_EQ(counter.load(), kNumThreadsToCreate * kNumThreadsToCreate);
}
50 
// Checks that the pool can run successive, growing waves of threads in which
// every thread of a wave is blocked at the same time, and that each wave
// terminates cleanly once it is released.
TEST(UnboundedThreadPool, MultipleBlockingThreads) {
  UnboundedThreadPool pool(Env::Default(), "test");
  auto thread_factory = pool.get_thread_factory();

  std::vector<std::unique_ptr<Thread>> threads;

  // Create multiple waves (with increasing sizes) of threads that all block
  // before returning, and ensure that we terminate correctly.
  std::vector<int> round_sizes = {5, 10, 15, 20};

  for (const int round_size : round_sizes) {
    // `n` releases the threads of this round; `bc` counts how many of them
    // have started running.
    Notification n;
    BlockingCounter bc(round_size);
    for (int j = 0; j < round_size; ++j) {
      threads.push_back(thread_factory->StartThread("", [&bc, &n]() {
        bc.DecrementCount();
        // Block until `n` is notified, so that all `round_size` threads of
        // this round must have been created before the first one completes.
        n.WaitForNotification();
      }));
    }

    // Wait until all threads have started. Since the number of threads in each
    // wave is increasing, we should have at least that number of threads in the
    // pool.
    bc.Wait();
    n.Notify();
    // Drop the handles before `n` and `bc` go out of scope: the thread bodies
    // hold references to both.
    threads.clear();
  }
}
81 
82 }  // namespace
83 }  // namespace data
84 }  // namespace tensorflow
85