• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #ifndef TENSORFLOW_CORE_KERNELS_DATA_UNBOUNDED_THREAD_POOL_H_
16 #define TENSORFLOW_CORE_KERNELS_DATA_UNBOUNDED_THREAD_POOL_H_
17 
18 #include <deque>
19 #include <memory>
20 #include <vector>
21 
22 #include "tensorflow/core/framework/thread_factory.h"
23 #include "tensorflow/core/lib/core/notification.h"
24 #include "tensorflow/core/platform/env.h"
25 #include "tensorflow/core/platform/mutex.h"
26 
27 namespace tensorflow {
28 namespace data {
29 
30 // An `UnboundedThreadPool` provides a mechanism for temporally multiplexing a
31 // potentially large number of "logical" threads onto a smaller number of
32 // "physical" threads. The multiplexing is achieved by maintaining an internal
33 // pool of long-running "physical" threads that are used to execute the
34 // "logical" threads.  Like a regular thread, a "logical" thread may block on
35 // other threads, and the size of the pool will increase to ensure that progress
36 // is made. This mechanism is recommended in situations where short-lived
37 // threads are created repeatedly, to avoid the overhead and memory
38 // fragmentation that can result from excessive thread creation.
39 class UnboundedThreadPool {
40  public:
UnboundedThreadPool(Env * env,const string & thread_name)41   UnboundedThreadPool(Env* env, const string& thread_name)
42       : env_(env), thread_name_(thread_name) {}
43   ~UnboundedThreadPool();
44 
45   // Returns an implementation of `ThreadFactory` that can be used to create
46   // logical threads in this pool.
47   std::shared_ptr<ThreadFactory> get_thread_factory();
48 
49   // Returns the current number of threads in this pool.
50   size_t size();
51 
52  private:
53   class LogicalThreadFactory;
54   class LogicalThreadWrapper;
55   struct WorkItem {
56     std::function<void()> work_function;
57     std::shared_ptr<Notification> done_notification;
58   };
59 
60   std::unique_ptr<Thread> RunOnPooledThread(std::function<void()> fn);
61   void PooledThreadFunc();
62 
63   Env* const env_;  // Not owned.
64   const string thread_name_;
65   mutex work_queue_mu_;
66   condition_variable work_queue_cv_ GUARDED_BY(work_queue_mu_);
67   size_t num_idle_threads_ GUARDED_BY(work_queue_mu_) = 0;
68   bool cancelled_ GUARDED_BY(work_queue_mu_) = false;
69   std::deque<WorkItem> work_queue_ GUARDED_BY(work_queue_mu_);
70   mutex thread_pool_mu_;
71   std::vector<std::unique_ptr<Thread>> thread_pool_ GUARDED_BY(thread_pool_mu_);
72 };
73 
74 }  // namespace data
75 }  // namespace tensorflow
76 
77 #endif  // TENSORFLOW_CORE_KERNELS_DATA_UNBOUNDED_THREAD_POOL_H_
78