/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_
#define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_

#include <stddef.h>

#include <cstddef>
#include <functional>
#include <memory>
#include <string>

#include "tensorflow/core/kernels/batching_util/shared_batch_scheduler.h"

namespace tensorflow {
namespace serving {
// A BatchScheduler implementation geared toward handling a single request type
// running on a specific set of hardware resources. A typical scenario is one in
// which all requests invoke the same machine-learned model on one GPU.
//
// If there are, say, two GPUs and two models each bound to one of the GPUs, one
// could use two BasicBatchScheduler instances to schedule the two model/GPU
// combinations independently. If multiple models must share a given GPU or
// other hardware resource, consider using SharedBatchScheduler instead.
//
//
// PARAMETERS AND BEHAVIOR:
//
// BasicBatchScheduler runs a fixed pool of threads, which it uses to process
// batches of tasks. It enforces a maximum batch size, and enqueues a bounded
// number of tasks. If the queue is nearly empty when a thread becomes free, so
// that a full batch cannot be formed, the scheduler nonetheless dispatches a
// batch immediately if some task has been in the queue longer than a given
// timeout parameter. If the timeout parameter is set to 0, then the batch
// threads will always be kept busy (unless there are zero tasks waiting to be
// processed).
//
// For online serving, it is recommended to set the maximum number of enqueued
// batches worth of tasks equal to the number of batch threads, which allows
// enqueuing of enough tasks so that if every thread becomes available it can
// be kept busy, but no more. For bulk processing jobs and throughput-oriented
// benchmarks, you may want to set it much higher.
//
// When Schedule() is called, if the queue is full the call will fail with an
// UNAVAILABLE error (after which the client may retry later). If the call
// succeeds, the maximum time the task will spend in the queue before being
// placed in a batch and assigned to a thread for processing is the greater of:
// - the maximum time to process ceil(max_enqueued_batches/num_batch_threads)
//   (1 in the recommended configuration) batches of previously-submitted tasks
// - the configured timeout parameter (which can be 0, as mentioned above)
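//
// For example (with hypothetical, non-default numbers): given
// num_batch_threads = 2, max_enqueued_batches = 10, and batch_timeout_micros
// = 0, an accepted task may wait as long as it takes to process
// ceil(10/2) = 5 batches of previously-submitted tasks; with the recommended
// setting of max_enqueued_batches == num_batch_threads, that bound shrinks to
// a single batch.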
//
// Unlike StreamingBatchScheduler, when BasicBatchScheduler assigns a batch to a
// thread, it closes the batch. The process-batch callback may assume that every
// batch it receives is closed at the outset.
//
//
// RECOMMENDED USE-CASES:
//
// BasicBatchScheduler is suitable for use-cases that feature a single kind of
// request (e.g. a server performing inference with a single machine-learned
// model, possibly evolving over time), with loose versioning semantics.
// Concretely, the following conditions should hold:
//
//  A. All requests batched onto a given resource (e.g. a hardware accelerator,
//     or a pool of accelerators) are of the same type. For example, they all
//     invoke the same machine-learned model.
//
//     These variations are permitted:
//      - The model may reside in a single servable, or it may be spread across
//        multiple servables that are used in unison (e.g. a vocabulary lookup
//        table servable and a tensorflow session servable).
//      - The model's servable(s) may be static, or they may evolve over time
//        (successive servable versions).
//      - Zero or more of the servables are used in the request thread; the rest
//        are used in the batch thread. In our running example, the vocabulary
//        lookups and tensorflow runs may both be performed in the batch thread,
//        or alternatively the vocabulary lookup may occur in the request thread
//        with only the tensorflow run performed in the batch thread.
//
//     In contrast, BasicBatchScheduler is not a good fit if the server
//     hosts multiple distinct models running on a pool of accelerators, with
//     each request specifying which model it wants to use. BasicBatchScheduler
//     has no facility to time-multiplex the batch threads across multiple
//     models in a principled way. More basically, it cannot ensure that a given
//     batch doesn't contain a mixture of requests for different models.
//
//  B. Requests do not specify a particular version of the servable(s) that must
//     be used. Instead, each request is content to use the "latest" version.
//
//     BasicBatchScheduler does not constrain which requests get grouped
//     together into a batch, so with this scheduler there is no way to confine
//     versioned requests to version-specific batches.
//
//  C. No servable version coordination needs to be performed between the
//     request threads and the batch threads. Often, servables are only used in
//     the batch threads, in which case this condition trivially holds. If
//     servables are used in both threads, then the use-case must tolerate
//     version skew across the servables used in the two kinds of threads.
//
//
// EXAMPLE USE-CASE FLOW:
//
// For such use-cases, request processing via BasicBatchScheduler generally
// follows this flow (given for illustration; variations are possible; a
// minimal code sketch appears after this comment block):
//  1. Optionally perform some pre-processing on each request in the request
//     threads.
//  2. Route the requests to the batch scheduler, as batching::Task objects.
//     (Since all requests are of the same type and are not versioned, the
//     scheduler is free to group them into batches arbitrarily.)
//  3. Merge the requests into a single batched representation B.
//  4. Obtain handles to the servable(s) needed to process B. The simplest
//     approach is to obtain the latest version of each servable. Alternatively,
//     if cross-servable consistency is required (e.g. the vocabulary lookup
//     table's version number must match that of the tensorflow session),
//     identify an appropriate version number and obtain the servable handles
//     accordingly.
//  5. Process B using the obtained servable handles, and split the result into
//     individual per-request units.
//  6. Perform any post-processing in the batch thread and/or request thread.
//
//
// PERFORMANCE TUNING: See README.md.
//
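// EXAMPLE USAGE:
//
// The following is a minimal, illustrative sketch only (not a definitive
// recipe): `MyTask` stands for a hypothetical BatchTask subclass and
// `ProcessMyBatch` for a hypothetical caller-supplied batch-processing
// function; the option values are placeholders.
//
//   void ProcessMyBatch(std::unique_ptr<Batch<MyTask>> batch);
//
//   Status CreateMyScheduler(
//       std::unique_ptr<BasicBatchScheduler<MyTask>>* scheduler) {
//     BasicBatchScheduler<MyTask>::Options options;
//     options.num_batch_threads = 2;
//     options.max_batch_size = 32;
//     options.batch_timeout_micros = 1000;  // 1 ms latency bound.
//     options.max_enqueued_batches = 2;     // == num_batch_threads; see above.
//     return BasicBatchScheduler<MyTask>::Create(options, &ProcessMyBatch,
//                                                scheduler);
//   }
//
//   // On a request thread: Schedule() takes ownership of the task on success,
//   // and returns an UNAVAILABLE error if the queue is full (the client may
//   // retry later).
//   std::unique_ptr<MyTask> task(new MyTask);
//   Status schedule_status = scheduler->Schedule(&task);
//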
template <typename TaskType>
class BasicBatchScheduler : public BatchScheduler<TaskType> {
 public:
  // TODO(b/25089730): Tune defaults based on best practices as they develop.
  // (Keep them mirrored to the ones in SharedBatchScheduler::QueueOptions and
  // SharedBatchScheduler::Options.)
  struct Options {
    // Options related to the underlying shared batch scheduler.
    // 'thread_pool_name' and 'num_batch_threads' are used to create a
    // shared batch scheduler internally iff 'shared_batch_scheduler' is
    // nullptr.
    //
    // There are two ways to specify threading:
    // 1) Have each session create its own pool.
    // 2) Have multiple sessions share the same pool.
    //
    // In general, the number of threads should be tied to roughly the number
    // of compute resources (CPU cores or accelerator cores) backing the
    // threads. Sharing a thread pool helps alleviate potential over-allocation
    // of threads to limited compute resources.

    // To have each session create its own thread pool (option 1), set
    // 'thread_pool_name' and 'num_batch_threads'.

    // To share a thread pool (option 2), create a SharedBatchScheduler and
    // pass it in via 'shared_batch_scheduler'.
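
    // A minimal sketch of option (2), assuming a task type `MyTask`; the
    // SharedBatchScheduler calls mirror the ones made in Create() below, and
    // the names `shared_options` / `shared_scheduler` are illustrative:
    //
    //   SharedBatchScheduler<MyTask>::Options shared_options;
    //   shared_options.thread_pool_name = "shared_batch_threads";
    //   shared_options.num_batch_threads = 4;
    //   std::shared_ptr<SharedBatchScheduler<MyTask>> shared_scheduler;
    //   TF_RETURN_IF_ERROR(SharedBatchScheduler<MyTask>::Create(
    //       shared_options, &shared_scheduler));
    //
    //   BasicBatchScheduler<MyTask>::Options options_a, options_b;
    //   options_a.shared_batch_scheduler = shared_scheduler;
    //   options_b.shared_batch_scheduler = shared_scheduler;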

    // The name to use for the pool of batch threads.
    string thread_pool_name = {"batch_threads"};

    // The number of threads to use to process batches.
    // Must be >= 1, and should be tuned carefully.
    int num_batch_threads = port::MaxParallelism();

    // If specified, this scheduler will be used underneath to schedule
    // batches. Note that setting this means `thread_pool_name` and
    // `num_batch_threads` are ignored.
    std::shared_ptr<SharedBatchScheduler<TaskType>> shared_batch_scheduler =
        nullptr;

    // Options for the queue.
    // The maximum size of each batch.
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can be
    // done by adding padding in the process-batch callback.
    int max_batch_size = 1000;

    // If a task has been enqueued for this amount of time (in microseconds),
    // and a thread is available, the scheduler will immediately form a batch
    // from enqueued tasks and assign the batch to the thread for processing,
    // even if the batch's size is below 'max_batch_size'.
    //
    // This parameter offers a way to bound queue latency, so that a task isn't
    // stuck in the queue indefinitely waiting for enough tasks to arrive to
    // make a full batch. (The latency bound is given in the class documentation
    // above.)
    //
    // The goal is to smooth out batch sizes under low request rates, and thus
    // avoid latency spikes.
    int64 batch_timeout_micros = 0;

    // The maximum allowable number of enqueued (accepted by Schedule() but
    // not yet being processed on a batch thread) tasks in terms of batches.
    // If this limit is reached, Schedule() will return an UNAVAILABLE error.
    // See the class documentation above for guidelines on how to tune this
    // parameter.
    int max_enqueued_batches = 10;

    // If true, an input task (i.e., input of `BasicBatchScheduler::Schedule`)
    // whose size is larger than the largest value of `allowed_batch_sizes`
    // will be split into multiple smaller batch tasks and possibly put into
    // different batches for processing. If false, each input task is put into
    // one batch as a whole for processing.
    //
    // API note:
    // The value of this option doesn't affect the processing output for a
    // given input; it affects implementation details as follows:
    // 1. It improves batching efficiency by eliminating unnecessary padding:
    // when an open batch has M free slots and an input of size N > M is
    // scheduled, the input can be split to fill the remaining slots of the
    // open batch instead of padding it.
    // 2. `max_batch_size` limits the size of an input task, while
    // `max_execution_batch_size` limits the size of a task to be processed.
    // An API user can give an input of size 128 when 'max_execution_batch_size'
    // is 32; the implementation can split the input of 128 into 4 x 32,
    // schedule concurrent processing, and then return the concatenated results
    // corresponding to all 128 units.
    bool enable_large_batch_splitting = false;

    // `split_input_task_func` specifies how to split `input_task` into
    // `output_tasks`.
    //
    // `input_task`: a unit of task to be split.
    // `first_output_task_size`: task size of the first output.
    // `input_batch_size_limit`: the maximum size of each batch.
    // `output_tasks`: a list of output tasks after the split.
    //
    // REQUIRED:
    // 1) All `output_tasks` should be non-empty tasks.
    // 2) Sizes of `output_tasks` add up to the size of `input_task`.
    //
    // NOTE:
    // Instantiations of `TaskType` may vary, so it's up to the caller to
    // define how (e.g., which members to access) to split input tasks.
    std::function<Status(std::unique_ptr<TaskType>* input_task,
                         int first_output_task_size, int input_batch_size_limit,
                         std::vector<std::unique_ptr<TaskType>>* output_tasks)>
        split_input_task_func;
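
    // A hedged sketch of such a function for a hypothetical `MyTask` type that
    // exposes size() and a caller-defined SubTask(offset, count) helper
    // returning a non-empty task covering `count` units; real task types will
    // differ:
    //
    //   options.split_input_task_func =
    //       [](std::unique_ptr<MyTask>* input_task, int first_output_task_size,
    //          int input_batch_size_limit,
    //          std::vector<std::unique_ptr<MyTask>>* output_tasks) -> Status {
    //         const int total = (*input_task)->size();
    //         int offset = 0;
    //         while (offset < total) {
    //           const int limit =
    //               offset == 0 ? first_output_task_size : input_batch_size_limit;
    //           const int chunk = std::min(limit, total - offset);
    //           output_tasks->push_back((*input_task)->SubTask(offset, chunk));
    //           offset += chunk;
    //         }
    //         return Status::OK();  // Output sizes add up to `total`.
    //       };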

    // The maximum size of each enqueued batch (i.e., in `batches_`).
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can be
    // done by adding padding in the process-batch callback.
    //
    // REQUIRES:
    // - If enable_large_batch_splitting is true, `max_execution_batch_size` is
    //   less than or equal to `max_batch_size`.
    // - If enable_large_batch_splitting is false, `max_execution_batch_size` is
    //   equal to `max_batch_size`.
    int max_execution_batch_size = 10;

    // The following options are typically only overridden by test code.

    // The environment to use.
    Env* env = Env::Default();
  };
  static Status Create(const Options& options,
                       std::function<void(std::unique_ptr<Batch<TaskType>>)>
                           process_batch_callback,
                       std::unique_ptr<BasicBatchScheduler>* scheduler);

  ~BasicBatchScheduler() override = default;

  Status Schedule(std::unique_ptr<TaskType>* task) override;
  size_t NumEnqueuedTasks() const override;
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override {
    return shared_scheduler_queue_->max_task_size();
  }

 private:
  explicit BasicBatchScheduler(
      std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue);

  // This class is merely a thin wrapper around a SharedBatchScheduler with a
  // single queue.
  std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue_;

  TF_DISALLOW_COPY_AND_ASSIGN(BasicBatchScheduler);
};

//////////
// Implementation details follow. API users need not read.

template <typename TaskType>
Status BasicBatchScheduler<TaskType>::Create(
    const Options& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BasicBatchScheduler>* scheduler) {
  std::shared_ptr<SharedBatchScheduler<TaskType>> shared_scheduler;

  if (options.shared_batch_scheduler == nullptr) {
    typename SharedBatchScheduler<TaskType>::Options shared_scheduler_options;
    shared_scheduler_options.thread_pool_name = options.thread_pool_name;
    shared_scheduler_options.num_batch_threads = options.num_batch_threads;
    shared_scheduler_options.env = options.env;

    TF_RETURN_IF_ERROR(SharedBatchScheduler<TaskType>::Create(
        shared_scheduler_options, &shared_scheduler));
  } else {
    shared_scheduler = options.shared_batch_scheduler;
  }

  typename SharedBatchScheduler<TaskType>::QueueOptions
      shared_scheduler_queue_options;
  shared_scheduler_queue_options.input_batch_size_limit =
      options.max_batch_size;
  shared_scheduler_queue_options.batch_timeout_micros =
      options.batch_timeout_micros;
  shared_scheduler_queue_options.max_enqueued_batches =
      options.max_enqueued_batches;
  shared_scheduler_queue_options.enable_large_batch_splitting =
      options.enable_large_batch_splitting;
  shared_scheduler_queue_options.split_input_task_func =
      options.split_input_task_func;
  shared_scheduler_queue_options.max_execution_batch_size =
      options.max_execution_batch_size;
  std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue;
  TF_RETURN_IF_ERROR(shared_scheduler->AddQueue(shared_scheduler_queue_options,
                                                process_batch_callback,
                                                &shared_scheduler_queue));

  scheduler->reset(
      new BasicBatchScheduler<TaskType>(std::move(shared_scheduler_queue)));
  return Status::OK();
}

template <typename TaskType>
Status BasicBatchScheduler<TaskType>::Schedule(
    std::unique_ptr<TaskType>* task) {
  return shared_scheduler_queue_->Schedule(task);
}

template <typename TaskType>
size_t BasicBatchScheduler<TaskType>::NumEnqueuedTasks() const {
  return shared_scheduler_queue_->NumEnqueuedTasks();
}

template <typename TaskType>
size_t BasicBatchScheduler<TaskType>::SchedulingCapacity() const {
  return shared_scheduler_queue_->SchedulingCapacity();
}

template <typename TaskType>
BasicBatchScheduler<TaskType>::BasicBatchScheduler(
    std::unique_ptr<BatchScheduler<TaskType>> shared_scheduler_queue)
    : shared_scheduler_queue_(std::move(shared_scheduler_queue)) {}

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_BASIC_BATCH_SCHEDULER_H_