/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
#define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_

#include <stddef.h>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>

#include "tensorflow/core/kernels/batching_util/shared_batch_scheduler.h"

namespace tensorflow {
namespace serving {
// A BatchScheduler implementation geared toward handling a single request type
// running on a specific set of hardware resources. A typical scenario is one
// in which all requests invoke the same machine-learned model on one GPU.
//
// If there are, say, two GPUs and two models each bound to one of the GPUs,
// one could use two BasicBatchScheduler instances to schedule the two
// model/GPU combinations independently. If multiple models must share a given
// GPU or other hardware resource, consider using SharedBatchScheduler instead.
//
//
// PARAMETERS AND BEHAVIOR:
//
// BasicBatchScheduler runs a fixed pool of threads, which it uses to process
// batches of tasks. It enforces a maximum batch size, and enqueues a bounded
// number of tasks. If the queue is nearly empty, so that a full batch cannot
// be formed when a thread becomes free, the scheduler nonetheless processes a
// batch immediately whenever a task has been in the queue for longer than a
// given timeout parameter. If the timeout parameter is set to 0, the batch
// threads are always kept busy (unless there are zero tasks waiting to be
// processed).
//
// For online serving, it is recommended to set the maximum number of enqueued
// batches worth of tasks equal to the number of batch threads, which allows
// enqueuing enough tasks to keep every thread busy as it becomes available,
// but no more. For bulk processing jobs and throughput-oriented benchmarks,
// you may want to set it much higher.
// When Schedule() is called, if the queue is full the call will fail with an
// UNAVAILABLE error (after which the client may retry later; see the sketch
// below). If the call succeeds, the maximum time the task will spend in the
// queue before being placed in a batch and assigned to a thread for
// processing is the greater of:
//  - the maximum time to process ceil(max_enqueued_batches/num_batch_threads)
//    (1 in the recommended configuration) batches of previously-submitted
//    tasks
//  - the configured timeout parameter (which can be 0, as mentioned above)
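//
// For example, a caller that treats a full queue as transient overload might
// handle the error along these lines (a sketch only; 'scheduler' and 'task'
// are hypothetical, and the reaction to overload is application-specific):
//
//   Status status = scheduler->Schedule(&task);
//   if (errors::IsUnavailable(status)) {
//     // The queue is full: report overload to the caller, or retry later.
//   }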
//
// Unlike StreamingBatchScheduler, when BasicBatchScheduler assigns a batch to
// a thread, it closes the batch. The process-batch callback may assume that
// every batch it receives is closed at the outset.
//
//
// RECOMMENDED USE-CASES:
//
// BasicBatchScheduler is suitable for use-cases that feature a single kind of
// request (e.g. a server performing inference with a single machine-learned
// model, possibly evolving over time), with loose versioning semantics.
// Concretely, the following conditions should hold:
//
//  A. All requests batched onto a given resource (e.g. a hardware accelerator,
//     or a pool of accelerators) are of the same type. For example, they all
//     invoke the same machine-learned model.
//
//     These variations are permitted:
//      - The model may reside in a single servable, or it may be spread across
//        multiple servables that are used in unison (e.g. a vocabulary lookup
//        table servable and a tensorflow session servable).
//      - The model's servable(s) may be static, or they may evolve over time
//        (successive servable versions).
//      - Zero or more of the servables are used in the request thread; the
//        rest are used in the batch thread. In our running example, the
//        vocabulary lookups and tensorflow runs may both be performed in the
//        batch thread, or alternatively the vocabulary lookup may occur in the
//        request thread with only the tensorflow run performed in the batch
//        thread.
//
//     In contrast, BasicBatchScheduler is not a good fit if the server hosts
//     multiple distinct models running on a pool of accelerators, with each
//     request specifying which model it wants to use. BasicBatchScheduler has
//     no facility to time-multiplex the batch threads across multiple models
//     in a principled way. More basically, it cannot ensure that a given batch
//     doesn't contain a mixture of requests for different models.
//
//  B. Requests do not specify a particular version of the servable(s) that
//     must be used. Instead, each request is content to use the "latest"
//     version.
//
//     BasicBatchScheduler does not constrain which requests get grouped
//     together into a batch, so with this scheduler there is no way to route
//     versioned requests into version-specific batches.
//
//  C. No servable version coordination needs to be performed between the
//     request threads and the batch threads. Often, servables are only used in
//     the batch threads, in which case this condition trivially holds. If
//     servables are used in both threads, then the use-case must tolerate
//     version skew across the servables used in the two kinds of threads.
//
//
// EXAMPLE USE-CASE FLOW:
//
// For such use-cases, request processing via BasicBatchScheduler generally
// follows this flow (given for illustration; variations are possible):
//  1. Optionally perform some pre-processing on each request in the request
//     threads.
//  2. Route the requests to the batch scheduler, as batching::Task objects.
//     (Since all requests are of the same type and are not versioned, the
//     scheduler is free to group them into batches arbitrarily.)
//  3. Merge the requests into a single batched representation B.
//  4. Obtain handles to the servable(s) needed to process B. The simplest
//     approach is to obtain the latest version of each servable.
//     Alternatively, if cross-servable consistency is required (e.g. the
//     vocabulary lookup table's version number must match that of the
//     tensorflow session), identify an appropriate version number and obtain
//     the servable handles accordingly.
//  5. Process B using the obtained servable handles, and split the result into
//     individual per-request units.
//  6. Perform any post-processing in the batch thread and/or request thread.
//
//
// PERFORMANCE TUNING: See README.md.
//
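// EXAMPLE USAGE:
//
// A minimal sketch of wiring up a scheduler and submitting tasks ('MyTask' is
// a hypothetical TaskType, and the option values are illustrative rather than
// recommendations):
//
//   BasicBatchScheduler<MyTask>::Options options;
//   options.max_batch_size = 100;
//   options.batch_timeout_micros = 1000;
//   options.num_batch_threads = 2;
//   options.max_enqueued_batches = 2;
//   auto process_batch = [](std::unique_ptr<Batch<MyTask>> batch) {
//     // Merge the tasks in 'batch', run the model, and split the results
//     // back out to the individual requests.
//   };
//   std::unique_ptr<BasicBatchScheduler<MyTask>> scheduler;
//   TF_CHECK_OK(BasicBatchScheduler<MyTask>::Create(options, process_batch,
//                                                   &scheduler));
//
//   // In a request thread:
//   std::unique_ptr<MyTask> task(new MyTask);
//   Status status = scheduler->Schedule(&task);
//   // If the queue is full, 'status' is an UNAVAILABLE error and the caller
//   // may retry later.
//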
template <typename TaskType>
class BasicBatchScheduler : public BatchScheduler<TaskType> {
 public:
  // TODO(b/25089730): Tune defaults based on best practices as they develop.
  // (Keep them mirrored to the ones in SharedBatchScheduler::QueueOptions and
  // SharedBatchScheduler::Options.)
  struct Options {
    // The maximum size of each batch.
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, this can
    // be done by adding padding in the process-batch callback (see the sketch
    // below).
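    //
    // A minimal sketch of such padding logic inside the callback (the allowed
    // sizes here are purely illustrative):
    //
    //   const int batch_size = batch->size();
    //   int padded_size = batch_size;
    //   for (int allowed : {32, 64, 128}) {
    //     if (allowed >= batch_size) {
    //       padded_size = allowed;
    //       break;
    //     }
    //   }
    //   // Append (padded_size - batch_size) padding tasks before running the
    //   // model.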
    int max_batch_size = 1000;

    // If a task has been enqueued for this amount of time (in microseconds),
    // and a thread is available, the scheduler will immediately form a batch
    // from enqueued tasks and assign the batch to the thread for processing,
    // even if the batch's size is below 'max_batch_size'.
    //
    // This parameter offers a way to bound queue latency, so that a task isn't
    // stuck in the queue indefinitely waiting for enough tasks to arrive to
    // make a full batch. (The latency bound is given in the class
    // documentation above.)
    //
    // The goal is to smooth out batch sizes under low request rates, and thus
    // avoid latency spikes.
    int64 batch_timeout_micros = 0;

    // The name to use for the pool of batch threads.
    string thread_pool_name = {"batch_threads"};

    // The number of threads to use to process batches.
    // Must be >= 1, and should be tuned carefully.
    int num_batch_threads = port::NumSchedulableCPUs();

    // The maximum allowable number of enqueued (accepted by Schedule() but
    // not yet being processed on a batch thread) tasks in terms of batches.
    // If this limit is reached, Schedule() will return an UNAVAILABLE error.
    // See the class documentation above for guidelines on how to tune this
    // parameter.
    int max_enqueued_batches = 10;

    // The following options are typically only overridden by test code.

    // The environment to use.
    Env* env = Env::Default();
  };
  static Status Create(const Options& options,
                       std::function<void(std::unique_ptr<Batch<TaskType>>)>
                           process_batch_callback,
                       std::unique_ptr<BasicBatchScheduler>* scheduler);

  ~BasicBatchScheduler() override = default;

  Status Schedule(std::unique_ptr<TaskType>* task) override;
  size_t NumEnqueuedTasks() const override;
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override {
    return shared_scheduler_queue_->max_task_size();
  }

 private:
  explicit BasicBatchScheduler(
      std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue);

  // This class is merely a thin wrapper around a SharedBatchScheduler with a
  // single queue.
  std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue_;

  TF_DISALLOW_COPY_AND_ASSIGN(BasicBatchScheduler);
};

//////////
// Implementation details follow. API users need not read.

template <typename TaskType>
Status BasicBatchScheduler<TaskType>::Create(
    const Options& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BasicBatchScheduler>* scheduler) {
  typename SharedBatchScheduler<TaskType>::Options shared_scheduler_options;
  shared_scheduler_options.thread_pool_name = options.thread_pool_name;
  shared_scheduler_options.num_batch_threads = options.num_batch_threads;
  shared_scheduler_options.env = options.env;
  std::shared_ptr<SharedBatchScheduler<TaskType>> shared_scheduler;
  TF_RETURN_IF_ERROR(SharedBatchScheduler<TaskType>::Create(
      shared_scheduler_options, &shared_scheduler));

  typename SharedBatchScheduler<TaskType>::QueueOptions
      shared_scheduler_queue_options;
  shared_scheduler_queue_options.max_batch_size = options.max_batch_size;
  shared_scheduler_queue_options.batch_timeout_micros =
      options.batch_timeout_micros;
  shared_scheduler_queue_options.max_enqueued_batches =
      options.max_enqueued_batches;
  std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue;
  TF_RETURN_IF_ERROR(shared_scheduler->AddQueue(shared_scheduler_queue_options,
                                                process_batch_callback,
                                                &shared_scheduler_queue));

  scheduler->reset(
      new BasicBatchScheduler<TaskType>(std::move(shared_scheduler_queue)));
  return Status::OK();
}

template <typename TaskType>
Status BasicBatchScheduler<TaskType>::Schedule(
    std::unique_ptr<TaskType>* task) {
  return shared_scheduler_queue_->Schedule(task);
}

template <typename TaskType>
size_t BasicBatchScheduler<TaskType>::NumEnqueuedTasks() const {
  return shared_scheduler_queue_->NumEnqueuedTasks();
}

template <typename TaskType>
size_t BasicBatchScheduler<TaskType>::SchedulingCapacity() const {
  return shared_scheduler_queue_->SchedulingCapacity();
}

template <typename TaskType>
BasicBatchScheduler<TaskType>::BasicBatchScheduler(
    std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue)
    : shared_scheduler_queue_(std::move(shared_scheduler_queue)) {}

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_