/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_
#define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_

#include <stddef.h>

#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/time/clock.h"
#include "tensorflow/core/kernels/batching_util/batch_input_task.h"
#include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
#include "tensorflow/core/kernels/batching_util/periodic_function.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/profiler/lib/connected_traceme.h"
#include "tensorflow/core/profiler/lib/traceme.h"
#include "tensorflow/core/profiler/lib/traceme_encode.h"

namespace tensorflow {
namespace serving {
namespace internal {
template <typename TaskType>
class Queue;
}  // namespace internal
}  // namespace serving
}  // namespace tensorflow

namespace tensorflow {
namespace serving {

// A batch scheduler for server instances that service multiple request types
// (e.g. multiple machine-learned models, or multiple versions of a model served
// concurrently), or even multiple distinct tasks for a given request. The
// scheduler multiplexes batches of different kinds of tasks onto a fixed-size
// thread pool (each batch contains tasks of a single type), in a carefully
// controlled manner. A common configuration is to set the number of threads
// equal to the number of hardware accelerator units, in which case the
// scheduler takes care of multiplexing the task types onto the shared hardware,
// in a manner that is both fair and efficient.
//
// Semantically, SharedBatchScheduler behaves like having N instances of
// BasicBatchScheduler (see basic_batch_scheduler.h), one per task type. The
// difference is that under the covers there is a single shared thread pool,
// instead of N independent ones, with their sharing deliberately coordinated.
//
// SharedBatchScheduler does not implement the BatchScheduler API; rather, it
// presents an abstraction of "queues", where each queue corresponds to one type
// of task. Tasks submitted to a given queue are placed in their own batches,
// and cannot be mixed with other tasks. Queues can be added and deleted
// dynamically, to accommodate e.g. versions of a model being brought up and
// down over the lifetime of a server.
//
// The batch thread pool round-robins through the queues, running one batch
// from a queue and then moving to the next queue. Each queue behaves like a
// BasicBatchScheduler instance, in the sense that it has maximum batch size and
// timeout parameters, which govern when a batch is eligible to be processed.
//
// Each queue is independently configured with a maximum size (in terms of the
// maximum number of batches worth of enqueued tasks). For online serving, it is
// recommended that the queue sizes be configured such that the sum of the sizes
// of the active queues roughly equals the number of batch threads. (The idea is
// that if all threads become available at roughly the same time, there will be
// enough enqueued work for them to take on, but no more.)
//
// If queue sizes are configured in the manner suggested above, the maximum time
// a task can spend in a queue before being placed in a batch and assigned to a
// thread for processing, is the greater of:
// - the maximum time to process one batch of tasks from any active queue
// - the configured timeout parameter for the task's queue (which can be 0)
//
// For bulk processing jobs and throughput-oriented benchmarks, you may want to
// set the maximum queue size to a large value.
//
// TODO(b/26539183): Support queue servicing policies other than round-robin.
// E.g. let each queue specify a "share" (an int >= 1), so e.g. with queues A
// and B having shares 1 and 2 respectively, the servicing pattern is ABBABB...
//
//
// PERFORMANCE TUNING: See README.md.
//
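// Illustrative usage sketch (not taken from a test or client; `BatchTask`
// stands in for any type implementing the BatchTask interface from
// batch_scheduler.h, and the callback body is a placeholder):
//
//   std::shared_ptr<SharedBatchScheduler<BatchTask>> scheduler;
//   SharedBatchScheduler<BatchTask>::Options options;
//   options.num_batch_threads = 4;
//   TF_RETURN_IF_ERROR(
//       SharedBatchScheduler<BatchTask>::Create(options, &scheduler));
//
//   SharedBatchScheduler<BatchTask>::QueueOptions queue_options;
//   queue_options.input_batch_size_limit = 100;
//   queue_options.batch_timeout_micros = 1000;
//   std::unique_ptr<BatchScheduler<BatchTask>> queue;
//   TF_RETURN_IF_ERROR(scheduler->AddQueue(
//       queue_options,
//       [](std::unique_ptr<Batch<BatchTask>> batch) {
//         // Process the tasks in `batch` here.
//       },
//       &queue));
//
//   // Submit a task; UNAVAILABLE is returned if the queue is full.
//   std::unique_ptr<BatchTask> task = ...;  // hypothetical construction
//   TF_RETURN_IF_ERROR(queue->Schedule(&task));
//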
template <typename TaskType>
class SharedBatchScheduler
    : public std::enable_shared_from_this<SharedBatchScheduler<TaskType>> {
 public:
  // TODO(b/25089730): Tune defaults based on best practices as they develop.
  struct Options {
    // The name to use for the pool of batch threads.
    string thread_pool_name = {"batch_threads"};

    // The number of threads to use to process batches.
    // Must be >= 1, and should be tuned carefully.
    int num_batch_threads = port::MaxParallelism();

    // The environment to use.
    // (Typically only overridden by test code.)
    Env* env = Env::Default();
  };
  // Ownership is shared between the caller of Create() and any queues created
  // via AddQueue().
  static Status Create(
      const Options& options,
      std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler);

  ~SharedBatchScheduler();

  // Adds a queue to which tasks may be submitted. The returned queue implements
  // the BatchScheduler API. Each queue has its own set of scheduling options,
  // and its own callback to process batches of tasks submitted to the queue.
  //
  // The returned queue's destructor blocks until all tasks submitted to it have
  // been processed.
  struct QueueOptions {
    // The size limit of an input batch to the queue.
    //
    // If `enable_large_batch_splitting` is true, `input_batch_size_limit`
    // must be greater than or equal to `max_execution_batch_size`; otherwise
    // `input_batch_size_limit` must be equal to `max_execution_batch_size`.
    size_t input_batch_size_limit = 1000;

    // If a task has been enqueued for this amount of time (in microseconds),
    // and a thread is available, the scheduler will immediately form a batch
    // from enqueued tasks and assign the batch to the thread for processing,
    // even if the batch's size is below 'input_batch_size_limit'.
    //
    // This parameter offers a way to bound queue latency, so that a task isn't
    // stuck in the queue indefinitely waiting for enough tasks to arrive to
    // make a full batch. (The latency bound is given in the class documentation
    // above.)
    //
    // The goal is to smooth out batch sizes under low request rates, and thus
    // avoid latency spikes.
    int64 batch_timeout_micros = 0;

    // The maximum allowable number of enqueued (accepted by Schedule() but
    // not yet being processed on a batch thread) tasks in terms of batches.
    // If this limit is reached, Schedule() will return an UNAVAILABLE error.
    // See the class documentation above for guidelines on how to tune this
    // parameter.
    //
    // Must be positive, or else an invalid argument error will be returned at
    // queue creation time.
    size_t max_enqueued_batches = 10;

    // If true, the queue implementation will split one input batch task into
    // subtasks (as specified by `split_input_task_func` below) and fit the
    // subtasks into different batches.
    //
    // For usage of `split_input_task_func`, please see its comment.
    bool enable_large_batch_splitting = false;

    // `input_task`: a unit of task to be split.
    // `first_output_task_size`: task size of the first output.
    // `max_execution_batch_size`: the maximum size of each batch.
    // `output_tasks`: a list of output tasks after the split.
    //
    // REQUIRED:
    // 1) All `output_tasks` should be non-empty tasks.
    // 2) The sizes of `output_tasks` must add up to the size of `input_task`.
    //
    // NOTE:
    // Instantiations of `TaskType` may vary, so it is up to the caller to
    // define how (e.g., which members to access) to split input tasks.
    std::function<Status(std::unique_ptr<TaskType>* input_task,
                         int first_output_task_size,
                         int max_execution_batch_size,
                         std::vector<std::unique_ptr<TaskType>>* output_tasks)>
        split_input_task_func;
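
    // A hedged sketch (not part of this API) of how a client might wire up
    // splitting for a hypothetical task type `MyTask` that exposes a
    // `RemoveFirstN(n)` helper returning a std::unique_ptr<MyTask> holding the
    // first `n` units of the task:
    //
    //   queue_options.enable_large_batch_splitting = true;
    //   queue_options.split_input_task_func =
    //       [](std::unique_ptr<MyTask>* input_task, int first_output_task_size,
    //          int max_execution_batch_size,
    //          std::vector<std::unique_ptr<MyTask>>* output_tasks) -> Status {
    //         // First chunk fills the remaining slot of the open batch (if
    //         // any); subsequent chunks are full execution batches.
    //         int chunk = first_output_task_size > 0 ? first_output_task_size
    //                                                : max_execution_batch_size;
    //         while ((*input_task)->size() > 0) {
    //           chunk = std::min<int>(chunk, (*input_task)->size());
    //           output_tasks->push_back((*input_task)->RemoveFirstN(chunk));
    //           chunk = max_execution_batch_size;
    //         }
    //         return Status::OK();
    //       };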

    // If true, batch input tasks are split lazily after dequeue and not on the
    // critical path of enqueue operations.
    //
    // Must be false if `enable_large_batch_splitting` is false; otherwise an
    // error will be returned at queue creation time.
    //
    // TODO(b/194294263):
    // Make `enable_lazy_split` a template parameter of the queue, and adapt
    // `batches_` and `task_handle_batches_` into one deque of
    // tensorflow::serving::Batch.
    bool enable_lazy_split = false;

    // The maximum size of each enqueued batch (i.e., in `batches_`).
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can be
    // done by adding padding in the process-batch callback.
    size_t max_execution_batch_size = 1000;
  };
  Status AddQueue(const QueueOptions& options,
                  std::function<void(std::unique_ptr<Batch<TaskType>>)>
                      process_batch_callback,
                  std::unique_ptr<BatchScheduler<TaskType>>* queue);

 private:
  explicit SharedBatchScheduler(const Options& options);

  void GetNextWorkItem_Locked(
      internal::Queue<TaskType>** queue_for_batch_out,
      std::unique_ptr<Batch<TaskType>>* batch_to_process_out)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // The code executed in 'batch_threads_'. Obtains a batch to process from the
  // queue pointed to by 'next_queue_to_schedule_', and processes it. If that
  // queue declines to provide a batch to process, moves onto the next queue. If
  // no queues provide a batch to process, just sleeps briefly and exits.
  void ThreadLogic();

  const Options options_;

  mutex mu_;

  // A list of queues. (We use std::list instead of std::vector to ensure that
  // iterators are not invalidated by adding/removing elements. It also offers
  // efficient removal of elements from the middle.)
  using QueueList = std::list<std::unique_ptr<internal::Queue<TaskType>>>;

  // All "active" queues, i.e. ones that either:
  // - have not been removed, or
  // - have been removed but are not yet empty.
  QueueList queues_ TF_GUARDED_BY(mu_);

  // An iterator over 'queues_', pointing to the queue from which the next
  // available batch thread should grab work.
  typename QueueList::iterator next_queue_to_schedule_ TF_GUARDED_BY(mu_);

  // Used by idle batch threads to wait for work to enter the system. Notified
  // whenever a batch becomes schedulable.
  condition_variable schedulable_batch_cv_;

  // Threads that process batches obtained from the queues.
  std::vector<std::unique_ptr<PeriodicFunction>> batch_threads_;

  TF_DISALLOW_COPY_AND_ASSIGN(SharedBatchScheduler);
};

//////////
// Implementation details follow. API users need not read.

namespace internal {

// A task queue for SharedBatchScheduler. Accepts tasks and accumulates them
// into batches, and dispenses those batches to be processed via a "pull"
// interface. The queue's behavior is governed by maximum batch size, timeout
// and maximum queue length parameters; see their documentation in
// SharedBatchScheduler.
//
// The queue is implemented as a deque of batches, with these invariants:
// - The number of batches is between 1 and 'options_.max_enqueued_batches'.
// - The back-most batch is open; the rest are closed.
//
// Submitted tasks are added to the open batch. If that batch doesn't have room
// but the queue isn't full, then that batch is closed and a new open batch is
// started.
//
// Batch pull requests are handled by dequeuing the front-most batch if it is
// closed. If the front-most batch is open (i.e. the queue contains only one
// batch) and has reached the timeout, it is immediately closed and returned;
// otherwise no batch is returned for the request.
template <typename TaskType>
class Queue {
 public:
  using ProcessBatchCallback =
      std::function<void(std::unique_ptr<Batch<TaskType>>)>;
  using SchedulableBatchCallback = std::function<void()>;
  using SplitInputTaskIntoSubtasksCallback = std::function<Status(
      std::unique_ptr<TaskType>* input_task, int open_batch_remaining_slot,
      int max_execution_batch_size,
      std::vector<std::unique_ptr<TaskType>>* output_tasks)>;
  Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
        Env* env, ProcessBatchCallback process_batch_callback,
        SchedulableBatchCallback schedulable_batch_callback);

  // Illegal to destruct unless the queue is empty.
  ~Queue();

  // Submits a task to the queue, with the same semantics as
  // BatchScheduler::Schedule().
  Status Schedule(std::unique_ptr<TaskType>* task);

  // Enqueues `task` as-is, or splits it inline (eagerly), to form batches to be
  // processed by `Queue<TaskType>::ProcessBatch`.
  Status ScheduleWithoutOrEagerSplit(std::unique_ptr<TaskType>* task);

  // Enqueues `task` along with the batch-queue metadata.
  // Batches are formed by the time `ScheduleWithLazySplit` returns; each
  // element of the deque is lazily evaluated into a batch to be processed
  // after it is dequeued (outside the mutex-protected region).
  Status ScheduleWithLazySplit(std::unique_ptr<TaskType>* task);

  // Returns the number of enqueued tasks, with the same semantics as
  // BatchScheduler::NumEnqueuedTasks().
  size_t NumEnqueuedTasks() const;

  // Returns the queue capacity, with the same semantics as
  // BatchScheduler::SchedulingCapacity().
  size_t SchedulingCapacity() const;

  // Returns the maximum allowed size of tasks submitted to the queue.
  size_t max_task_size() const { return options_.input_batch_size_limit; }

  // Returns the maximum allowed size of tasks to be executed.
  // The returned value is less than or equal to the maximum allowed input
  // size provided by the caller of the batch scheduler.
  size_t max_execution_batch_size() const { return max_execution_batch_size_; }

  // Called by a thread that is ready to process a batch, to request one from
  // this queue. Either returns a batch that is ready to be processed, or
  // nullptr if the queue declines to schedule a batch at this time. If it
  // returns a batch, the batch is guaranteed to be closed.
  std::unique_ptr<Batch<TaskType>> ScheduleBatch();

  // A variant of `ScheduleBatch`.
  // Batches are guaranteed to be formed at task enqueue time.
  std::unique_ptr<Batch<TaskType>> ScheduleBatchWithEagerSplit();

  // Processes a batch that has been returned earlier by ScheduleBatch().
  void ProcessBatch(std::unique_ptr<Batch<TaskType>> batch);

  // Determines whether the queue is empty, i.e. has no tasks waiting or being
  // processed.
  bool IsEmpty() const;

  // Marks the queue closed, and waits until it is empty.
  void CloseAndWaitUntilEmpty();

  bool closed() const TF_NO_THREAD_SAFETY_ANALYSIS { return closed_.load(); }

 private:
  // Computes the max_execution_batch_size of the queue based on queue options.
  static size_t GetMaxExecutionBatchSize(
      const typename SharedBatchScheduler<TaskType>::QueueOptions& options) {
    // If `enable_large_batch_splitting` is true, returns
    // `max_execution_batch_size` configured by user options directly; returns
    // `input_batch_size_limit` otherwise.
    //
    // Note `input_batch_size_limit` is used for backward compatibility, since
    // users may not specify `max_execution_batch_size` explicitly.
    if (options.enable_large_batch_splitting) {
      return options.max_execution_batch_size;
    } else {
      return options.input_batch_size_limit;
    }
  }

  // Same as IsEmpty(), but assumes the caller already holds a lock on 'mu_'.
  bool IsEmptyInternal() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Closes the open batch residing at the back of the deque, and inserts a
  // fresh open batch behind it.
  void StartNewBatch() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Splits `input_task` into `output_tasks` using
  // `options_.split_input_task_func`.
  Status SplitInputBatchIntoSubtasks(
      std::unique_ptr<TaskType>* input_task,
      std::vector<std::unique_ptr<TaskType>>* output_tasks)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Determines whether the open batch residing at the back of 'batches_' is
  // currently schedulable.
  bool IsOpenBatchSchedulable() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // A variant of `IsOpenBatchSchedulable`; used when batches are formed at
  // task enqueue time, and the open batch is `batches_.back()`.
  bool IsOpenBatchSchedulableAfterEagerSplit() const
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Same as SchedulingCapacity(), but assumes the caller already holds a
  // lock on 'mu_'.
  size_t SchedulingCapacityInternal() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Returns true if the queue doesn't have capacity for this task.
  //
  // `task` must outlive this method.
  bool BatchTaskExceedQueueCapacity(TaskType* task) const
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // The task size of the last batch in the queue.
  size_t tail_batch_task_size() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Returns the number of enqueued batches.
  int64 num_enqueued_batches() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  const typename SharedBatchScheduler<TaskType>::QueueOptions options_;

  // The environment to use.
  Env* env_;

  // The maximum batch size to be executed by `Queue::ProcessBatch`.
  // See the comment of QueueOptions and helper function
  // `GetMaxExecutionBatchSize` for more details on what it means.
  const size_t max_execution_batch_size_;

  // A callback invoked to process a batch of work units. Always invoked
  // from a batch thread.
  ProcessBatchCallback process_batch_callback_;

  // A callback invoked to notify the scheduler that a new batch has become
  // schedulable.
  SchedulableBatchCallback schedulable_batch_callback_;

  mutable mutex mu_;

  // Whether this queue is closed to new task submissions. This variable is
  // monotonic: it starts as false, and then at some point gets set to true and
  // remains true for the duration of this object's life.
  std::atomic<bool> closed_ TF_GUARDED_BY(mu_){false};

  // The enqueued batches.
  // Each element corresponds to a batch to be dequeued and processed by
  // `Queue<TaskType>::ProcessBatch`.
  //
  // Used iff `QueueOptions.enable_lazy_split` is false.
  std::deque<std::unique_ptr<Batch<TaskType>>> batches_ TF_GUARDED_BY(mu_);

  // The enqueued batches.
  //
  // Each element corresponds to the `task` enqueued in `Queue::Schedule`; the
  // element could be split and processed in batches at dequeue time.
  //
  // Used iff `QueueOptions.enable_lazy_split` is true.
  std::deque<std::unique_ptr<Batch<BatchInputTaskHandle<TaskType>>>>
      task_handle_batches_ TF_GUARDED_BY(mu_);

  // The counter of the TraceMe context ids.
  uint64 traceme_context_id_counter_ TF_GUARDED_BY(mu_) = 0;

  // The time at which the first task was added to the open (back-most) batch
  // in 'batches_'. Valid iff that batch contains at least one task.
  uint64 open_batch_start_time_micros_ TF_GUARDED_BY(mu_);

  // Whether this queue contains a batch that is eligible to be scheduled.
  // Used to keep track of when to call 'schedulable_batch_callback_'.
  bool schedulable_batch_ TF_GUARDED_BY(mu_) = false;

  // The number of batches currently being processed by batch threads.
  // Incremented in ScheduleBatch() and decremented in ProcessBatch().
  int num_batches_being_processed_ TF_GUARDED_BY(mu_) = 0;

  // Used by CloseAndWaitUntilEmpty() to wait until the queue is empty, for
  // the case in which the queue is not empty when CloseAndWaitUntilEmpty()
  // starts. When ProcessBatch() dequeues the last batch and makes the queue
  // empty, if 'empty_notification_' is non-null it calls
  // 'empty_notification_->Notify()'.
  Notification* empty_notification_ TF_GUARDED_BY(mu_) = nullptr;

  TF_DISALLOW_COPY_AND_ASSIGN(Queue);
};

// A RAII-style object that points to a Queue and implements
// the BatchScheduler API. To be handed out to clients who call AddQueue().
template <typename TaskType>
class QueueHandle : public BatchScheduler<TaskType> {
 public:
  QueueHandle(std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
              Queue<TaskType>* queue);
  ~QueueHandle() override;

  Status Schedule(std::unique_ptr<TaskType>* task) override;
  size_t NumEnqueuedTasks() const override;
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override { return queue_->max_task_size(); }

 private:
  // The scheduler that owns 'queue_'.
  std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler_;

  // The queue this handle wraps. Owned by 'scheduler_', which keeps it alive at
  // least until this class's destructor closes it.
  Queue<TaskType>* queue_;

  TF_DISALLOW_COPY_AND_ASSIGN(QueueHandle);
};

}  // namespace internal

template <typename TaskType>
Status SharedBatchScheduler<TaskType>::Create(
    const Options& options,
    std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler) {
  if (options.num_batch_threads < 1) {
    return errors::InvalidArgument("num_batch_threads must be positive; was ",
                                   options.num_batch_threads);
  }
  scheduler->reset(new SharedBatchScheduler<TaskType>(options));
  return Status::OK();
}

template <typename TaskType>
SharedBatchScheduler<TaskType>::~SharedBatchScheduler() {
  // Wait until the batch threads finish clearing out and deleting the closed
  // queues.
  for (;;) {
    {
      mutex_lock l(mu_);
      if (queues_.empty()) {
        break;
      }
    }
    const int64_t kSleepTimeMicros = 100;
    options_.env->SleepForMicroseconds(kSleepTimeMicros);
  }
  // Delete the batch threads before allowing state the threads may access
  // (e.g. 'mu_') to be deleted.
  batch_threads_.clear();
}

template <typename TaskType>
Status SharedBatchScheduler<TaskType>::AddQueue(
    const QueueOptions& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BatchScheduler<TaskType>>* queue) {
  if (options.input_batch_size_limit == 0) {
    return errors::InvalidArgument(
        "input_batch_size_limit must be positive; was ",
        options.input_batch_size_limit);
  }
  if (options.batch_timeout_micros < 0) {
    return errors::InvalidArgument(
        "batch_timeout_micros must be non-negative; was ",
        options.batch_timeout_micros);
  }
  if (options.max_enqueued_batches == 0) {
    return errors::InvalidArgument(
        "max_enqueued_batches must be positive; was ",
        options.max_enqueued_batches);
  }

  if (options.enable_large_batch_splitting &&
      options.split_input_task_func == nullptr) {
    return errors::InvalidArgument(
        "split_input_task_func must be specified when "
        "enable_large_batch_splitting is true: ",
        options.enable_large_batch_splitting);
  }

  if (options.enable_lazy_split && (!options.enable_large_batch_splitting)) {
    return errors::InvalidArgument(
        "enable_lazy_split should be enabled only if "
        "enable_large_batch_splitting is enabled.");
  }

  if (options.enable_large_batch_splitting &&
      (options.input_batch_size_limit < options.max_execution_batch_size)) {
    return errors::InvalidArgument(
        "When enable_large_batch_splitting is true, input_batch_size_limit "
        "must be greater than or equal to max_execution_batch_size.",
        options.enable_large_batch_splitting, options.input_batch_size_limit,
        options.max_execution_batch_size);
  }

  auto schedulable_batch_callback = [this] {
    mutex_lock l(mu_);
    schedulable_batch_cv_.notify_one();
  };
  auto internal_queue =
      std::unique_ptr<internal::Queue<TaskType>>(new internal::Queue<TaskType>(
          options, options_.env, process_batch_callback,
          schedulable_batch_callback));
  auto handle = std::unique_ptr<BatchScheduler<TaskType>>(
      new internal::QueueHandle<TaskType>(this->shared_from_this(),
                                          internal_queue.get()));
  {
    mutex_lock l(mu_);
    queues_.push_back(std::move(internal_queue));
    if (next_queue_to_schedule_ == queues_.end()) {
      next_queue_to_schedule_ = queues_.begin();
    }
  }
  *queue = std::move(handle);
  return Status::OK();
}

template <typename TaskType>
SharedBatchScheduler<TaskType>::SharedBatchScheduler(const Options& options)
    : options_(options), next_queue_to_schedule_(queues_.end()) {
  // Kick off the batch threads.
  PeriodicFunction::Options periodic_fn_options;
  periodic_fn_options.thread_name_prefix =
      strings::StrCat(options.thread_pool_name, "_");
  for (int i = 0; i < options.num_batch_threads; ++i) {
    std::unique_ptr<PeriodicFunction> thread(new PeriodicFunction(
        [this] { this->ThreadLogic(); },
        0 /* function invocation interval time */, periodic_fn_options));
    batch_threads_.push_back(std::move(thread));
  }
}

template <typename TaskType>
void SharedBatchScheduler<TaskType>::GetNextWorkItem_Locked(
    internal::Queue<TaskType>** queue_for_batch_out,
    std::unique_ptr<Batch<TaskType>>* batch_to_process_out) {
  std::unique_ptr<Batch<TaskType>> batch_to_process;
  internal::Queue<TaskType>* queue_for_batch = nullptr;
  const int num_queues = queues_.size();
  for (int num_queues_tried = 0;
       batch_to_process == nullptr && num_queues_tried < num_queues;
       ++num_queues_tried) {
    DCHECK(next_queue_to_schedule_ != queues_.end());

    // If a closed queue responds to ScheduleBatch() with nullptr, the queue
    // will never yield any further batches so we can drop it. To avoid a
    // race, we take a snapshot of the queue's closedness state *before*
    // calling ScheduleBatch().
    const bool queue_closed = (*next_queue_to_schedule_)->closed();

    // Ask '*next_queue_to_schedule_' if it wants us to process a batch.
    batch_to_process = (*next_queue_to_schedule_)->ScheduleBatch();
    if (batch_to_process != nullptr) {
      queue_for_batch = next_queue_to_schedule_->get();
    }

    // Advance 'next_queue_to_schedule_'.
    if (queue_closed && (*next_queue_to_schedule_)->IsEmpty() &&
        batch_to_process == nullptr) {
      // We've encountered a closed queue with no work to do. Drop it.
      DCHECK_NE(queue_for_batch, next_queue_to_schedule_->get());
      next_queue_to_schedule_ = queues_.erase(next_queue_to_schedule_);
    } else {
      ++next_queue_to_schedule_;
    }
    if (next_queue_to_schedule_ == queues_.end() && !queues_.empty()) {
      // We've hit the end. Wrap to the first queue.
      next_queue_to_schedule_ = queues_.begin();
    }
  }
  *queue_for_batch_out = queue_for_batch;
  *batch_to_process_out = std::move(batch_to_process);
}

template <typename TaskType>
void SharedBatchScheduler<TaskType>::ThreadLogic() {
  // A batch to process next (or nullptr if no work to do).
  std::unique_ptr<Batch<TaskType>> batch_to_process;
  // The queue with which 'batch_to_process' is associated.
  internal::Queue<TaskType>* queue_for_batch = nullptr;
  {
    mutex_lock l(mu_);
    while (true) {
      GetNextWorkItem_Locked(&queue_for_batch, &batch_to_process);
      if (batch_to_process != nullptr) break;
      if (queues_.empty()) return;
      // We couldn't find any work to do. Wait until a new batch becomes
      // schedulable, or some time has elapsed, before checking again.
      const int64_t kTimeoutMillis =
          1;  // The smallest accepted granule of time.
      WaitForMilliseconds(&l, &schedulable_batch_cv_, kTimeoutMillis);
    }
  }

  queue_for_batch->ProcessBatch(std::move(batch_to_process));
}

namespace internal {

template <typename TaskType>
Queue<TaskType>::Queue(
    const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
    Env* env, ProcessBatchCallback process_batch_callback,
    SchedulableBatchCallback schedulable_batch_callback)
    : options_(options),
      env_(env),
      max_execution_batch_size_(GetMaxExecutionBatchSize(options_)),
      process_batch_callback_(process_batch_callback),
      schedulable_batch_callback_(schedulable_batch_callback) {
  // Set the upper 32 bits of traceme_context_id_counter_ to the creation
  // time of the queue. This prevents batches in different queues from having
  // the same traceme_context_id_counter_.
  traceme_context_id_counter_ = absl::GetCurrentTimeNanos() << 32;
  // Create an initial, open batch.
  if (options_.enable_lazy_split) {
    task_handle_batches_.emplace_back(
        new Batch<BatchInputTaskHandle<TaskType>>);
  } else {
    batches_.emplace_back(new Batch<TaskType>);
  }
}

template <typename TaskType>
Queue<TaskType>::~Queue() {
  mutex_lock l(mu_);
  DCHECK(IsEmptyInternal());

  // Close the (empty) open batch, so its destructor doesn't block.
  if (options_.enable_lazy_split) {
    task_handle_batches_.back()->Close();
  } else {
    batches_.back()->Close();
  }
}

template <typename TaskType>
Status Queue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
  if ((*task)->size() > options_.input_batch_size_limit) {
    return errors::InvalidArgument("Task size ", (*task)->size(),
                                   " is larger than maximum input batch size ",
                                   options_.input_batch_size_limit);
  }
  if (options_.enable_lazy_split) {
    return ScheduleWithLazySplit(std::move(task));
  }
  return ScheduleWithoutOrEagerSplit(std::move(task));
}

template <typename TaskType>
Status Queue<TaskType>::ScheduleWithLazySplit(std::unique_ptr<TaskType>* task) {
  profiler::TraceMe trace_me([task] {
    return profiler::TraceMeEncode(
        "ScheduleWithLazySplit",
        {{"batching_input_task_size", (*task)->size()}});
  });
  // The max size to be enqueued.
  const int max_execution_batch_size = options_.max_execution_batch_size;

  bool notify_of_schedulable_batch = false;
  {
    mutex_lock l(mu_);

    DCHECK(!closed_);

    if (BatchTaskExceedQueueCapacity((*task).get())) {
      return errors::Unavailable(
          "The batch scheduling queue to which this task was submitted is "
          "full");
    }
    const int64 open_batch_capacity =
        max_execution_batch_size - this->tail_batch_task_size();

    auto input_batch = std::make_shared<BatchInputTask<TaskType>>(
        std::move(*task), open_batch_capacity, max_execution_batch_size,
        options_.split_input_task_func);
    std::vector<std::unique_ptr<BatchInputTaskHandle<TaskType>>> task_handles;

    input_batch->ToTaskHandles(&task_handles);

    for (int i = 0; i < task_handles.size(); ++i) {
      if (task_handle_batches_.back()->size() + task_handles[i]->size() >
          options_.max_execution_batch_size) {
        StartNewBatch();
      }
      if (task_handle_batches_.back()->empty()) {
        open_batch_start_time_micros_ = env_->NowMicros();
      }
      profiler::TraceMeProducer trace_me(
          [&task_handles, i] {
            return profiler::TraceMeEncode("ScheduleOutputTask",
                                           {{"size", task_handles[i]->size()}});
          },
          profiler::ContextType::kSharedBatchScheduler,
          task_handle_batches_.back()->traceme_context_id());

      task_handle_batches_.back()->AddTask(std::move(task_handles[i]));
    }

    if (!schedulable_batch_) {
      if (task_handle_batches_.size() > 1 || IsOpenBatchSchedulable()) {
        schedulable_batch_ = true;
        notify_of_schedulable_batch = true;
      }
    }
  }
  // TODO(b/194294263):
  // Add unit tests to verify that `schedulable_batch_callback_` could be
  // triggered when batches are scheduled.
  if (notify_of_schedulable_batch) {
    schedulable_batch_callback_();
  }

  return Status::OK();
}

// TODO(b/194294263):
// Merge `ScheduleWithoutOrEagerSplit` and `ScheduleWithLazySplit` into
// `Schedule`.
template <typename TaskType>
Status Queue<TaskType>::ScheduleWithoutOrEagerSplit(
    std::unique_ptr<TaskType>* task) {
  const bool large_batch_splitting = options_.enable_large_batch_splitting;
  profiler::TraceMe trace_me([task, large_batch_splitting] {
    return profiler::TraceMeEncode(
        large_batch_splitting ? "ScheduleWithEagerSplit"
                              : "ScheduleWithoutSplit",
        {{"batching_input_task_size", (*task)->size()}});
  });

  bool notify_of_schedulable_batch = false;
  {
    mutex_lock l(mu_);

    DCHECK(!closed_);

    // TODO(b/161857471):
    // Add test coverage for when concurrent incoming batches arrive and
    // use up all queue capacity.
    if (BatchTaskExceedQueueCapacity((*task).get())) {
      return errors::Unavailable(
          "The batch scheduling queue to which this task was submitted is "
          "full");
    }

    const int64_t open_batch_remaining_slot =
        options_.max_execution_batch_size - batches_.back()->size();

    const int64_t input_task_size = (*task)->size();

    std::vector<std::unique_ptr<TaskType>> output_tasks;

    if (input_task_size <= open_batch_remaining_slot ||
        !large_batch_splitting) {
      // This is the fast path when input doesn't need to be split.
      output_tasks.push_back(std::move(*task));
    } else {
      TF_RETURN_IF_ERROR(SplitInputBatchIntoSubtasks(task, &output_tasks));
    }

    for (int i = 0; i < output_tasks.size(); ++i) {
      if (batches_.back()->size() + output_tasks[i]->size() >
          options_.max_execution_batch_size) {
        StartNewBatch();
      }
      if (batches_.back()->empty()) {
        open_batch_start_time_micros_ = env_->NowMicros();
      }
      profiler::TraceMeProducer trace_me(
          [&output_tasks, i] {
            return profiler::TraceMeEncode("ScheduleOutputTask",
                                           {{"size", output_tasks[i]->size()}});
          },
          profiler::ContextType::kSharedBatchScheduler,
          batches_.back()->traceme_context_id());
      batches_.back()->AddTask(std::move(output_tasks[i]));
    }

    if (!schedulable_batch_) {
      if (batches_.size() > 1 || IsOpenBatchSchedulable()) {
        schedulable_batch_ = true;
        notify_of_schedulable_batch = true;
      }
    }
  }

  if (notify_of_schedulable_batch) {
    schedulable_batch_callback_();
  }

  return Status::OK();
}

template <typename TaskType>
size_t Queue<TaskType>::NumEnqueuedTasks() const {
  size_t num_enqueued_tasks = 0;
  mutex_lock l(mu_);
  if (options_.enable_lazy_split) {
    for (const auto& batch : task_handle_batches_) {
      num_enqueued_tasks += batch->num_tasks();
    }
    return num_enqueued_tasks;
  }

  for (const auto& batch : batches_) {
    num_enqueued_tasks += batch->num_tasks();
  }
  return num_enqueued_tasks;
}

template <typename TaskType>
size_t Queue<TaskType>::SchedulingCapacity() const {
  mutex_lock l(mu_);
  return SchedulingCapacityInternal();
}

template <typename TaskType>
size_t Queue<TaskType>::SchedulingCapacityInternal() const {
  const int64 num_new_batches_schedulable =
      static_cast<int64>(options_.max_enqueued_batches) -
      this->num_enqueued_batches();
  const int64 execution_batch_size_limit = max_execution_batch_size();
  const int64 open_batch_capacity =
      execution_batch_size_limit - this->tail_batch_task_size();
  // Note the returned value is guaranteed to be non-negative, since an
  // enqueue operation can only happen if the queue has enough capacity.
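  //
  // Purely illustrative numbers (not drawn from the code above): with
  // max_enqueued_batches = 10, 3 batches currently enqueued (including the
  // open one), max_execution_batch_size = 1000, and 400 task units already in
  // the open batch, the capacity is (10 - 3) * 1000 + (1000 - 400) = 7600.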
  return (num_new_batches_schedulable * execution_batch_size_limit) +
         open_batch_capacity;
}

template <typename TaskType>
bool Queue<TaskType>::BatchTaskExceedQueueCapacity(TaskType* task) const {
  // Queue creation requires that `enable_large_batch_splitting` is true
  // when `enable_lazy_split` is true, so this covers both eager split and
  // lazy split.
  if (options_.enable_large_batch_splitting) {
    return task->size() > SchedulingCapacityInternal();
  }

  // NOTE: the capacity check below is loose; it is retained for backward
  // compatibility with behavior that predates the merge of the no-split and
  // eager-split paths. There are existing clients/models that rely on the
  // loose check and could get errors after the merge. Retaining the old
  // behavior allows such models to continue to work.
  //
  // We need to revisit/remove this check after we fix model configs.
  bool batch_task_exceed_queue_capacity = false;
  if (batches_.back()->size() + task->size() >
      options_.input_batch_size_limit) {
    if (batches_.size() >= options_.max_enqueued_batches) {
      batch_task_exceed_queue_capacity = true;
    }
  }
  return batch_task_exceed_queue_capacity;
}

template <typename TaskType>
std::unique_ptr<Batch<TaskType>>
Queue<TaskType>::ScheduleBatchWithEagerSplit() {
  // The batch to schedule, which we may populate below. (If left as nullptr,
  // that means we are electing not to schedule a batch at this time.)
  std::unique_ptr<Batch<TaskType>> batch_to_schedule;

  {
    mutex_lock l(mu_);

    // Consider closing the open batch at this time, to schedule it.
    if (batches_.size() == 1 && IsOpenBatchSchedulable()) {
      StartNewBatch();
    }

    if (batches_.size() >= 2) {
      // There is at least one closed batch that is ready to be scheduled.
      ++num_batches_being_processed_;
      batch_to_schedule = std::move(batches_.front());
      batches_.pop_front();
    } else {
      schedulable_batch_ = false;
    }
  }

  return batch_to_schedule;
}

template <typename TaskType>
std::unique_ptr<Batch<TaskType>> Queue<TaskType>::ScheduleBatch() {
  if (!options_.enable_lazy_split) {
    return ScheduleBatchWithEagerSplit();
  }
  // The batch to schedule, which we may populate below. (If left as nullptr,
  // that means we are electing not to schedule a batch at this time.)
  std::unique_ptr<Batch<BatchInputTaskHandle<TaskType>>>
      task_handles_to_schedule;

  {
    mutex_lock l(mu_);

    // Consider closing the open batch at this time, to schedule it.
    if (task_handle_batches_.size() == 1 && IsOpenBatchSchedulable()) {
      StartNewBatch();
    }

    if (task_handle_batches_.size() >= 2) {
      // There is at least one closed batch that is ready to be scheduled.
      ++num_batches_being_processed_;
      task_handles_to_schedule = std::move(task_handle_batches_.front());
      task_handle_batches_.pop_front();
    } else {
      schedulable_batch_ = false;
    }
  }

  std::unique_ptr<Batch<TaskType>> batch_to_schedule;
  if (task_handles_to_schedule != nullptr) {
    batch_to_schedule = std::make_unique<Batch<TaskType>>();
    std::vector<std::unique_ptr<BatchInputTaskHandle<TaskType>>> task_handles =
        task_handles_to_schedule->RemoveAllTasks();

    // TODO(b/194294263):
    // Handle the batch-kernel callback properly when lazy split returns error.
    for (int i = 0; i < task_handles.size(); i++) {
      batch_to_schedule->AddTask(std::move(task_handles[i]->GetSplitTask()));
    }
    batch_to_schedule->Close();
  }
  return batch_to_schedule;
}

template <typename TaskType>
void Queue<TaskType>::ProcessBatch(std::unique_ptr<Batch<TaskType>> batch) {
  profiler::TraceMeConsumer trace_me(
      [&] {
        return profiler::TraceMeEncode(
            "ProcessBatch", {{"batch_size_before_padding", batch->size()},
                             {"_r", 2} /*root_event*/});
      },
      profiler::ContextType::kSharedBatchScheduler,
      batch->traceme_context_id());
  process_batch_callback_(std::move(batch));

  {
    mutex_lock l(mu_);
    --num_batches_being_processed_;
    if (empty_notification_ != nullptr && IsEmptyInternal()) {
      empty_notification_->Notify();
    }
  }
}

template <typename TaskType>
bool Queue<TaskType>::IsEmpty() const {
  mutex_lock l(mu_);
  return IsEmptyInternal();
}

template <typename TaskType>
void Queue<TaskType>::CloseAndWaitUntilEmpty() {
  Notification empty;
  {
    mutex_lock l(mu_);
    closed_ = true;
    if (IsEmptyInternal()) {
      empty.Notify();
    } else {
      // Arrange for ProcessBatch() to notify when the queue becomes empty.
      empty_notification_ = &empty;
    }
  }
  empty.WaitForNotification();
}

template <typename TaskType>
bool Queue<TaskType>::IsEmptyInternal() const {
  if (options_.enable_lazy_split) {
    return num_batches_being_processed_ == 0 &&
           task_handle_batches_.size() == 1 &&
           task_handle_batches_.back()->empty();
  }
  return num_batches_being_processed_ == 0 && batches_.size() == 1 &&
         batches_.back()->empty();
}

template <typename TaskType>
void Queue<TaskType>::StartNewBatch() {
  if (options_.enable_lazy_split) {
    task_handle_batches_.back()->Close();
    task_handle_batches_.emplace_back(new Batch<BatchInputTaskHandle<TaskType>>(
        ++traceme_context_id_counter_));
    return;
  }
  batches_.back()->Close();
  batches_.emplace_back(new Batch<TaskType>(++traceme_context_id_counter_));
}

template <typename TaskType>
Status Queue<TaskType>::SplitInputBatchIntoSubtasks(
    std::unique_ptr<TaskType>* input_task,
    std::vector<std::unique_ptr<TaskType>>* output_tasks) {
  const int open_batch_remaining_slot =
      max_execution_batch_size() - this->tail_batch_task_size();
  return options_.split_input_task_func(
      std::move(input_task), open_batch_remaining_slot,
      max_execution_batch_size(), std::move(output_tasks));
}

template <typename TaskType>
bool Queue<TaskType>::IsOpenBatchSchedulableAfterEagerSplit() const {
  Batch<TaskType>* open_batch = batches_.back().get();
  if (open_batch->empty()) {
    return false;
  }
  return closed_ || open_batch->size() >= max_execution_batch_size() ||
         env_->NowMicros() >=
             open_batch_start_time_micros_ + options_.batch_timeout_micros;
}

template <typename TaskType>
bool Queue<TaskType>::IsOpenBatchSchedulable() const {
  if (!options_.enable_lazy_split) {
    return IsOpenBatchSchedulableAfterEagerSplit();
  }
  Batch<BatchInputTaskHandle<TaskType>>* open_batch =
      task_handle_batches_.back().get();
  if (open_batch->empty()) {
    return false;
  }
  return closed_ || open_batch->size() >= max_execution_batch_size() ||
         env_->NowMicros() >=
             open_batch_start_time_micros_ + options_.batch_timeout_micros;
}

template <typename TaskType>
size_t Queue<TaskType>::tail_batch_task_size() const {
  if (options_.enable_lazy_split) {
    return task_handle_batches_.back()->size();
  }

  return batches_.back()->size();
}

template <typename TaskType>
int64 Queue<TaskType>::num_enqueued_batches() const {
  if (options_.enable_lazy_split) {
    return task_handle_batches_.size();
  }
  return batches_.size();
}

template <typename TaskType>
QueueHandle<TaskType>::QueueHandle(
    std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
    Queue<TaskType>* queue)
    : scheduler_(scheduler), queue_(queue) {}

template <typename TaskType>
QueueHandle<TaskType>::~QueueHandle() {
  queue_->CloseAndWaitUntilEmpty();
}

template <typename TaskType>
Status QueueHandle<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
  return queue_->Schedule(task);
}

template <typename TaskType>
size_t QueueHandle<TaskType>::NumEnqueuedTasks() const {
  return queue_->NumEnqueuedTasks();
}

template <typename TaskType>
size_t QueueHandle<TaskType>::SchedulingCapacity() const {
  return queue_->SchedulingCapacity();
}

}  // namespace internal

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_