/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
#define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_

#include <stddef.h>

#include <cstddef>
#include <functional>
#include <memory>
#include <string>

#include "tensorflow/core/kernels/batching_util/shared_batch_scheduler.h"

namespace tensorflow {
namespace serving {
// A BatchScheduler implementation geared toward handling a single request type
// running on a specific set of hardware resources. A typical scenario is one in
// which all requests invoke the same machine-learned model on one GPU.
//
// If there are, say, two GPUs and two models each bound to one of the GPUs, one
// could use two BasicBatchScheduler instances to schedule the two model/GPU
// combinations independently. If multiple models must share a given GPU or
// other hardware resource, consider using SharedBatchScheduler instead.
//
//
// PARAMETERS AND BEHAVIOR:
//
// BasicBatchScheduler runs a fixed pool of threads, which it uses to process
// batches of tasks. It enforces a maximum batch size, and enqueues a bounded
// number of tasks. If the queue is nearly empty when a thread becomes free, so
// that a full batch cannot be formed, the scheduler nonetheless forms a batch
// immediately whenever some task has been in the queue for longer than a given
// timeout parameter. If the timeout parameter is set to 0, the batch threads
// are always kept busy (unless there are zero tasks waiting to be processed).
//
// For online serving, it is recommended to set the maximum number of enqueued
// batches worth of tasks equal to the number of batch threads: this admits
// enough tasks to keep every thread busy should all of them become available,
// but no more. For bulk processing jobs and throughput-oriented benchmarks,
// you may want to set it much higher.
//
// When Schedule() is called, if the queue is full the call fails with an
// UNAVAILABLE error (after which the client may retry later). If the call
// succeeds, the maximum time the task will spend in the queue before being
// placed in a batch and assigned to a thread for processing is the greater of:
//  - the maximum time to process ceil(max_enqueued_batches/num_batch_threads)
//    (1 in the recommended configuration) batches of previously-submitted tasks
//  - the configured timeout parameter (which can be 0, as mentioned above)
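//
// For example, with a hypothetical throughput-oriented configuration of
// num_batch_threads = 4 and max_enqueued_batches = 8, ceil(8/4) = 2, so an
// accepted task waits at most as long as it takes to process two batches of
// previously-submitted tasks, or the configured timeout, whichever is greater.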
//
// Unlike StreamingBatchScheduler, when BasicBatchScheduler assigns a batch to a
// thread, it closes the batch. The process-batch callback may assume that every
// batch it receives is closed at the outset.
//
//
// RECOMMENDED USE-CASES:
//
// BasicBatchScheduler is suitable for use-cases that feature a single kind of
// request (e.g. a server performing inference with a single machine-learned
// model, possibly evolving over time), with loose versioning semantics.
// Concretely, the following conditions should hold:
//
//  A. All requests batched onto a given resource (e.g. a hardware accelerator,
//     or a pool of accelerators) are of the same type. For example, they all
//     invoke the same machine-learned model.
//
//     These variations are permitted:
//      - The model may reside in a single servable, or it may be spread across
//        multiple servables that are used in unison (e.g. a vocabulary lookup
//        table servable and a tensorflow session servable).
//      - The model's servable(s) may be static, or they may evolve over time
//        (successive servable versions).
//      - Zero or more of the servables are used in the request thread; the rest
//        are used in the batch thread. In our running example, the vocabulary
//        lookups and tensorflow runs may both be performed in the batch thread,
//        or alternatively the vocabulary lookup may occur in the request thread
//        with only the tensorflow run performed in the batch thread.
//
//     In contrast, BasicBatchScheduler is not a good fit if the server
//     hosts multiple distinct models running on a pool of accelerators, with
//     each request specifying which model it wants to use. BasicBatchScheduler
//     has no facility to time-multiplex the batch threads across multiple
//     models in a principled way. More basically, it cannot ensure that a given
//     batch doesn't contain a mixture of requests for different models.
//
//  B. Requests do not specify a particular version of the servable(s) that must
//     be used. Instead, each request is content to use the "latest" version.
//
//     BasicBatchScheduler does not constrain which requests get grouped
//     together into a batch, so with this scheduler there is no way to confine
//     requests for a particular version to version-specific batches.
//
//  C. No servable version coordination needs to be performed between the
//     request threads and the batch threads. Often, servables are only used in
//     the batch threads, in which case this condition trivially holds. If
//     servables are used in both threads, then the use-case must tolerate
//     version skew across the servables used in the two kinds of threads.
//
//
// EXAMPLE USE-CASE FLOW:
//
// For such use-cases, request processing via BasicBatchScheduler generally
// follows this flow (given for illustration; variations are possible):
//  1. Optionally perform some pre-processing on each request in the request
//     threads.
//  2. Route the requests to the batch scheduler, as batching::Task objects.
//     (Since all requests are of the same type and are not versioned, the
//     scheduler is free to group them into batches arbitrarily.)
//  3. Merge the requests into a single batched representation B.
//  4. Obtain handles to the servable(s) needed to process B. The simplest
//     approach is to obtain the latest version of each servable. Alternatively,
//     if cross-servable consistency is required (e.g. the vocabulary lookup
//     table's version number must match that of the tensorflow session),
//     identify an appropriate version number and obtain the servable handles
//     accordingly.
//  5. Process B using the obtained servable handles, and split the result into
//     individual per-request units.
//  6. Perform any post-processing in the batch thread and/or request thread.
//
//
// PERFORMANCE TUNING: See README.md.
//
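// EXAMPLE CODE (a minimal sketch, not drawn from a real server; `MyTask`, the
// callback body, and CreateTaskFromRequest() are hypothetical, and error
// handling is reduced to TF_CHECK_OK):
//
//   BasicBatchScheduler<MyTask>::Options options;
//   options.num_batch_threads = 4;
//   options.max_batch_size = 32;
//   options.batch_timeout_micros = 1000;  // 1 millisecond
//   options.max_enqueued_batches = 4;     // == num_batch_threads, per above
//   std::unique_ptr<BasicBatchScheduler<MyTask>> scheduler;
//   TF_CHECK_OK(BasicBatchScheduler<MyTask>::Create(
//       options,
//       [](std::unique_ptr<Batch<MyTask>> batch) {
//         // Every batch arrives closed; process it and emit per-task results.
//       },
//       &scheduler));
//
//   std::unique_ptr<MyTask> task = CreateTaskFromRequest(request);
//   TF_CHECK_OK(scheduler->Schedule(&task));
//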
template <typename TaskType>
class BasicBatchScheduler : public BatchScheduler<TaskType> {
 public:
  // TODO(b/25089730): Tune defaults based on best practices as they develop.
  // (Keep them mirrored to the ones in SharedBatchScheduler::QueueOptions and
  // SharedBatchScheduler::Options.)
  struct Options {
    // Options related to the underlying shared batch scheduler.
    // 'thread_pool_name' and 'num_batch_threads' are used to initialize an
    // underlying shared batch scheduler iff 'shared_batch_scheduler' is
    // nullptr.
    //
    // There are two ways to specify threading:
    // 1) Have each session create its own pool.
    // 2) Have multiple sessions share the same pool.
    //
    // In general, the number of threads should be tied roughly to the number
    // of compute resources (CPU cores or accelerator cores) backing the
    // threads. Sharing a thread pool helps alleviate potential over-allocation
    // of threads to limited compute resources.

    // To have each session create its own thread pool (option 1), set
    // thread_pool_name/num_batch_threads.

    // To share a thread pool (option 2), create a scheduler and pass it in.

    // The name to use for the pool of batch threads.
    string thread_pool_name = {"batch_threads"};

    // The number of threads to use to process batches.
    // Must be >= 1, and should be tuned carefully.
    int num_batch_threads = port::MaxParallelism();

    // If specified, this scheduler will be used as the underlying scheduler
    // for batches. Note that setting this means `thread_pool_name` and
    // `num_batch_threads` are ignored.
    std::shared_ptr<SharedBatchScheduler<TaskType>> shared_batch_scheduler =
        nullptr;

    // Options for the queue.
    // The maximum size of each batch.
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can be
    // done by adding padding in the process-batch callback.
    int max_batch_size = 1000;

    // If a task has been enqueued for this amount of time (in microseconds),
    // and a thread is available, the scheduler will immediately form a batch
    // from enqueued tasks and assign the batch to the thread for processing,
    // even if the batch's size is below 'max_batch_size'.
    //
    // This parameter offers a way to bound queue latency, so that a task isn't
    // stuck in the queue indefinitely waiting for enough tasks to arrive to
    // make a full batch. (The latency bound is given in the class documentation
    // above.)
    //
    // The goal is to smooth out batch sizes under low request rates, and thus
    // avoid latency spikes.
    int64 batch_timeout_micros = 0;

    // The maximum allowable number of enqueued (accepted by Schedule() but
    // not yet being processed on a batch thread) tasks in terms of batches.
    // If this limit is reached, Schedule() will return an UNAVAILABLE error.
    // See the class documentation above for guidelines on how to tune this
    // parameter.
    int max_enqueued_batches = 10;

    // If true, an input task (i.e., an input of `BasicBatchScheduler::Schedule`)
    // with a large size (i.e., larger than the largest value of
    // `allowed_batch_sizes`) will be split into multiple smaller batch tasks
    // and possibly put into different batches for processing. If false, each
    // input task is put into one batch as a whole for processing.
    //
    // API note:
    // The value of this option doesn't affect processing output given the same
    // input; it affects implementation details as stated below:
    // 1. It improves batching efficiency by eliminating unnecessary padding in
    // the following scenario: when an open batch has M free slots and an input
    // of size N > M is scheduled, the input can be split to fill the remaining
    // slots of the open batch rather than padding the batch.
    // 2. `max_batch_size` specifies the limit of an input, and
    // `max_execution_batch_size` specifies the limit of a task to be processed.
    // An API user can submit an input of size 128 when 'max_execution_batch_size'
    // is 32; the implementation can then split the input of 128 into 4 x 32,
    // schedule concurrent processing, and return the concatenated results
    // corresponding to all 128.
    bool enable_large_batch_splitting = false;

    // If true, the splitting of inputs happens lazily after dequeue, not on the
    // critical path of enqueue.
    //
    // Must be false if `enable_large_batch_splitting` is false; otherwise an
    // error is returned at queue creation time.
    bool enable_lazy_split = false;

    // `split_input_task_func` specifies how to split `input_task` into
    // `output_tasks`.
    //
    // `input_task`: a unit of task to be split.
    // `first_output_task_size`: the task size of the first output.
    // `input_batch_size_limit`: the maximum size of each batch.
    // `output_tasks`: the list of output tasks after the split.
    //
    // REQUIRED:
    // 1) All `output_tasks` should be non-empty tasks.
    // 2) The sizes of `output_tasks` must add up to the size of `input_task`.
    //
    // NOTE:
    // Instantiations of `TaskType` may vary, so it is up to the caller to
    // define how (e.g., which members to access) to split input tasks.
    std::function<Status(std::unique_ptr<TaskType>* input_task,
                         int first_output_task_size, int input_batch_size_limit,
                         std::vector<std::unique_ptr<TaskType>>* output_tasks)>
        split_input_task_func;
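
    // For illustration, a minimal sketch of such a function, assuming a
    // hypothetical task type `MyTask` with a `RemovePrefix(n)` helper that
    // moves up to the first n units of the task into a newly allocated task:
    //
    //   options.split_input_task_func =
    //       [](std::unique_ptr<MyTask>* input_task, int first_output_task_size,
    //          int input_batch_size_limit,
    //          std::vector<std::unique_ptr<MyTask>>* output_tasks) -> Status {
    //         int chunk_size = first_output_task_size;
    //         while ((*input_task)->size() > 0) {
    //           output_tasks->push_back((*input_task)->RemovePrefix(chunk_size));
    //           chunk_size = input_batch_size_limit;
    //         }
    //         return Status::OK();
    //       };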

    // The maximum size of each enqueued batch (i.e., in `batches_`).
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can be
    // done by adding padding in the process-batch callback.
    //
    // REQUIRES:
    // - If enable_large_batch_splitting is true, `max_execution_batch_size` is
    //   less than or equal to `max_batch_size`.
    // - If enable_large_batch_splitting is false, `max_execution_batch_size` is
    //   equal to `max_batch_size`.
    int max_execution_batch_size = 10;

    // The following options are typically only overridden by test code.

    // The environment to use.
    Env* env = Env::Default();
  };
  static Status Create(const Options& options,
                       std::function<void(std::unique_ptr<Batch<TaskType>>)>
                           process_batch_callback,
                       std::unique_ptr<BasicBatchScheduler>* scheduler);

  ~BasicBatchScheduler() override = default;

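  // Submits a task to be processed, per the BatchScheduler interface. As
  // described in the class comments above, if the queue is full this returns
  // an UNAVAILABLE error and the client may retry later. A minimal retry
  // sketch, assuming ownership of `task` is retained on failure (the fixed
  // 1 ms backoff is illustrative only):
  //
  //   Status status = scheduler->Schedule(&task);
  //   while (errors::IsUnavailable(status)) {
  //     Env::Default()->SleepForMicroseconds(1000);
  //     status = scheduler->Schedule(&task);
  //   }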
  Status Schedule(std::unique_ptr<TaskType>* task) override;
  size_t NumEnqueuedTasks() const override;
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override {
    return shared_scheduler_queue_->max_task_size();
  }

 private:
  explicit BasicBatchScheduler(
      std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue);

  // This class is merely a thin wrapper around a SharedBatchScheduler with a
  // single queue.
  std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue_;

  TF_DISALLOW_COPY_AND_ASSIGN(BasicBatchScheduler);
};

//////////
// Implementation details follow. API users need not read.

template <typename TaskType>
Status BasicBatchScheduler<TaskType>::Create(
    const Options& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BasicBatchScheduler>* scheduler) {
  std::shared_ptr<SharedBatchScheduler<TaskType>> shared_scheduler;

  if (options.shared_batch_scheduler == nullptr) {
    typename SharedBatchScheduler<TaskType>::Options shared_scheduler_options;
    shared_scheduler_options.thread_pool_name = options.thread_pool_name;
    shared_scheduler_options.num_batch_threads = options.num_batch_threads;
    shared_scheduler_options.env = options.env;

    TF_RETURN_IF_ERROR(SharedBatchScheduler<TaskType>::Create(
        shared_scheduler_options, &shared_scheduler));
  } else {
    shared_scheduler = options.shared_batch_scheduler;
  }

  typename SharedBatchScheduler<TaskType>::QueueOptions
      shared_scheduler_queue_options;
  shared_scheduler_queue_options.input_batch_size_limit =
      options.max_batch_size;
  shared_scheduler_queue_options.batch_timeout_micros =
      options.batch_timeout_micros;
  shared_scheduler_queue_options.max_enqueued_batches =
      options.max_enqueued_batches;
  shared_scheduler_queue_options.enable_large_batch_splitting =
      options.enable_large_batch_splitting;
  shared_scheduler_queue_options.split_input_task_func =
      options.split_input_task_func;
  shared_scheduler_queue_options.enable_lazy_split = options.enable_lazy_split;
  shared_scheduler_queue_options.max_execution_batch_size =
      options.max_execution_batch_size;
  std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue;
  TF_RETURN_IF_ERROR(shared_scheduler->AddQueue(shared_scheduler_queue_options,
                                                process_batch_callback,
                                                &shared_scheduler_queue));

  scheduler->reset(
      new BasicBatchScheduler<TaskType>(std::move(shared_scheduler_queue)));
  return Status::OK();
}

template <typename TaskType>
Status BasicBatchScheduler<TaskType>::Schedule(
    std::unique_ptr<TaskType>* task) {
  return shared_scheduler_queue_->Schedule(task);
}

template <typename TaskType>
size_t BasicBatchScheduler<TaskType>::NumEnqueuedTasks() const {
  return shared_scheduler_queue_->NumEnqueuedTasks();
}

template <typename TaskType>
size_t BasicBatchScheduler<TaskType>::SchedulingCapacity() const {
  return shared_scheduler_queue_->SchedulingCapacity();
}

template <typename TaskType>
BasicBatchScheduler<TaskType>::BasicBatchScheduler(
    std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue)
    : shared_scheduler_queue_(std::move(shared_scheduler_queue)) {}

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_