/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
#define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_

#include <stddef.h>

#include <cstddef>
#include <functional>
#include <memory>
#include <string>

#include "tensorflow/core/kernels/batching_util/shared_batch_scheduler.h"

namespace tensorflow {
namespace serving {

// A BatchScheduler implementation geared toward handling a single request type
// running on a specific set of hardware resources. A typical scenario is one in
// which all requests invoke the same machine-learned model on one GPU.
//
// If there are, say, two GPUs and two models each bound to one of the GPUs, one
// could use two BasicBatchScheduler instances to schedule the two model/GPU
// combinations independently. If multiple models must share a given GPU or
// other hardware resource, consider using SharedBatchScheduler instead.
//
//
// PARAMETERS AND BEHAVIOR:
//
// BasicBatchScheduler runs a fixed pool of threads, which it uses to process
// batches of tasks. It enforces a maximum batch size and a bound on the number
// of enqueued tasks. If the queue is too empty for a full batch to be formed
// when a thread becomes free, the scheduler nonetheless forms and schedules a
// batch immediately whenever some task has been in the queue longer than a
// given timeout parameter. If the timeout parameter is set to 0, the batch
// threads are always kept busy (unless there are zero tasks waiting to be
// processed).
//
// For online serving, it is recommended to set the maximum number of enqueued
// batches' worth of tasks equal to the number of batch threads. That allows
// enqueuing just enough tasks to keep every thread busy if it becomes
// available, but no more. For bulk processing jobs and throughput-oriented
// benchmarks, you may want to set it much higher.
//
// When Schedule() is called, if the queue is full the call fails with an
// UNAVAILABLE error (after which the client may retry later). If the call
// succeeds, the maximum time the task will spend in the queue before being
// placed in a batch and assigned to a thread for processing is the greater of:
//  - the maximum time to process ceil(max_enqueued_batches/num_batch_threads)
//    (1 in the recommended configuration) batches of previously-submitted tasks
//  - the configured timeout parameter (which can be 0, as mentioned above)
//
// Unlike StreamingBatchScheduler, when BasicBatchScheduler assigns a batch to a
// thread, it closes the batch. The process-batch callback may assume that every
// batch it receives is closed at the outset.
//
//
// RECOMMENDED USE-CASES:
//
// BasicBatchScheduler is suitable for use-cases that feature a single kind of
// request (e.g. a server performing inference with a single machine-learned
// model, possibly evolving over time), with loose versioning semantics.
// Concretely, the following conditions should hold:
//
//  A. All requests batched onto a given resource (e.g. a hardware accelerator,
//     or a pool of accelerators) are of the same type. For example, they all
//     invoke the same machine-learned model.
//
//     These variations are permitted:
//      - The model may reside in a single servable, or it may be spread across
//        multiple servables that are used in unison (e.g. a vocabulary lookup
//        table servable and a tensorflow session servable).
//      - The model's servable(s) may be static, or they may evolve over time
//        (successive servable versions).
//      - Zero or more of the servables are used in the request thread; the rest
//        are used in the batch thread. In our running example, the vocabulary
//        lookups and tensorflow runs may both be performed in the batch thread,
//        or alternatively the vocabulary lookup may occur in the request thread
//        with only the tensorflow run performed in the batch thread.
//
//     In contrast, BasicBatchScheduler is not a good fit if the server
//     hosts multiple distinct models running on a pool of accelerators, with
//     each request specifying which model it wants to use. BasicBatchScheduler
//     has no facility to time-multiplex the batch threads across multiple
//     models in a principled way. More fundamentally, it cannot ensure that a
//     given batch doesn't contain a mixture of requests for different models.
//
//  B. Requests do not specify a particular version of the servable(s) that must
//     be used. Instead, each request is content to use the "latest" version.
//
//     BasicBatchScheduler does not constrain which requests get grouped
//     together into a batch, so with this scheduler there is no way to confine
//     versioned requests to version-specific batches.
//
//  C. No servable version coordination needs to be performed between the
//     request threads and the batch threads. Often, servables are only used in
//     the batch threads, in which case this condition trivially holds. If
//     servables are used in both threads, then the use-case must tolerate
//     version skew across the servables used in the two kinds of threads.
//
//
// EXAMPLE USE-CASE FLOW:
//
// For such use-cases, request processing via BasicBatchScheduler generally
// follows this flow (given for illustration; variations are possible):
//  1. Optionally perform some pre-processing on each request in the request
//     threads.
//  2. Route the requests to the batch scheduler, as batching::Task objects.
//     (Since all requests are of the same type and are not versioned, the
//     scheduler is free to group them into batches arbitrarily.)
//  3. Merge the requests into a single batched representation B.
//  4. Obtain handles to the servable(s) needed to process B. The simplest
//     approach is to obtain the latest version of each servable. Alternatively,
//     if cross-servable consistency is required (e.g. the vocabulary lookup
//     table's version number must match that of the tensorflow session),
//     identify an appropriate version number and obtain the servable handles
//     accordingly.
//  5. Process B using the obtained servable handles, and split the result into
//     individual per-request units.
//  6. Perform any post-processing in the batch thread and/or request thread.
//
//
// PERFORMANCE TUNING: See README.md.
//
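// EXAMPLE USAGE (illustrative sketch, not prescribed by the API):
//
// The snippet below sketches the create-and-schedule pattern described above.
// `MyTask` is a hypothetical BatchTask subclass, `process_batch` is a
// hypothetical callback, and the numeric values are placeholders rather than
// recommendations:
//
//   auto process_batch = [](std::unique_ptr<Batch<MyTask>> batch) {
//     // Runs on a batch thread; the batch is already closed at this point.
//     for (int i = 0; i < batch->num_tasks(); ++i) {
//       // ... run the model on batch->mutable_task(i) ...
//     }
//   };
//
//   BasicBatchScheduler<MyTask>::Options options;
//   options.max_batch_size = 32;
//   options.batch_timeout_micros = 1000;  // Bound queue latency to ~1ms.
//   options.num_batch_threads = 4;
//   options.max_enqueued_batches = 4;  // == num_batch_threads (see above).
//   std::unique_ptr<BasicBatchScheduler<MyTask>> scheduler;
//   TF_CHECK_OK(
//       BasicBatchScheduler<MyTask>::Create(options, process_batch, &scheduler));
//
//   std::unique_ptr<MyTask> task = ...;  // Built from an incoming request.
//   Status status = scheduler->Schedule(&task);  // May return UNAVAILABLE.
//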
template <typename TaskType>
class BasicBatchScheduler : public BatchScheduler<TaskType> {
 public:
  // TODO(b/25089730): Tune defaults based on best practices as they develop.
  // (Keep them mirrored to the ones in SharedBatchScheduler::QueueOptions and
  // SharedBatchScheduler::Options.)
  struct Options {
    // Options related to the underlying shared batch scheduler.
    // 'thread_pool_name' and 'num_batch_threads' are used to initialize an
    // underlying shared batch scheduler iff 'shared_batch_scheduler' is
    // nullptr.
    //
    // There are two ways to specify threading:
    // 1) Have each session create its own pool.
    // 2) Have multiple sessions share the same pool.
    //
    // In general, the number of threads should roughly match the number of
    // compute resources (CPU cores or accelerator cores) backing the threads.
    // Sharing a thread pool helps avoid over-allocating threads to limited
    // compute resources.

    // To have each session create its own thread pool (1), set
    // thread_pool_name/num_batch_threads.

    // To share a thread pool (2), create a scheduler and pass it in.

    // The name to use for the pool of batch threads.
    string thread_pool_name = {"batch_threads"};

    // The number of threads to use to process batches.
    // Must be >= 1, and should be tuned carefully.
    int num_batch_threads = port::MaxParallelism();

    // If specified, this scheduler will be used underneath to schedule
    // batches. Note that setting this means `thread_pool_name` and
    // `num_batch_threads` are ignored.
    std::shared_ptr<SharedBatchScheduler<TaskType>> shared_batch_scheduler =
        nullptr;

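    // Illustrative sketch of option (2) above: sharing one thread pool between
    // two BasicBatchScheduler instances (e.g. two models on one accelerator).
    // `MyTask` and the two process-batch callbacks are hypothetical:
    //
    //   typename SharedBatchScheduler<MyTask>::Options shared_options;
    //   shared_options.num_batch_threads = 4;
    //   std::shared_ptr<SharedBatchScheduler<MyTask>> shared_scheduler;
    //   TF_CHECK_OK(SharedBatchScheduler<MyTask>::Create(shared_options,
    //                                                    &shared_scheduler));
    //
    //   BasicBatchScheduler<MyTask>::Options options_a, options_b;
    //   options_a.shared_batch_scheduler = shared_scheduler;
    //   options_b.shared_batch_scheduler = shared_scheduler;
    //   std::unique_ptr<BasicBatchScheduler<MyTask>> scheduler_a, scheduler_b;
    //   TF_CHECK_OK(BasicBatchScheduler<MyTask>::Create(
    //       options_a, process_model_a_batch, &scheduler_a));
    //   TF_CHECK_OK(BasicBatchScheduler<MyTask>::Create(
    //       options_b, process_model_b_batch, &scheduler_b));
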
    // Options for the queue.
    // The maximum size of each batch.
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can be
    // done by adding padding in the process-batch callback.
    int max_batch_size = 1000;

    // If a task has been enqueued for this amount of time (in microseconds),
    // and a thread is available, the scheduler will immediately form a batch
    // from enqueued tasks and assign the batch to the thread for processing,
    // even if the batch's size is below 'max_batch_size'.
    //
    // This parameter offers a way to bound queue latency, so that a task isn't
    // stuck in the queue indefinitely waiting for enough tasks to arrive to
    // make a full batch. (The latency bound is given in the class documentation
    // above.)
    //
    // The goal is to smooth out batch sizes under low request rates, and thus
    // avoid latency spikes.
    int64 batch_timeout_micros = 0;

    // The maximum allowable number of enqueued (accepted by Schedule() but
    // not yet being processed on a batch thread) tasks, in terms of batches.
    // If this limit is reached, Schedule() will return an UNAVAILABLE error.
    // See the class documentation above for guidelines on how to tune this
    // parameter.
    int max_enqueued_batches = 10;

    // If true, an input task (i.e., an input of `BasicBatchScheduler::Schedule`)
    // with a large size (i.e., larger than the largest value of
    // `allowed_batch_sizes`) will be split into multiple smaller batch tasks
    // and possibly put into different batches for processing. If false, each
    // input task is put into one batch as a whole for processing.
    //
    // API note:
    // The value of this option doesn't affect the processing output for a given
    // input; it only affects implementation details, as follows:
    // 1. It improves batching efficiency by eliminating unnecessary padding:
    // when an open batch has M free slots and an input of size N (N > M) is
    // scheduled, the input can be split to fill the remaining slots of the open
    // batch instead of padding the batch.
    // 2. `max_batch_size` limits the size of an input task, while
    // `max_execution_batch_size` limits the size of each batch to be processed.
    // For example, an API user can submit an input of size 128 when
    // 'max_execution_batch_size' is 32; the implementation can split it into
    // 4 x 32, process the pieces concurrently, and return results concatenated
    // back to the full 128.
    bool enable_large_batch_splitting = false;

    // `split_input_task_func` specifies how to split `input_task` into
    // `output_tasks`.
    //
    // `input_task`: a unit of task to be split.
    // `first_output_task_size`: task size of the first output.
    // `input_batch_size_limit`: maximum size of each batch.
    // `output_tasks`: a list of output tasks after the split.
    //
    // REQUIRED:
    // 1) All `output_tasks` should be non-empty tasks.
    // 2) The sizes of `output_tasks` add up to the size of `input_task`.
    //
    // NOTE:
    // Instantiations of `TaskType` vary, so it's up to the caller to define how
    // to split input tasks (e.g., which members to access).
    std::function<Status(std::unique_ptr<TaskType>* input_task,
                         int first_output_task_size, int input_batch_size_limit,
                         std::vector<std::unique_ptr<TaskType>>* output_tasks)>
        split_input_task_func;
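    //
    // Illustrative sketch of such a splitter, assuming a hypothetical task type
    // `MyTask` with a size() method and a hypothetical Slice() helper that
    // copies a range of its contents; it is not a prescribed implementation:
    //
    //   options.split_input_task_func =
    //       [](std::unique_ptr<MyTask>* input_task, int first_output_task_size,
    //          int input_batch_size_limit,
    //          std::vector<std::unique_ptr<MyTask>>* output_tasks) -> Status {
    //     int remaining = static_cast<int>((*input_task)->size());
    //     int offset = 0;
    //     int chunk = first_output_task_size;  // First piece fills the open batch.
    //     while (remaining > 0) {
    //       const int n = std::min(chunk, remaining);
    //       // Hypothetical helper copying elements [offset, offset + n).
    //       output_tasks->push_back((*input_task)->Slice(offset, n));
    //       offset += n;
    //       remaining -= n;
    //       chunk = input_batch_size_limit;  // Later pieces use the full limit.
    //     }
    //     return Status::OK();
    //   };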

    // The maximum size of each enqueued batch (i.e., in `batches_`).
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can be
    // done by adding padding in the process-batch callback.
    //
    // REQUIRES:
    // - If enable_large_batch_splitting is true, `max_execution_batch_size` is
    // less than or equal to `max_batch_size`.
    // - If enable_large_batch_splitting is false, `max_execution_batch_size` is
    // equal to `max_batch_size`.
    int max_execution_batch_size = 10;

    // The following options are typically only overridden by test code.

    // The environment to use.
    Env* env = Env::Default();
  };
  static Status Create(const Options& options,
                       std::function<void(std::unique_ptr<Batch<TaskType>>)>
                           process_batch_callback,
                       std::unique_ptr<BasicBatchScheduler>* scheduler);

  ~BasicBatchScheduler() override = default;

  Status Schedule(std::unique_ptr<TaskType>* task) override;
  size_t NumEnqueuedTasks() const override;
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override {
    return shared_scheduler_queue_->max_task_size();
  }

 private:
  explicit BasicBatchScheduler(
      std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue);

  // This class is merely a thin wrapper around a SharedBatchScheduler with a
  // single queue.
  std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue_;

  TF_DISALLOW_COPY_AND_ASSIGN(BasicBatchScheduler);
};

//////////
// Implementation details follow. API users need not read.

template <typename TaskType>
Status BasicBatchScheduler<TaskType>::Create(
    const Options& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BasicBatchScheduler>* scheduler) {
  std::shared_ptr<SharedBatchScheduler<TaskType>> shared_scheduler;

  if (options.shared_batch_scheduler == nullptr) {
    typename SharedBatchScheduler<TaskType>::Options shared_scheduler_options;
    shared_scheduler_options.thread_pool_name = options.thread_pool_name;
    shared_scheduler_options.num_batch_threads = options.num_batch_threads;
    shared_scheduler_options.env = options.env;

    TF_RETURN_IF_ERROR(SharedBatchScheduler<TaskType>::Create(
        shared_scheduler_options, &shared_scheduler));
  } else {
    shared_scheduler = options.shared_batch_scheduler;
  }

  typename SharedBatchScheduler<TaskType>::QueueOptions
      shared_scheduler_queue_options;
  shared_scheduler_queue_options.input_batch_size_limit =
      options.max_batch_size;
  shared_scheduler_queue_options.batch_timeout_micros =
      options.batch_timeout_micros;
  shared_scheduler_queue_options.max_enqueued_batches =
      options.max_enqueued_batches;
  shared_scheduler_queue_options.enable_large_batch_splitting =
      options.enable_large_batch_splitting;
  shared_scheduler_queue_options.split_input_task_func =
      options.split_input_task_func;
  shared_scheduler_queue_options.max_execution_batch_size =
      options.max_execution_batch_size;
  std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue;
  TF_RETURN_IF_ERROR(shared_scheduler->AddQueue(shared_scheduler_queue_options,
                                                process_batch_callback,
                                                &shared_scheduler_queue));

  scheduler->reset(
      new BasicBatchScheduler<TaskType>(std::move(shared_scheduler_queue)));
  return Status::OK();
}

template <typename TaskType>
Status BasicBatchScheduler<TaskType>::Schedule(
    std::unique_ptr<TaskType>* task) {
  return shared_scheduler_queue_->Schedule(task);
}

template <typename TaskType>
size_t BasicBatchScheduler<TaskType>::NumEnqueuedTasks() const {
  return shared_scheduler_queue_->NumEnqueuedTasks();
}

template <typename TaskType>
size_t BasicBatchScheduler<TaskType>::SchedulingCapacity() const {
  return shared_scheduler_queue_->SchedulingCapacity();
}

template <typename TaskType>
BasicBatchScheduler<TaskType>::BasicBatchScheduler(
    std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue)
    : shared_scheduler_queue_(std::move(shared_scheduler_queue)) {}

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_