1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
17 #define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
18 
19 #include <stddef.h>
20 
21 #include <cstddef>
22 #include <functional>
23 #include <memory>
24 #include <string>
25 
26 #include "tensorflow/core/kernels/batching_util/shared_batch_scheduler.h"
27 
28 namespace tensorflow {
29 namespace serving {
30 
31 // A BatchScheduler implementation geared toward handling a single request type
32 // running on a specific set of hardware resources. A typical scenario is one in
33 // which all requests invoke the same machine-learned model on one GPU.
34 //
35 // If there are, say, two GPUs and two models each bound to one of the GPUs, one
36 // could use two BasicBatchScheduler instances to schedule the two model/GPU
37 // combinations independently. If multiple models must share a given GPU or
38 // other hardware resource, consider using SharedBatchScheduler instead.
39 //
40 //
41 // PARAMETERS AND BEHAVIOR:
42 //
43 // BasicBatchScheduler runs a fixed pool of threads, which it uses to process
44 // batches of tasks. It enforces a maximum batch size, and enqueues a bounded
// number of tasks. If, when a thread becomes free, the queue holds too few
// tasks to form a full batch, the scheduler still schedules a partial batch
// immediately if a task has been in the queue for longer than a given timeout
// parameter. If the timeout parameter is set to 0, then the batch threads will
49 // always be kept busy (unless there are zero tasks waiting to be processed).
50 //
51 // For online serving, it is recommended to set the maximum number of enqueued
52 // batches worth of tasks equal to the number of batch threads, which allows
53 // enqueuing of enough tasks s.t. if every thread becomes available it can be
54 // kept busy, but no more. For bulk processing jobs and throughput-oriented
55 // benchmarks, you may want to set it much higher.
56 //
57 // When Schedule() is called, if the queue is full the call will fail with an
58 // UNAVAILABLE error (after which the client may retry again later). If the call
59 // succeeds, the maximum time the task will spend in the queue before being
60 // placed in a batch and assigned to a thread for processing, is the greater of:
61 //  - the maximum time to process ceil(max_enqueued_batches/num_batch_threads)
62 //    (1 in the recommended configuration) batches of previously-submitted tasks
63 //  - the configured timeout parameter (which can be 0, as mentioned above)
64 //
65 // Unlike StreamingBatchScheduler, when BasicBatchScheduler assigns a batch to a
66 // thread, it closes the batch. The process-batch callback may assume that every
67 // batch it receives is closed at the outset.
68 //
69 //
70 // RECOMMENDED USE-CASES:
71 //
72 // BasicBatchScheduler is suitable for use-cases that feature a single kind of
73 // request (e.g. a server performing inference with a single machine-learned
74 // model, possibly evolving over time), with loose versioning semantics.
75 // Concretely, the following conditions should hold:
76 //
77 //  A. All requests batched onto a given resource (e.g. a hardware accelerator,
//     or a pool of accelerators) are of the same type. For example, they all
79 //     invoke the same machine-learned model.
80 //
81 //     These variations are permitted:
82 //      - The model may reside in a single servable, or it may be spread across
83 //        multiple servables that are used in unison (e.g. a vocabulary lookup
84 //        table servable and a tensorflow session servable).
85 //      - The model's servable(s) may be static, or they may evolve over time
86 //        (successive servable versions).
87 //      - Zero or more of the servables are used in the request thread; the rest
88 //        are used in the batch thread. In our running example, the vocabulary
89 //        lookups and tensorflow runs may both be performed in the batch thread,
90 //        or alternatively the vocabulary lookup may occur in the request thread
91 //        with only the tensorflow run performed in the batch thread.
92 //
//     In contrast, BasicBatchScheduler is not a good fit if the server
//     hosts multiple distinct models running on a pool of accelerators, with
//     each request specifying which model it wants to use. BasicBatchScheduler
96 //     has no facility to time-multiplex the batch threads across multiple
97 //     models in a principled way. More basically, it cannot ensure that a given
98 //     batch doesn't contain a mixture of requests for different models.
99 //
100 //  B. Requests do not specify a particular version of the servable(s) that must
101 //     be used. Instead, each request is content to use the "latest" version.
102 //
103 //     BasicBatchScheduler does not constrain which requests get grouped
104 //     together into a batch, so using this scheduler there is no way to achieve
105 //     cohesion of versioned requests to version-specific batches.
106 //
107 //  C. No servable version coordination needs to be performed between the
108 //     request threads and the batch threads. Often, servables are only used in
109 //     the batch threads, in which case this condition trivially holds. If
110 //     servables are used in both threads, then the use-case must tolerate
111 //     version skew across the servables used in the two kinds of threads.
112 //
113 //
114 // EXAMPLE USE-CASE FLOW:
115 //
116 // For such use-cases, request processing via BasicBatchScheduler generally
117 // follows this flow (given for illustration; variations are possible):
118 //  1. Optionally perform some pre-processing on each request in the request
119 //     threads.
120 //  2. Route the requests to the batch scheduler, as batching::Task objects.
121 //     (Since all requests are of the same type and are not versioned, the
122 //     scheduler is free to group them into batches arbitrarily.)
123 //  3. Merge the requests into a single batched representation B.
124 //  4. Obtain handles to the servable(s) needed to process B. The simplest
125 //     approach is to obtain the latest version of each servable. Alternatively,
126 //     if cross-servable consistency is required (e.g. the vocabulary lookup
127 //     table's version number must match that of the tensorflow session),
128 //     identify an appropriate version number and obtain the servable handles
129 //     accordingly.
130 //  5. Process B using the obtained servable handles, and split the result into
131 //     individual per-request units.
132 //  6. Perform any post-processing in the batch thread and/or request thread.
133 //
134 //
135 // PERFORMANCE TUNING: See README.md.
136 //
137 template <typename TaskType>
138 class BasicBatchScheduler : public BatchScheduler<TaskType> {
139  public:
140   // TODO(b/25089730): Tune defaults based on best practices as they develop.
141   // (Keep them mirrored to the ones in SharedBatchScheduler::QueueOptions and
142   // SharedBatchScheduler::Options.)
143   struct Options {
144     // Options related with (underlying) shared batch scheduler.
145     // 'thread_pool_name' and 'num_batch_threads' are used to initialize
146     // a shared batch scheduler underlyingly iff 'shared_batch_scheduler' is
147     // nullptr.
148     //
149     // There are two ways to specify threading:
150     // 1) Have each session create its own pool.
151     // 2) Have multiple sessions share the same pool.
152     //
153     // In general, the number of threads should be tied to roughly the number of
154     // compute resources (CPU cores or accelerator cores) backing the threads.
155     // Sharing a thread pool helps alleviate potential over allocation of
156     // threads to limited compute resources.
157 
158     // To have each session create its own thread pool (1) set
159     // thread_pool_name/num_batch_threads.
160 
161     // To share a thread pool (2) create a scheduler and pass it in.
162 
163     // The name to use for the pool of batch threads.
164     string thread_pool_name = {"batch_threads"};
165 
166     // The number of threads to use to process batches.
167     // Must be >= 1, and should be tuned carefully.
168     int num_batch_threads = port::MaxParallelism();
169 
170     // If specified, this scheduler will be used underlyingly to schedule
171     // batches. Note setting this means `thread_pool_name` and
172     // `num_batch_threads` are ignored.
173     std::shared_ptr<SharedBatchScheduler<TaskType>> shared_batch_scheduler =
174         nullptr;
175 
176     // Options for queue.
177     // The maximum size of each batch.
178     //
179     // The scheduler may form batches of any size between 1 and this number
180     // (inclusive). If there is a need to quantize the batch sizes, i.e. only
181     // submit batches whose size is in a small set of allowed sizes, that can be
182     // done by adding padding in the process-batch callback.
183     int max_batch_size = 1000;
184 
185     // If a task has been enqueued for this amount of time (in microseconds),
186     // and a thread is available, the scheduler will immediately form a batch
187     // from enqueued tasks and assign the batch to the thread for processing,
188     // even if the batch's size is below 'max_batch_size'.
189     //
190     // This parameter offers a way to bound queue latency, so that a task isn't
191     // stuck in the queue indefinitely waiting for enough tasks to arrive to
192     // make a full batch. (The latency bound is given in the class documentation
193     // above.)
194     //
195     // The goal is to smooth out batch sizes under low request rates, and thus
196     // avoid latency spikes.
197     int64 batch_timeout_micros = 0;
198 
199 
200     // The maximum allowable number of enqueued (accepted by Schedule() but
201     // not yet being processed on a batch thread) tasks in terms of batches.
202     // If this limit is reached, Schedule() will return an UNAVAILABLE error.
203     // See the class documentation above for guidelines on how to tune this
204     // parameter.
205     int max_enqueued_batches = 10;
206 
207     // If true, an input task (i.e., input of `BasicBatchScheduler::Schedule`)
208     // with a large size (i.e., larger than the largest value of
209     // `allowed_batch_sizes`) will be split into multiple smaller batch tasks
210     // and possibly put into different batches for processing. If false, each
211     // input task is put into one batch as a whole for processing.
212     //
213     // API note:
214     // The value of this option doesn't affect processing output given the same
215     // input; it affects implementation details as stated below:
216     // 1. Improve batching efficiency by eliminating unnecessary padding in the
217     // following scenario: when an open batch has M slots while an input of size
218     // N is scheduled (M < N), the input can be split to fill remaining slots
219     // of an open batch as opposed to padding.
220     // 2.`max_batch_size` specifies the limit of input and
221     // `max_execution_batch_size` specifies the limit of a task to be processed.
222     // API user can give an input of size 128 when 'max_execution_batch_size'
223     // is 32 -> implementation can split input of 128 into 4 x 32, schedule
224     // concurrent processing, and then return concatenated results corresponding
225     // to 128.
226     bool enable_large_batch_splitting = false;
227 
228     // If true, inputs split happens lazily after dequeue and not on the
229     // critical path of enqueue.
230     //
231     // Must be false if `enable_large_batch_splitting` is false; elsewise errors
232     // are returned at queue creation time.
233     bool enable_lazy_split = false;
234 
235     // `split_input_task_func` specifies how to split `input_task` into
236     // `output_tasks`.
237     //
238     // `input_task`: a unit of task to be split.
239     // `first_output_task_size`: task size of first output.
240     // `max_batch_size`: Maximum size of each batch.
241     // `output_tasks`: A list of output tasks after split.
242     //
243     // REQUIRED:
244     // 1) All `output_tasks` should be non-empty tasks.
245     // 2) Sizes of `output_tasks` add up to size of `input_task`.
246     //
247     // NOTE:
248     // Instantiations of `TaskType` may vary, so it's up to caller to define
249     // how (e.g., which members to access) to split input tasks.
250     std::function<Status(std::unique_ptr<TaskType>* input_task,
251                          int first_output_task_size, int input_batch_size_limit,
252                          std::vector<std::unique_ptr<TaskType>>* output_tasks)>
253         split_input_task_func;
254 
255     // The maximum size of each enqueued batch (i.e., in `batches_`).
256     //
257     // The scheduler may form batches of any size between 1 and this number
258     // (inclusive). If there is a need to quantize the batch sizes, i.e. only
259     // submit batches whose size is in a small set of allowed sizes, that can be
260     // done by adding padding in the process-batch callback.
261     //
262     // REQUIRES:
263     // - If enable_large_batch_splitting is true, `max_execution_batch_size` is
264     // less than or equal to `max_batch_size`.
265     // - If enable_large_batch_splitting is false, `max_execution_batch_size` is
266     // equal to `max_batch_size`.
267     int max_execution_batch_size = 10;
268 
269     // The following options are typically only overridden by test code.
270 
271     // The environment to use.
272     Env* env = Env::Default();
273   };
274   static Status Create(const Options& options,
275                        std::function<void(std::unique_ptr<Batch<TaskType>>)>
276                            process_batch_callback,
277                        std::unique_ptr<BasicBatchScheduler>* scheduler);
278 
279   ~BasicBatchScheduler() override = default;
280 
281   Status Schedule(std::unique_ptr<TaskType>* task) override;
282   size_t NumEnqueuedTasks() const override;
283   size_t SchedulingCapacity() const override;
284 
max_task_size()285   size_t max_task_size() const override {
286     return shared_scheduler_queue_->max_task_size();
287   }
288 
289  private:
290   explicit BasicBatchScheduler(
291       std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue);
292 
293   // This class is merely a thin wrapper around a SharedBatchScheduler with a
294   // single queue.
295   std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue_;
296 
297   TF_DISALLOW_COPY_AND_ASSIGN(BasicBatchScheduler);
298 };
299 
300 //////////
301 // Implementation details follow. API users need not read.
302 
303 template <typename TaskType>
Create(const Options & options,std::function<void (std::unique_ptr<Batch<TaskType>>)> process_batch_callback,std::unique_ptr<BasicBatchScheduler> * scheduler)304 Status BasicBatchScheduler<TaskType>::Create(
305     const Options& options,
306     std::function<void(std::unique_ptr<Batch<TaskType>>)>
307         process_batch_callback,
308     std::unique_ptr<BasicBatchScheduler>* scheduler) {
309   std::shared_ptr<SharedBatchScheduler<TaskType>> shared_scheduler;
310 
311   if (options.shared_batch_scheduler == nullptr) {
312     typename SharedBatchScheduler<TaskType>::Options shared_scheduler_options;
313     shared_scheduler_options.thread_pool_name = options.thread_pool_name;
314     shared_scheduler_options.num_batch_threads = options.num_batch_threads;
315     shared_scheduler_options.env = options.env;
316 
317     TF_RETURN_IF_ERROR(SharedBatchScheduler<TaskType>::Create(
318         shared_scheduler_options, &shared_scheduler));
319   } else {
320     shared_scheduler = options.shared_batch_scheduler;
321   }
322 
323   typename SharedBatchScheduler<TaskType>::QueueOptions
324       shared_scheduler_queue_options;
325   shared_scheduler_queue_options.input_batch_size_limit =
326       options.max_batch_size;
327   shared_scheduler_queue_options.batch_timeout_micros =
328       options.batch_timeout_micros;
329   shared_scheduler_queue_options.max_enqueued_batches =
330       options.max_enqueued_batches;
331   shared_scheduler_queue_options.enable_large_batch_splitting =
332       options.enable_large_batch_splitting;
333   shared_scheduler_queue_options.split_input_task_func =
334       options.split_input_task_func;
335   shared_scheduler_queue_options.enable_lazy_split = options.enable_lazy_split;
336   shared_scheduler_queue_options.max_execution_batch_size =
337       options.max_execution_batch_size;
338   std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue;
339   TF_RETURN_IF_ERROR(shared_scheduler->AddQueue(shared_scheduler_queue_options,
340                                                 process_batch_callback,
341                                                 &shared_scheduler_queue));
342 
343   scheduler->reset(
344       new BasicBatchScheduler<TaskType>(std::move(shared_scheduler_queue)));
345   return Status::OK();
346 }
347 
348 template <typename TaskType>
Schedule(std::unique_ptr<TaskType> * task)349 Status BasicBatchScheduler<TaskType>::Schedule(
350     std::unique_ptr<TaskType>* task) {
351   return shared_scheduler_queue_->Schedule(task);
352 }
353 
354 template <typename TaskType>
NumEnqueuedTasks()355 size_t BasicBatchScheduler<TaskType>::NumEnqueuedTasks() const {
356   return shared_scheduler_queue_->NumEnqueuedTasks();
357 }
358 
359 template <typename TaskType>
SchedulingCapacity()360 size_t BasicBatchScheduler<TaskType>::SchedulingCapacity() const {
361   return shared_scheduler_queue_->SchedulingCapacity();
362 }
363 
364 template <typename TaskType>
BasicBatchScheduler(std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue)365 BasicBatchScheduler<TaskType>::BasicBatchScheduler(
366     std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue)
367     : shared_scheduler_queue_(std::move(shared_scheduler_queue)) {}
368 
369 }  // namespace serving
370 }  // namespace tensorflow
371 
372 #endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
373