/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_
#define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_

#include <stddef.h>

#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/time/clock.h"
#include "tensorflow/core/kernels/batching_util/batch_input_task.h"
#include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
#include "tensorflow/core/kernels/batching_util/periodic_function.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/profiler/lib/connected_traceme.h"
#include "tensorflow/core/profiler/lib/traceme.h"
#include "tensorflow/core/profiler/lib/traceme_encode.h"

namespace tensorflow {
namespace serving {
namespace internal {
template <typename TaskType>
class Queue;
}  // namespace internal
}  // namespace serving
}  // namespace tensorflow

namespace tensorflow {
namespace serving {

// A batch scheduler for server instances that service multiple request types
// (e.g. multiple machine-learned models, or multiple versions of a model served
// concurrently), or even multiple distinct tasks for a given request. The
// scheduler multiplexes batches of different kinds of tasks onto a fixed-size
// thread pool (each batch contains tasks of a single type), in a carefully
// controlled manner. A common configuration is to set the number of threads
// equal to the number of hardware accelerator units, in which case the
// scheduler takes care of multiplexing the task types onto the shared hardware,
// in a manner that is both fair and efficient.
//
// Semantically, SharedBatchScheduler behaves like having N instances of
// BasicBatchScheduler (see basic_batch_scheduler.h), one per task type. The
// difference is that under the covers there is a single shared thread pool,
// instead of N independent ones, with their sharing deliberately coordinated.
//
// SharedBatchScheduler does not implement the BatchScheduler API; rather, it
// presents an abstraction of "queues", where each queue corresponds to one type
// of task. Tasks submitted to a given queue are placed in their own batches,
// and cannot be mixed with other tasks. Queues can be added and deleted
// dynamically, to accommodate e.g. versions of a model being brought up and
// down over the lifetime of a server.
//
// The batch thread pool round-robins through the queues, running one batch
// from a queue and then moving to the next queue. Each queue behaves like a
// BasicBatchScheduler instance, in the sense that it has maximum batch size and
// timeout parameters, which govern when a batch is eligible to be processed.
//
// Each queue is independently configured with a maximum size (in terms of the
// maximum number of batches worth of enqueued tasks). For online serving, it is
// recommended that the queue sizes be configured such that the sum of the sizes
// of the active queues roughly equals the number of batch threads. (The idea is
// that if all threads become available at roughly the same time, there will be
// enough enqueued work for them to take on, but no more.)
//
// If queue sizes are configured in the manner suggested above, the maximum time
// a task can spend in a queue before being placed in a batch and assigned to a
// thread for processing is the greater of:
//  - the maximum time to process one batch of tasks from any active queue
//  - the configured timeout parameter for the task's queue (which can be 0)
//
// For bulk processing jobs and throughput-oriented benchmarks, you may want to
// set the maximum queue size to a large value.
//
// TODO(b/26539183): Support queue servicing policies other than round-robin.
// E.g. let each queue specify a "share" (an int >= 1), so e.g. with queues A
// and B having shares 1 and 2 respectively, the servicing pattern is ABBABB...
//
//
// PERFORMANCE TUNING: See README.md.
//
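// Example usage (a minimal sketch, not taken from this codebase; `MyTask` is a
// hypothetical BatchTask subclass and the option values are illustrative only):
//
//   SharedBatchScheduler<MyTask>::Options options;
//   options.num_batch_threads = 4;
//   std::shared_ptr<SharedBatchScheduler<MyTask>> scheduler;
//   TF_RETURN_IF_ERROR(
//       SharedBatchScheduler<MyTask>::Create(options, &scheduler));
//
//   SharedBatchScheduler<MyTask>::QueueOptions queue_options;
//   queue_options.input_batch_size_limit = 1000;
//   queue_options.batch_timeout_micros = 100;
//   queue_options.max_enqueued_batches = 4;
//   std::unique_ptr<BatchScheduler<MyTask>> queue;
//   TF_RETURN_IF_ERROR(scheduler->AddQueue(
//       queue_options,
//       [](std::unique_ptr<Batch<MyTask>> batch) { /* process `batch` */ },
//       &queue));
//
//   std::unique_ptr<MyTask> task = ...;  // A filled-in MyTask instance.
//   TF_RETURN_IF_ERROR(queue->Schedule(&task));
//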
template <typename TaskType>
class SharedBatchScheduler
    : public std::enable_shared_from_this<SharedBatchScheduler<TaskType>> {
 public:
  // TODO(b/25089730): Tune defaults based on best practices as they develop.
  struct Options {
    // The name to use for the pool of batch threads.
    string thread_pool_name = {"batch_threads"};

    // The number of threads to use to process batches.
    // Must be >= 1, and should be tuned carefully.
    int num_batch_threads = port::MaxParallelism();

    // The environment to use.
    // (Typically only overridden by test code.)
    Env* env = Env::Default();
  };
  // Ownership is shared between the caller of Create() and any queues created
  // via AddQueue().
  static Status Create(
      const Options& options,
      std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler);

  ~SharedBatchScheduler();

  // Adds a queue to which tasks may be submitted. The returned queue implements
  // the BatchScheduler API. Each queue has its own set of scheduling options,
  // and its own callback to process batches of tasks submitted to the queue.
  //
  // The returned queue's destructor blocks until all tasks submitted to it have
  // been processed.
  struct QueueOptions {
    // The size limit of an input batch to the queue.
    //
    // If `enable_large_batch_splitting` is true, `input_batch_size_limit`
    // should be greater than or equal to `max_execution_batch_size`; otherwise
    // `input_batch_size_limit` should be equal to `max_execution_batch_size`.
    size_t input_batch_size_limit = 1000;

    // If a task has been enqueued for this amount of time (in microseconds),
    // and a thread is available, the scheduler will immediately form a batch
    // from enqueued tasks and assign the batch to the thread for processing,
    // even if the batch's size is below `input_batch_size_limit`.
    //
    // This parameter offers a way to bound queue latency, so that a task isn't
    // stuck in the queue indefinitely waiting for enough tasks to arrive to
    // make a full batch. (The latency bound is given in the class documentation
    // above.)
    //
    // The goal is to smooth out batch sizes under low request rates, and thus
    // avoid latency spikes.
    int64 batch_timeout_micros = 0;

    // The maximum allowable number of enqueued (accepted by Schedule() but
    // not yet being processed on a batch thread) tasks, in terms of batches.
    // If this limit is reached, Schedule() will return an UNAVAILABLE error.
    // See the class documentation above for guidelines on how to tune this
    // parameter.
    //
    // Must be positive, or else an invalid argument error will be returned at
    // queue creation time.
    size_t max_enqueued_batches = 10;

    // If true, the queue implementation will split one input batch task into
    // subtasks (as specified by `split_input_task_func` below) and fit the
    // subtasks into different batches.
    //
    // For usage of `split_input_task_func`, please see its comment.
    bool enable_large_batch_splitting = false;

    // `input_task`: a unit of task to be split.
    // `first_output_task_size`: task size of the first output.
    // `max_execution_batch_size`: maximum size of each batch.
    // `output_tasks`: a list of output tasks after the split.
    //
    // REQUIRED:
    // 1) All `output_tasks` should be non-empty tasks.
    // 2) The sizes of `output_tasks` add up to the size of `input_task`.
    //
    // NOTE:
    // Instantiations of `TaskType` may vary, so it's up to the caller to define
    // how (e.g., which members to access) to split input tasks. (A sketch of
    // one possible splitter is given below, after QueueOptions.)
    std::function<Status(std::unique_ptr<TaskType>* input_task,
                         int first_output_task_size,
                         int max_execution_batch_size,
                         std::vector<std::unique_ptr<TaskType>>* output_tasks)>
        split_input_task_func;

    // If true, batch input tasks are split lazily after dequeue and not on the
    // critical path of enqueue operations.
    //
    // Must be false if `enable_large_batch_splitting` is false; otherwise an
    // error will be returned at queue creation time.
    //
    // TODO(b/194294263):
    // Make `enable_lazy_split` a template parameter of the queue, and adapt
    // `batches_` and `task_handle_batches_` into one deque of
    // tensorflow::serving::Batch.
    bool enable_lazy_split = false;

    // The maximum size of each enqueued batch (i.e., in `batches_`).
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can be
    // done by adding padding in the process-batch callback.
    size_t max_execution_batch_size = 1000;
  };
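
  // A sketch of one possible `split_input_task_func` (illustrative only, not
  // part of this API; it assumes a hypothetical `MyTask` type that exposes
  // `size()` and a `SplitOff(n)` helper removing and returning the first `n`
  // units as a new task):
  //
  //   queue_options.split_input_task_func =
  //       [](std::unique_ptr<MyTask>* input_task, int first_output_task_size,
  //          int max_execution_batch_size,
  //          std::vector<std::unique_ptr<MyTask>>* output_tasks) -> Status {
  //     int remaining = static_cast<int>((*input_task)->size());
  //     // The first chunk tops up the open batch if it has room; later chunks
  //     // are at most one full execution batch each. All chunks are non-empty
  //     // and their sizes add up to the input size, as required above.
  //     int chunk = first_output_task_size > 0
  //                     ? std::min(first_output_task_size, remaining)
  //                     : std::min(max_execution_batch_size, remaining);
  //     while (remaining > 0) {
  //       output_tasks->push_back((*input_task)->SplitOff(chunk));
  //       remaining -= chunk;
  //       chunk = std::min(max_execution_batch_size, remaining);
  //     }
  //     return Status::OK();
  //   };
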
  Status AddQueue(const QueueOptions& options,
                  std::function<void(std::unique_ptr<Batch<TaskType>>)>
                      process_batch_callback,
                  std::unique_ptr<BatchScheduler<TaskType>>* queue);

 private:
  explicit SharedBatchScheduler(const Options& options);

  void GetNextWorkItem_Locked(
      internal::Queue<TaskType>** queue_for_batch_out,
      std::unique_ptr<Batch<TaskType>>* batch_to_process_out)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // The code executed in 'batch_threads_'. Obtains a batch to process from the
  // queue pointed to by 'next_queue_to_schedule_', and processes it. If that
  // queue declines to provide a batch to process, moves on to the next queue.
  // If no queue provides a batch to process, just sleeps briefly and exits.
  void ThreadLogic();

  const Options options_;

  mutex mu_;

  // A list of queues. (We use std::list instead of std::vector to ensure that
  // iterators are not invalidated by adding/removing elements. It also offers
  // efficient removal of elements from the middle.)
  using QueueList = std::list<std::unique_ptr<internal::Queue<TaskType>>>;

  // All "active" queues, i.e. ones that either:
  //  - have not been removed, or
  //  - have been removed but are not yet empty.
  QueueList queues_ TF_GUARDED_BY(mu_);

  // An iterator over 'queues_', pointing to the queue from which the next
  // available batch thread should grab work.
  typename QueueList::iterator next_queue_to_schedule_ TF_GUARDED_BY(mu_);

  // Used by idle batch threads to wait for work to enter the system. Notified
  // whenever a batch becomes schedulable.
  condition_variable schedulable_batch_cv_;

  // Threads that process batches obtained from the queues.
  std::vector<std::unique_ptr<PeriodicFunction>> batch_threads_;

  TF_DISALLOW_COPY_AND_ASSIGN(SharedBatchScheduler);
};

//////////
// Implementation details follow. API users need not read.

namespace internal {

// A task queue for SharedBatchScheduler. Accepts tasks and accumulates them
// into batches, and dispenses those batches to be processed via a "pull"
// interface. The queue's behavior is governed by maximum batch size, timeout
// and maximum queue length parameters; see their documentation in
// SharedBatchScheduler.
//
// The queue is implemented as a deque of batches, with these invariants:
//  - The number of batches is between 1 and 'options_.max_enqueued_batches'.
//  - The back-most batch is open; the rest are closed.
//
// Submitted tasks are added to the open batch. If that batch doesn't have room
// but the queue isn't full, then that batch is closed and a new open batch is
// started.
//
// Batch pull requests are handled by dequeuing the front-most batch if it is
// closed. If the front-most batch is open (i.e. the queue contains only one
// batch) and has reached the timeout, it is immediately closed and returned;
// otherwise no batch is returned for the request.
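//
// For example (an illustrative walk-through; the numbers are arbitrary): with
// a maximum execution batch size of 4 and tasks of sizes 3, 2 and 2 submitted
// in that order, the first task joins the open batch (total size 3); the
// second doesn't fit (3 + 2 > 4), so that batch is closed and a new open batch
// is started for it (size 2); the third fits (2 + 2 <= 4), bringing the open
// batch to size 4. A batch pull request at this point returns the closed
// size-3 batch.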
template <typename TaskType>
class Queue {
 public:
  using ProcessBatchCallback =
      std::function<void(std::unique_ptr<Batch<TaskType>>)>;
  using SchedulableBatchCallback = std::function<void()>;
  using SplitInputTaskIntoSubtasksCallback = std::function<Status(
      std::unique_ptr<TaskType>* input_task, int open_batch_remaining_slot,
      int max_execution_batch_size,
      std::vector<std::unique_ptr<TaskType>>* output_tasks)>;
  Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
        Env* env, ProcessBatchCallback process_batch_callback,
        SchedulableBatchCallback schedulable_batch_callback);

  // Illegal to destruct unless the queue is empty.
  ~Queue();

  // Submits a task to the queue, with the same semantics as
  // BatchScheduler::Schedule().
  Status Schedule(std::unique_ptr<TaskType>* task);

  // Enqueues `task` as it is, OR splits it inline (eagerly) to form batches to
  // be processed by `Queue<TaskType>::ProcessBatch`.
  Status ScheduleWithoutOrEagerSplit(std::unique_ptr<TaskType>* task);

  // Enqueues `task` along with the batch-queue metadata. Batches of task
  // handles are formed by the time `ScheduleWithLazySplit` returns; each such
  // batch is evaluated into a batch of split tasks to be processed after it is
  // dequeued (outside the mutex-protected area).
  Status ScheduleWithLazySplit(std::unique_ptr<TaskType>* task);

  // Returns the number of enqueued tasks, with the same semantics as
  // BatchScheduler::NumEnqueuedTasks().
  size_t NumEnqueuedTasks() const;

  // Returns the queue capacity, with the same semantics as
  // BatchScheduler::SchedulingCapacity().
  size_t SchedulingCapacity() const;

  // Returns the maximum allowed size of tasks submitted to the queue.
  size_t max_task_size() const { return options_.input_batch_size_limit; }

  // Returns the maximum allowed size of batches to be executed.
  // The returned value is less than or equal to the maximum allowed input
  // size provided by the caller of the batch scheduler.
  size_t max_execution_batch_size() const { return max_execution_batch_size_; }

  // Called by a thread that is ready to process a batch, to request one from
  // this queue. Either returns a batch that is ready to be processed, or
  // nullptr if the queue declines to schedule a batch at this time. If it
  // returns a batch, the batch is guaranteed to be closed.
  std::unique_ptr<Batch<TaskType>> ScheduleBatch();

  // A variant of `ScheduleBatch`.
  // Batches are guaranteed to be formed at task enqueue time.
  std::unique_ptr<Batch<TaskType>> ScheduleBatchWithEagerSplit();

  // Processes a batch that has been returned earlier by ScheduleBatch().
  void ProcessBatch(std::unique_ptr<Batch<TaskType>> batch);

  // Determines whether the queue is empty, i.e. has no tasks waiting or being
  // processed.
  bool IsEmpty() const;

  // Marks the queue closed, and waits until it is empty.
  void CloseAndWaitUntilEmpty();

  bool closed() const TF_NO_THREAD_SAFETY_ANALYSIS { return closed_.load(); }

 private:
  // Computes the max_execution_batch_size of the queue based on queue options.
  static size_t GetMaxExecutionBatchSize(
      const typename SharedBatchScheduler<TaskType>::QueueOptions& options) {
    // If `enable_large_batch_splitting` is true, returns the
    // `max_execution_batch_size` configured by the user options directly;
    // returns `input_batch_size_limit` otherwise.
    //
    // Note `input_batch_size_limit` is used for backward compatibility:
    // users may not specify `max_execution_batch_size` explicitly.
    if (options.enable_large_batch_splitting) {
      return options.max_execution_batch_size;
    } else {
      return options.input_batch_size_limit;
    }
  }

  // Same as IsEmpty(), but assumes the caller already holds a lock on 'mu_'.
  bool IsEmptyInternal() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Closes the open batch residing at the back of the deque, and inserts a
  // fresh open batch behind it.
  void StartNewBatch() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Splits `input_task` into `output_tasks`, using
  // `options_.split_input_task_func` and the remaining capacity of the open
  // batch.
  Status SplitInputBatchIntoSubtasks(
      std::unique_ptr<TaskType>* input_task,
      std::vector<std::unique_ptr<TaskType>>* output_tasks)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Determines whether the open batch residing at the back of 'batches_' is
  // currently schedulable.
  bool IsOpenBatchSchedulable() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // A variant of `IsOpenBatchSchedulable`; used when batches are formed at
  // task enqueue time, and the open batch is `batches_.back()`.
  bool IsOpenBatchSchedulableAfterEagerSplit() const
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Same as SchedulingCapacity(), but assumes the caller already holds a
  // lock on 'mu_'.
  size_t SchedulingCapacityInternal() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Returns true if the queue doesn't have capacity for this task.
  //
  // `task` must outlive this method.
  bool BatchTaskExceedQueueCapacity(TaskType* task) const
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // The task size of the last batch in the queue.
  size_t tail_batch_task_size() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Returns the number of enqueued batches.
  int64 num_enqueued_batches() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  const typename SharedBatchScheduler<TaskType>::QueueOptions options_;

  // The environment to use.
  Env* env_;

  // The maximum batch size to be executed by `Queue::ProcessBatch`.
  // See the comment on QueueOptions and the helper function
  // `GetMaxExecutionBatchSize` for more details on what it means.
  const size_t max_execution_batch_size_;

  // A callback invoked to process a batch of work units. Always invoked
  // from a batch thread.
  ProcessBatchCallback process_batch_callback_;

  // A callback invoked to notify the scheduler that a new batch has become
  // schedulable.
  SchedulableBatchCallback schedulable_batch_callback_;

  mutable mutex mu_;

  // Whether this queue has been closed to new tasks. This variable is
  // monotonic: it starts as false, and at some point gets set to true and
  // remains true for the duration of this object's life.
  std::atomic<bool> closed_ TF_GUARDED_BY(mu_){false};

  // The enqueued batches.
  // Each element corresponds to a batch to be dequeued and processed by
  // `Queue<TaskType>::ProcessBatch`.
  //
  // Used iff `QueueOptions.enable_lazy_split` is false.
  std::deque<std::unique_ptr<Batch<TaskType>>> batches_ TF_GUARDED_BY(mu_);

  // The enqueued batches of task handles.
  //
  // Each element corresponds to the `task` enqueued in `Queue::Schedule`; the
  // element could be split and processed in batches at dequeue time.
  //
  // Used iff `QueueOptions.enable_lazy_split` is true.
  std::deque<std::unique_ptr<Batch<BatchInputTaskHandle<TaskType>>>>
      task_handle_batches_ TF_GUARDED_BY(mu_);

  // The counter of the TraceMe context ids.
  uint64 traceme_context_id_counter_ TF_GUARDED_BY(mu_) = 0;

  // The time at which the first task was added to the open (back-most) batch
  // in 'batches_'. Valid iff that batch contains at least one task.
  uint64 open_batch_start_time_micros_ TF_GUARDED_BY(mu_);

  // Whether this queue contains a batch that is eligible to be scheduled.
  // Used to keep track of when to call 'schedulable_batch_callback_'.
  bool schedulable_batch_ TF_GUARDED_BY(mu_) = false;

  // The number of batches currently being processed by batch threads.
  // Incremented in ScheduleBatch() and decremented in ProcessBatch().
  int num_batches_being_processed_ TF_GUARDED_BY(mu_) = 0;

  // Used by CloseAndWaitUntilEmpty() to wait until the queue is empty, for
  // the case in which the queue is not empty when CloseAndWaitUntilEmpty()
  // starts. When ProcessBatch() dequeues the last batch and makes the queue
  // empty, if 'empty_notification_' is non-null it calls
  // 'empty_notification_->Notify()'.
  Notification* empty_notification_ TF_GUARDED_BY(mu_) = nullptr;

  TF_DISALLOW_COPY_AND_ASSIGN(Queue);
};

// A RAII-style object that points to a Queue and implements
// the BatchScheduler API. To be handed out to clients who call AddQueue().
template <typename TaskType>
class QueueHandle : public BatchScheduler<TaskType> {
 public:
  QueueHandle(std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
              Queue<TaskType>* queue);
  ~QueueHandle() override;

  Status Schedule(std::unique_ptr<TaskType>* task) override;
  size_t NumEnqueuedTasks() const override;
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override { return queue_->max_task_size(); }

 private:
  // The scheduler that owns 'queue_'.
  std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler_;

  // The queue this handle wraps. Owned by 'scheduler_', which keeps it alive at
  // least until this class's destructor closes it.
  Queue<TaskType>* queue_;

  TF_DISALLOW_COPY_AND_ASSIGN(QueueHandle);
};

}  // namespace internal

template <typename TaskType>
Status SharedBatchScheduler<TaskType>::Create(
    const Options& options,
    std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler) {
  if (options.num_batch_threads < 1) {
    return errors::InvalidArgument("num_batch_threads must be positive; was ",
                                   options.num_batch_threads);
  }
  scheduler->reset(new SharedBatchScheduler<TaskType>(options));
  return Status::OK();
}

template <typename TaskType>
SharedBatchScheduler<TaskType>::~SharedBatchScheduler() {
  // Wait until the batch threads finish clearing out and deleting the closed
  // queues.
  for (;;) {
    {
      mutex_lock l(mu_);
      if (queues_.empty()) {
        break;
      }
    }
    const int64_t kSleepTimeMicros = 100;
    options_.env->SleepForMicroseconds(kSleepTimeMicros);
  }
  // Delete the batch threads before allowing state the threads may access (e.g.
  // 'mu_') to be deleted.
  batch_threads_.clear();
}

template <typename TaskType>
Status SharedBatchScheduler<TaskType>::AddQueue(
    const QueueOptions& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BatchScheduler<TaskType>>* queue) {
  if (options.input_batch_size_limit == 0) {
    return errors::InvalidArgument(
        "input_batch_size_limit must be positive; was ",
        options.input_batch_size_limit);
  }
  if (options.batch_timeout_micros < 0) {
    return errors::InvalidArgument(
        "batch_timeout_micros must be non-negative; was ",
        options.batch_timeout_micros);
  }
  if (options.max_enqueued_batches == 0) {
    return errors::InvalidArgument(
        "max_enqueued_batches must be positive; was ",
        options.max_enqueued_batches);
  }

  if (options.enable_large_batch_splitting &&
      options.split_input_task_func == nullptr) {
    return errors::InvalidArgument(
        "split_input_task_func must be specified when "
        "enable_large_batch_splitting is true: ",
        options.enable_large_batch_splitting);
  }

  if (options.enable_lazy_split && (!options.enable_large_batch_splitting)) {
    return errors::InvalidArgument(
        "enable_lazy_split should be enabled only if "
        "enable_large_batch_splitting is enabled.");
  }

  if (options.enable_large_batch_splitting &&
      (options.input_batch_size_limit < options.max_execution_batch_size)) {
    return errors::InvalidArgument(
        "When enable_large_batch_splitting is true, input_batch_size_limit "
        "must be greater than or equal to max_execution_batch_size.",
        options.enable_large_batch_splitting, options.input_batch_size_limit,
        options.max_execution_batch_size);
  }

  auto schedulable_batch_callback = [this] {
    mutex_lock l(mu_);
    schedulable_batch_cv_.notify_one();
  };
  auto internal_queue =
      std::unique_ptr<internal::Queue<TaskType>>(new internal::Queue<TaskType>(
          options, options_.env, process_batch_callback,
          schedulable_batch_callback));
  auto handle = std::unique_ptr<BatchScheduler<TaskType>>(
      new internal::QueueHandle<TaskType>(this->shared_from_this(),
                                          internal_queue.get()));
  {
    mutex_lock l(mu_);
    queues_.push_back(std::move(internal_queue));
    if (next_queue_to_schedule_ == queues_.end()) {
      next_queue_to_schedule_ = queues_.begin();
    }
  }
  *queue = std::move(handle);
  return Status::OK();
}

template <typename TaskType>
SharedBatchScheduler<TaskType>::SharedBatchScheduler(const Options& options)
    : options_(options), next_queue_to_schedule_(queues_.end()) {
  // Kick off the batch threads.
  PeriodicFunction::Options periodic_fn_options;
  periodic_fn_options.thread_name_prefix =
      strings::StrCat(options.thread_pool_name, "_");
  for (int i = 0; i < options.num_batch_threads; ++i) {
    std::unique_ptr<PeriodicFunction> thread(new PeriodicFunction(
        [this] { this->ThreadLogic(); },
        0 /* function invocation interval time */, periodic_fn_options));
    batch_threads_.push_back(std::move(thread));
  }
}

template <typename TaskType>
void SharedBatchScheduler<TaskType>::GetNextWorkItem_Locked(
    internal::Queue<TaskType>** queue_for_batch_out,
    std::unique_ptr<Batch<TaskType>>* batch_to_process_out) {
  std::unique_ptr<Batch<TaskType>> batch_to_process;
  internal::Queue<TaskType>* queue_for_batch = nullptr;
  const int num_queues = queues_.size();
  for (int num_queues_tried = 0;
       batch_to_process == nullptr && num_queues_tried < num_queues;
       ++num_queues_tried) {
    DCHECK(next_queue_to_schedule_ != queues_.end());

    // If a closed queue responds to ScheduleBatch() with nullptr, the queue
    // will never yield any further batches so we can drop it. To avoid a
    // race, we take a snapshot of the queue's closedness state *before*
    // calling ScheduleBatch().
    const bool queue_closed = (*next_queue_to_schedule_)->closed();

    // Ask '*next_queue_to_schedule_' if it wants us to process a batch.
    batch_to_process = (*next_queue_to_schedule_)->ScheduleBatch();
    if (batch_to_process != nullptr) {
      queue_for_batch = next_queue_to_schedule_->get();
    }

    // Advance 'next_queue_to_schedule_'.
    if (queue_closed && (*next_queue_to_schedule_)->IsEmpty() &&
        batch_to_process == nullptr) {
      // We've encountered a closed queue with no work to do. Drop it.
      DCHECK_NE(queue_for_batch, next_queue_to_schedule_->get());
      next_queue_to_schedule_ = queues_.erase(next_queue_to_schedule_);
    } else {
      ++next_queue_to_schedule_;
    }
    if (next_queue_to_schedule_ == queues_.end() && !queues_.empty()) {
      // We've hit the end. Wrap to the first queue.
      next_queue_to_schedule_ = queues_.begin();
    }
  }
  *queue_for_batch_out = queue_for_batch;
  *batch_to_process_out = std::move(batch_to_process);
}

template <typename TaskType>
void SharedBatchScheduler<TaskType>::ThreadLogic() {
  // A batch to process next (or nullptr if no work to do).
  std::unique_ptr<Batch<TaskType>> batch_to_process;
  // The queue with which 'batch_to_process' is associated.
  internal::Queue<TaskType>* queue_for_batch = nullptr;
  {
    mutex_lock l(mu_);
    while (true) {
      GetNextWorkItem_Locked(&queue_for_batch, &batch_to_process);
      if (batch_to_process != nullptr) break;
      if (queues_.empty()) return;
      // We couldn't find any work to do. Wait until a new batch becomes
      // schedulable, or some time has elapsed, before checking again.
      const int64_t kTimeoutMillis =
          1;  // The smallest accepted granule of time.
      WaitForMilliseconds(&l, &schedulable_batch_cv_, kTimeoutMillis);
    }
  }

  queue_for_batch->ProcessBatch(std::move(batch_to_process));
}

namespace internal {

template <typename TaskType>
Queue<TaskType>::Queue(
    const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
    Env* env, ProcessBatchCallback process_batch_callback,
    SchedulableBatchCallback schedulable_batch_callback)
    : options_(options),
      env_(env),
      max_execution_batch_size_(GetMaxExecutionBatchSize(options_)),
      process_batch_callback_(process_batch_callback),
      schedulable_batch_callback_(schedulable_batch_callback) {
  // Set the higher 32 bits of traceme_context_id_counter_ to be the creation
  // time of the queue. This prevents batches in different queues from having
  // the same traceme_context_id_counter_.
  traceme_context_id_counter_ = absl::GetCurrentTimeNanos() << 32;
  // Create an initial, open batch.
  if (options_.enable_lazy_split) {
    task_handle_batches_.emplace_back(
        new Batch<BatchInputTaskHandle<TaskType>>);
  } else {
    batches_.emplace_back(new Batch<TaskType>);
  }
}

template <typename TaskType>
Queue<TaskType>::~Queue() {
  mutex_lock l(mu_);
  DCHECK(IsEmptyInternal());

  // Close the (empty) open batch, so its destructor doesn't block.
  if (options_.enable_lazy_split) {
    task_handle_batches_.back()->Close();
  } else {
    batches_.back()->Close();
  }
}

template <typename TaskType>
Status Queue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
  if ((*task)->size() > options_.input_batch_size_limit) {
    return errors::InvalidArgument("Task size ", (*task)->size(),
                                   " is larger than maximum input batch size ",
                                   options_.input_batch_size_limit);
  }
  if (options_.enable_lazy_split) {
    return ScheduleWithLazySplit(std::move(task));
  }
  return ScheduleWithoutOrEagerSplit(std::move(task));
}

template <typename TaskType>
Status Queue<TaskType>::ScheduleWithLazySplit(std::unique_ptr<TaskType>* task) {
  profiler::TraceMe trace_me([task] {
    return profiler::TraceMeEncode(
        "ScheduleWithLazySplit",
        {{"batching_input_task_size", (*task)->size()}});
  });
  // The max size to be enqueued.
  const int max_execution_batch_size = options_.max_execution_batch_size;

  bool notify_of_schedulable_batch = false;
  {
    mutex_lock l(mu_);

    DCHECK(!closed_);

    if (BatchTaskExceedQueueCapacity((*task).get())) {
      return errors::Unavailable(
          "The batch scheduling queue to which this task was submitted is "
          "full");
    }
    const int64 open_batch_capacity =
        max_execution_batch_size - this->tail_batch_task_size();

    auto input_batch = std::make_shared<BatchInputTask<TaskType>>(
        std::move(*task), open_batch_capacity, max_execution_batch_size,
        options_.split_input_task_func);
    std::vector<std::unique_ptr<BatchInputTaskHandle<TaskType>>> task_handles;

    input_batch->ToTaskHandles(&task_handles);

    for (int i = 0; i < task_handles.size(); ++i) {
      if (task_handle_batches_.back()->size() + task_handles[i]->size() >
          options_.max_execution_batch_size) {
        StartNewBatch();
      }
      if (task_handle_batches_.back()->empty()) {
        open_batch_start_time_micros_ = env_->NowMicros();
      }
      profiler::TraceMeProducer trace_me(
          [&task_handles, i] {
            return profiler::TraceMeEncode("ScheduleOutputTask",
                                           {{"size", task_handles[i]->size()}});
          },
          profiler::ContextType::kSharedBatchScheduler,
          task_handle_batches_.back()->traceme_context_id());

      task_handle_batches_.back()->AddTask(std::move(task_handles[i]));
    }

    if (!schedulable_batch_) {
      // Under lazy split, the enqueued batches live in `task_handle_batches_`
      // (not `batches_`), so that is the deque whose size is checked here.
      if (task_handle_batches_.size() > 1 || IsOpenBatchSchedulable()) {
        schedulable_batch_ = true;
        notify_of_schedulable_batch = true;
      }
    }
  }
  // TODO(b/194294263):
  // Add unit tests to verify that `schedulable_batch_callback_` could be
  // triggered when batches are scheduled.
  if (notify_of_schedulable_batch) {
    schedulable_batch_callback_();
  }

  return Status::OK();
}

// TODO(b/194294263):
// Merge `ScheduleWithoutOrEagerSplit` and `ScheduleWithLazySplit` into
// `Schedule`.
template <typename TaskType>
Status Queue<TaskType>::ScheduleWithoutOrEagerSplit(
    std::unique_ptr<TaskType>* task) {
  const bool large_batch_splitting = options_.enable_large_batch_splitting;
  profiler::TraceMe trace_me([task, large_batch_splitting] {
    return profiler::TraceMeEncode(
        large_batch_splitting ? "ScheduleWithEagerSplit"
                              : "ScheduleWithoutSplit",
        {{"batching_input_task_size", (*task)->size()}});
  });

  bool notify_of_schedulable_batch = false;
  {
    mutex_lock l(mu_);

    DCHECK(!closed_);

    // TODO(b/161857471):
    // Add test coverage for when concurrent incoming batches arrive and
    // use up all queue capacity.
    if (BatchTaskExceedQueueCapacity((*task).get())) {
      return errors::Unavailable(
          "The batch scheduling queue to which this task was submitted is "
          "full");
    }

    const int64_t open_batch_remaining_slot =
        options_.max_execution_batch_size - batches_.back()->size();

    const int64_t input_task_size = (*task)->size();

    std::vector<std::unique_ptr<TaskType>> output_tasks;

    if (input_task_size <= open_batch_remaining_slot ||
        !large_batch_splitting) {
      // This is the fast path when the input doesn't need to be split.
      output_tasks.push_back(std::move(*task));
    } else {
      TF_RETURN_IF_ERROR(SplitInputBatchIntoSubtasks(task, &output_tasks));
    }

    for (int i = 0; i < output_tasks.size(); ++i) {
      if (batches_.back()->size() + output_tasks[i]->size() >
          options_.max_execution_batch_size) {
        StartNewBatch();
      }
      if (batches_.back()->empty()) {
        open_batch_start_time_micros_ = env_->NowMicros();
      }
      profiler::TraceMeProducer trace_me(
          [&output_tasks, i] {
            return profiler::TraceMeEncode("ScheduleOutputTask",
                                           {{"size", output_tasks[i]->size()}});
          },
          profiler::ContextType::kSharedBatchScheduler,
          batches_.back()->traceme_context_id());
      batches_.back()->AddTask(std::move(output_tasks[i]));
    }

    if (!schedulable_batch_) {
      if (batches_.size() > 1 || IsOpenBatchSchedulable()) {
        schedulable_batch_ = true;
        notify_of_schedulable_batch = true;
      }
    }
  }

  if (notify_of_schedulable_batch) {
    schedulable_batch_callback_();
  }

  return Status::OK();
}

template <typename TaskType>
size_t Queue<TaskType>::NumEnqueuedTasks() const {
  size_t num_enqueued_tasks = 0;
  mutex_lock l(mu_);
  if (options_.enable_lazy_split) {
    for (const auto& batch : task_handle_batches_) {
      num_enqueued_tasks += batch->num_tasks();
    }
    return num_enqueued_tasks;
  }

  for (const auto& batch : batches_) {
    num_enqueued_tasks += batch->num_tasks();
  }
  return num_enqueued_tasks;
}

template <typename TaskType>
size_t Queue<TaskType>::SchedulingCapacity() const {
  mutex_lock l(mu_);
  return SchedulingCapacityInternal();
}

template <typename TaskType>
size_t Queue<TaskType>::SchedulingCapacityInternal() const {
  const int64 num_new_batches_schedulable =
      static_cast<int64>(options_.max_enqueued_batches) -
      this->num_enqueued_batches();
  const int64 execution_batch_size_limit = max_execution_batch_size();
  const int64 open_batch_capacity =
      execution_batch_size_limit - this->tail_batch_task_size();
  // Note the returned value is guaranteed to be non-negative, since an
  // enqueue operation can only happen if the queue has enough capacity.
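  // Illustrative example (values chosen arbitrarily, not defaults): with
  // max_enqueued_batches = 10, 3 batches already enqueued,
  // max_execution_batch_size = 1000, and 200 task units in the tail (open)
  // batch, the capacity is (10 - 3) * 1000 + (1000 - 200) = 7800.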
  return (num_new_batches_schedulable * execution_batch_size_limit) +
         open_batch_capacity;
}

template <typename TaskType>
bool Queue<TaskType>::BatchTaskExceedQueueCapacity(TaskType* task) const {
  // Queue creation requires that `enable_large_batch_splitting` is true
  // when `enable_lazy_split` is true, so this covers both eager split and
  // lazy split.
  if (options_.enable_large_batch_splitting) {
    return task->size() > SchedulingCapacityInternal();
  }

  // NOTE: the capacity check below is loose, and is retained for backward
  // compatibility with behavior from before the no-split and eager-split
  // code paths were merged.
  // There are existing clients/models that rely on the loose check
  // and can get errors after the merge. Retaining the old behavior
  // allows such models to continue to work.
  //
  // We need to revisit/remove this check after we fix model configs.
  bool batch_task_exceed_queue_capacity = false;
  if (batches_.back()->size() + task->size() >
      options_.input_batch_size_limit) {
    if (batches_.size() >= options_.max_enqueued_batches) {
      batch_task_exceed_queue_capacity = true;
    }
  }
  return batch_task_exceed_queue_capacity;
}

template <typename TaskType>
std::unique_ptr<Batch<TaskType>>
Queue<TaskType>::ScheduleBatchWithEagerSplit() {
  // The batch to schedule, which we may populate below. (If left as nullptr,
  // that means we are electing not to schedule a batch at this time.)
  std::unique_ptr<Batch<TaskType>> batch_to_schedule;

  {
    mutex_lock l(mu_);

    // Consider closing the open batch at this time, to schedule it.
    if (batches_.size() == 1 && IsOpenBatchSchedulable()) {
      StartNewBatch();
    }

    if (batches_.size() >= 2) {
      // There is at least one closed batch that is ready to be scheduled.
      ++num_batches_being_processed_;
      batch_to_schedule = std::move(batches_.front());
      batches_.pop_front();
    } else {
      schedulable_batch_ = false;
    }
  }

  return batch_to_schedule;
}

template <typename TaskType>
std::unique_ptr<Batch<TaskType>> Queue<TaskType>::ScheduleBatch() {
  if (!options_.enable_lazy_split) {
    return ScheduleBatchWithEagerSplit();
  }
  // The batch to schedule, which we may populate below. (If left as nullptr,
  // that means we are electing not to schedule a batch at this time.)
  std::unique_ptr<Batch<BatchInputTaskHandle<TaskType>>>
      task_handles_to_schedule;

  {
    mutex_lock l(mu_);

    // Consider closing the open batch at this time, to schedule it.
    if (task_handle_batches_.size() == 1 && IsOpenBatchSchedulable()) {
      StartNewBatch();
    }

    if (task_handle_batches_.size() >= 2) {
      // There is at least one closed batch that is ready to be scheduled.
      ++num_batches_being_processed_;
      task_handles_to_schedule = std::move(task_handle_batches_.front());
      task_handle_batches_.pop_front();
    } else {
      schedulable_batch_ = false;
    }
  }

  std::unique_ptr<Batch<TaskType>> batch_to_schedule;
  if (task_handles_to_schedule != nullptr) {
    batch_to_schedule = std::make_unique<Batch<TaskType>>();
    std::vector<std::unique_ptr<BatchInputTaskHandle<TaskType>>> task_handles =
        task_handles_to_schedule->RemoveAllTasks();

    // TODO(b/194294263):
    // Handle the batch-kernel callback properly when lazy split returns error.
    for (int i = 0; i < task_handles.size(); i++) {
      batch_to_schedule->AddTask(std::move(task_handles[i]->GetSplitTask()));
    }
    batch_to_schedule->Close();
  }
  return batch_to_schedule;
}

template <typename TaskType>
void Queue<TaskType>::ProcessBatch(std::unique_ptr<Batch<TaskType>> batch) {
  profiler::TraceMeConsumer trace_me(
      [&] {
        return profiler::TraceMeEncode(
            "ProcessBatch", {{"batch_size_before_padding", batch->size()},
                             {"_r", 2} /*root_event*/});
      },
      profiler::ContextType::kSharedBatchScheduler,
      batch->traceme_context_id());
  process_batch_callback_(std::move(batch));

  {
    mutex_lock l(mu_);
    --num_batches_being_processed_;
    if (empty_notification_ != nullptr && IsEmptyInternal()) {
      empty_notification_->Notify();
    }
  }
}

template <typename TaskType>
bool Queue<TaskType>::IsEmpty() const {
  mutex_lock l(mu_);
  return IsEmptyInternal();
}

template <typename TaskType>
void Queue<TaskType>::CloseAndWaitUntilEmpty() {
  Notification empty;
  {
    mutex_lock l(mu_);
    closed_ = true;
    if (IsEmptyInternal()) {
      empty.Notify();
    } else {
      // Arrange for ProcessBatch() to notify when the queue becomes empty.
      empty_notification_ = &empty;
    }
  }
  empty.WaitForNotification();
}

template <typename TaskType>
bool Queue<TaskType>::IsEmptyInternal() const {
  if (options_.enable_lazy_split) {
    return num_batches_being_processed_ == 0 &&
           task_handle_batches_.size() == 1 &&
           task_handle_batches_.back()->empty();
  }
  return num_batches_being_processed_ == 0 && batches_.size() == 1 &&
         batches_.back()->empty();
}

template <typename TaskType>
void Queue<TaskType>::StartNewBatch() {
  if (options_.enable_lazy_split) {
    task_handle_batches_.back()->Close();
    task_handle_batches_.emplace_back(new Batch<BatchInputTaskHandle<TaskType>>(
        ++traceme_context_id_counter_));
    return;
  }
  batches_.back()->Close();
  batches_.emplace_back(new Batch<TaskType>(++traceme_context_id_counter_));
}

template <typename TaskType>
Status Queue<TaskType>::SplitInputBatchIntoSubtasks(
    std::unique_ptr<TaskType>* input_task,
    std::vector<std::unique_ptr<TaskType>>* output_tasks) {
  const int open_batch_remaining_slot =
      max_execution_batch_size() - this->tail_batch_task_size();
  return options_.split_input_task_func(
      std::move(input_task), open_batch_remaining_slot,
      max_execution_batch_size(), std::move(output_tasks));
}

template <typename TaskType>
bool Queue<TaskType>::IsOpenBatchSchedulableAfterEagerSplit() const {
  Batch<TaskType>* open_batch = batches_.back().get();
  if (open_batch->empty()) {
    return false;
  }
  return closed_ || open_batch->size() >= max_execution_batch_size() ||
         env_->NowMicros() >=
             open_batch_start_time_micros_ + options_.batch_timeout_micros;
}

template <typename TaskType>
bool Queue<TaskType>::IsOpenBatchSchedulable() const {
  if (!options_.enable_lazy_split) {
    return IsOpenBatchSchedulableAfterEagerSplit();
  }
  Batch<BatchInputTaskHandle<TaskType>>* open_batch =
      task_handle_batches_.back().get();
  if (open_batch->empty()) {
    return false;
  }
  return closed_ || open_batch->size() >= max_execution_batch_size() ||
         env_->NowMicros() >=
             open_batch_start_time_micros_ + options_.batch_timeout_micros;
}

template <typename TaskType>
size_t Queue<TaskType>::tail_batch_task_size() const {
  if (options_.enable_lazy_split) {
    return task_handle_batches_.back()->size();
  }

  return batches_.back()->size();
}

template <typename TaskType>
int64 Queue<TaskType>::num_enqueued_batches() const {
  if (options_.enable_lazy_split) {
    return task_handle_batches_.size();
  }
  return batches_.size();
}

template <typename TaskType>
QueueHandle<TaskType>::QueueHandle(
    std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
    Queue<TaskType>* queue)
    : scheduler_(scheduler), queue_(queue) {}

template <typename TaskType>
QueueHandle<TaskType>::~QueueHandle() {
  queue_->CloseAndWaitUntilEmpty();
}

template <typename TaskType>
Status QueueHandle<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
  return queue_->Schedule(task);
}

template <typename TaskType>
size_t QueueHandle<TaskType>::NumEnqueuedTasks() const {
  return queue_->NumEnqueuedTasks();
}

template <typename TaskType>
size_t QueueHandle<TaskType>::SchedulingCapacity() const {
  return queue_->SchedulingCapacity();
}

}  // namespace internal

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_