1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
19 #include <stddef.h>
21 #include <deque>
22 #include <functional>
23 #include <list>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
29 #include "absl/time/clock.h"
30 #include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
31 #include "tensorflow/core/kernels/batching_util/periodic_function.h"
32 #include "tensorflow/core/lib/core/errors.h"
33 #include "tensorflow/core/lib/core/status.h"
34 #include "tensorflow/core/lib/strings/strcat.h"
35 #include "tensorflow/core/platform/byte_order.h"
36 #include "tensorflow/core/platform/cpu_info.h"
37 #include "tensorflow/core/platform/env.h"
38 #include "tensorflow/core/platform/errors.h"
39 #include "tensorflow/core/platform/thread_annotations.h"
40 #include "tensorflow/core/platform/types.h"
41 #include "tensorflow/core/profiler/lib/connected_traceme.h"
42 #include "tensorflow/core/profiler/lib/traceme.h"
43 #include "tensorflow/core/profiler/lib/traceme_encode.h"
// Forward declaration: SharedBatchScheduler (defined below) names
// internal::Queue<TaskType> before that template is defined later in this file.
namespace tensorflow {
namespace serving {
namespace internal {

template <class TaskType>
class Queue;

}  // namespace internal
}  // namespace serving
}  // namespace tensorflow
54 namespace tensorflow {
55 namespace serving {
57 // A batch scheduler for server instances that service multiple request types
58 // (e.g. multiple machine-learned models, or multiple versions of a model served
59 // concurrently), or even multiple distinct tasks for a given request. The
60 // scheduler multiplexes batches of different kinds of tasks onto a fixed-size
61 // thread pool (each batch contains tasks of a single type), in a carefully
62 // controlled manner. A common configuration is to set the number of threads
63 // equal to the number of hardware accelerator units, in which case the
64 // scheduler takes care of multiplexing the task types onto the shared hardware,
65 // in a manner that is both fair and efficient.
66 //
67 // Semantically, SharedBatchScheduler behaves like having N instances of
68 // BasicBatchScheduler (see basic_batch_scheduler.h), one per task type. The
69 // difference is that under the covers there is a single shared thread pool,
70 // instead of N independent ones, with their sharing deliberately coordinated.
71 //
72 // SharedBatchScheduler does not implement the BatchScheduler API; rather, it
73 // presents an abstraction of "queues", where each queue corresponds to one type
74 // of task. Tasks submitted to a given queue are placed in their own batches,
75 // and cannot be mixed with other tasks. Queues can be added and deleted
76 // dynamically, to accommodate e.g. versions of a model being brought up and
77 // down over the lifetime of a server.
78 //
79 // The batch thread pool round-robins through the queues, running one batch
80 // from a queue and then moving to the next queue. Each queue behaves like a
81 // BasicBatchScheduler instance, in the sense that it has maximum batch size and
82 // timeout parameters, which govern when a batch is eligible to be processed.
83 //
84 // Each queue is independently configured with a maximum size (in terms of the
85 // maximum number of batches worth of enqueued tasks). For online serving, it is
86 // recommended that the queue sizes be configured such that the sum of the sizes
87 // of the active queues roughly equal the number of batch threads. (The idea is
88 // that if all threads become available at roughly the same time, there will be
89 // enough enqueued work for them to take on, but no more.)
90 //
91 // If queue sizes are configured in the manner suggested above, the maximum time
92 // a task can spend in a queue before being placed in a batch and assigned to a
93 // thread for processing, is the greater of:
94 // - the maximum time to process one batch of tasks from any active queue
95 // - the configured timeout parameter for the task's queue (which can be 0)
96 //
97 // For bulk processing jobs and throughput-oriented benchmarks, you may want to
98 // set the maximum queue size to a large value.
99 //
100 // TODO(b/26539183): Support queue servicing policies other than round-robin.
101 // E.g. let each queue specify a "share" (an int >= 1), so e.g. with queues A
102 // and B having shares 1 and 2 respectively, the servicing pattern is ABBABB...
103 //
104 //
106 //
107 template <typename TaskType>
108 class SharedBatchScheduler
109 : public std::enable_shared_from_this<SharedBatchScheduler<TaskType>> {
110 public:
111 // TODO(b/25089730): Tune defaults based on best practices as they develop.
112 struct Options {
113 // The name to use for the pool of batch threads.
114 string thread_pool_name = {"batch_threads"};
116 // The number of threads to use to process batches.
117 // Must be >= 1, and should be tuned carefully.
118 int num_batch_threads = port::MaxParallelism();
120 // The environment to use.
121 // (Typically only overridden by test code.)
122 Env* env = Env::Default();
123 };
124 // Ownership is shared between the caller of Create() and any queues created
125 // via AddQueue().
126 static Status Create(
127 const Options& options,
128 std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler);
130 ~SharedBatchScheduler();
132 // Adds a queue to which tasks may be submitted. The returned queue implements
133 // the BatchScheduler API. Each queue has its own set of scheduling options,
134 // and its own callback to process batches of tasks submitted to the queue.
135 //
136 // The returned queue's destructor blocks until all tasks submitted to it have
137 // been processed.
138 struct QueueOptions {
139 // The size limit of an input batch to the queue.
140 //
141 // If `enable_large_batch_splitting` is True, 'input_batch_size_limit'
142 // should be greater or equal than `max_execution_batch_size`; otherwise
143 // `input_batch_size_limit` should be equal to `max_execution_batch_size`.
144 size_t input_batch_size_limit = 1000;
146 // If a task has been enqueued for this amount of time (in microseconds),
147 // and a thread is available, the scheduler will immediately form a batch
148 // from enqueued tasks and assign the batch to the thread for processing,
149 // even if the batch's size is below 'input_batch_size_limit'.
150 //
151 // This parameter offers a way to bound queue latency, so that a task isn't
152 // stuck in the queue indefinitely waiting for enough tasks to arrive to
153 // make a full batch. (The latency bound is given in the class documentation
154 // above.)
155 //
156 // The goal is to smooth out batch sizes under low request rates, and thus
157 // avoid latency spikes.
158 int64 batch_timeout_micros = 0;
160 // The maximum allowable number of enqueued (accepted by Schedule() but
161 // not yet being processed on a batch thread) tasks in terms of batches.
162 // If this limit is reached, Schedule() will return an UNAVAILABLE error.
163 // See the class documentation above for guidelines on how to tune this
164 // parameter.
165 size_t max_enqueued_batches = 10;
167 // If true, queue implementation would split one input batch task into
168 // subtasks (as specified by `split_input_task_func` below) and fit subtasks
169 // into different batches.
170 //
171 // For usage of `split_input_task_func`, please see its comment.
172 bool enable_large_batch_splitting = false;
174 // `input_task`: a unit of task to be split.
175 // `first_output_task_size`: task size of first output.
176 // `max_execution_batch_size`: Maximum size of each batch.
177 // `output_tasks`: A list of output tasks after split.
178 //
179 // REQUIRED:
180 // 1) All `output_tasks` should be non-empty tasks.
181 // 2) Sizes of `output_tasks` add up to size of `input_task`.
182 //
183 // NOTE:
184 // Instantiations of `TaskType` may vary, so it's up to caller to define
185 // how (e.g., which members to access) to split input tasks.
186 std::function<Status(std::unique_ptr<TaskType>* input_task,
187 int first_output_task_size, int input_batch_size_limit,
188 std::vector<std::unique_ptr<TaskType>>* output_tasks)>
189 split_input_task_func;
191 // The maximum size of each enqueued batch (i.e., in `batches_`).
192 //
193 // The scheduler may form batches of any size between 1 and this number
194 // (inclusive). If there is a need to quantize the batch sizes, i.e. only
195 // submit batches whose size is in a small set of allowed sizes, that can be
196 // done by adding padding in the process-batch callback.
197 size_t max_execution_batch_size = 1000;
198 };
199 Status AddQueue(const QueueOptions& options,
200 std::function<void(std::unique_ptr<Batch<TaskType>>)>
201 process_batch_callback,
202 std::unique_ptr<BatchScheduler<TaskType>>* queue);
204 private:
205 explicit SharedBatchScheduler(const Options& options);
207 // The code executed in 'batch_threads_'. Obtains a batch to process from the
208 // queue pointed to by 'next_queue_to_schedule_', and processes it. If that
209 // queue declines to provide a batch to process, moves onto the next queue. If
210 // no queues provide a batch to process, just sleeps briefly and exits.
211 void ThreadLogic();
213 const Options options_;
215 mutex mu_;
217 // A list of queues. (We use std::list instead of std::vector to ensure that
218 // iterators are not invalidated by adding/removing elements. It also offers
219 // efficient removal of elements from the middle.)
220 using QueueList = std::list<std::unique_ptr<internal::Queue<TaskType>>>;
222 // All "active" queues, i.e. ones that either:
223 // - have not been removed, or
224 // - have been removed but are not yet empty.
225 QueueList queues_ TF_GUARDED_BY(mu_);
227 // An iterator over 'queues_', pointing to the queue from which the next
228 // available batch thread should grab work.
229 typename QueueList::iterator next_queue_to_schedule_ TF_GUARDED_BY(mu_);
231 // Used by idle batch threads to wait for work to enter the system. Notified
232 // whenever a batch becomes schedulable.
233 condition_variable schedulable_batch_cv_;
235 // Threads that process batches obtained from the queues.
236 std::vector<std::unique_ptr<PeriodicFunction>> batch_threads_;
238 TF_DISALLOW_COPY_AND_ASSIGN(SharedBatchScheduler);
239 };
241 //////////
242 // Implementation details follow. API users need not read.
244 namespace internal {
246 // A task queue for SharedBatchScheduler. Accepts tasks and accumulates them
247 // into batches, and dispenses those batches to be processed via a "pull"
248 // interface. The queue's behavior is governed by maximum batch size, timeout
249 // and maximum queue length parameters; see their documentation in
250 // SharedBatchScheduler.
251 //
252 // The queue is implemented as a deque of batches, with these invariants:
253 // - The number of batches is between 1 and 'options_.max_enqueued_batches'.
254 // - The back-most batch is open; the rest are closed.
255 //
256 // Submitted tasks are added to the open batch. If that batch doesn't have room
257 // but the queue isn't full, then that batch is closed and a new open batch is
258 // started.
259 //
260 // Batch pull requests are handled by dequeuing the front-most batch if it is
261 // closed. If the front-most batch is open (i.e. the queue contains only one
262 // batch) and has reached the timeout, it is immediately closed and returned;
263 // otherwise no batch is returned for the request.
264 template <typename TaskType>
265 class Queue {
266 public:
267 using ProcessBatchCallback =
268 std::function<void(std::unique_ptr<Batch<TaskType>>)>;
269 using SchedulableBatchCallback = std::function<void()>;
270 using SplitInputTaskIntoSubtasksCallback = std::function<Status(
271 std::unique_ptr<TaskType>* input_task, int open_batch_remaining_slot,
272 int max_execution_batch_size,
273 std::vector<std::unique_ptr<TaskType>>* output_tasks)>;
274 Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
275 Env* env, ProcessBatchCallback process_batch_callback,
276 SchedulableBatchCallback schedulable_batch_callback);
278 // Illegal to destruct unless the queue is empty.
279 ~Queue();
281 // Submits a task to the queue, with the same semantics as
282 // BatchScheduler::Schedule().
283 Status Schedule(std::unique_ptr<TaskType>* task);
285 // 'ScheduleWithoutSplit'.
286 Status ScheduleWithoutSplit(std::unique_ptr<TaskType>* task);
288 // 'ScheduleWithSplit'
289 Status ScheduleWithSplit(std::unique_ptr<TaskType>* task);
291 // Returns the number of enqueued tasks, with the same semantics as
292 // BatchScheduler::NumEnqueuedTasks().
293 size_t NumEnqueuedTasks() const;
295 // Returns the queue capacity, with the same semantics as
296 // BatchScheduler::SchedulingCapacity().
297 size_t SchedulingCapacity() const;
299 // Returns the maximum allowed size of tasks submitted to the queue.
max_task_size()300 size_t max_task_size() const { return options_.input_batch_size_limit; }
302 // Returns the maximum allowed size of tasks to be enqueued.
303 // Returned value would be less than or equal to the maximum allowed input
304 // size that's provided by caller of batch scheduler.
max_execution_batch_size()305 size_t max_execution_batch_size() const {
306 if (options_.enable_large_batch_splitting) {
307 return options_.max_execution_batch_size;
308 } else {
309 return options_.input_batch_size_limit;
310 }
311 }
313 // Called by a thread that is ready to process a batch, to request one from
314 // this queue. Either returns a batch that is ready to be processed, or
315 // nullptr if the queue declines to schedule a batch at this time. If it
316 // returns a batch, the batch is guaranteed to be closed.
317 std::unique_ptr<Batch<TaskType>> ScheduleBatch();
319 // Processes a batch that has been returned earlier by ScheduleBatch().
320 void ProcessBatch(std::unique_ptr<Batch<TaskType>> batch);
322 // Determines whether the queue is empty, i.e. has no tasks waiting or being
323 // processed.
324 bool IsEmpty() const;
326 // Marks the queue closed, and waits until it is empty.
327 void CloseAndWaitUntilEmpty();
closed()329 bool closed() const TF_NO_THREAD_SAFETY_ANALYSIS { return closed_.load(); }
331 private:
332 // Same as IsEmpty(), but assumes the caller already holds a lock on 'mu_'.
333 bool IsEmptyInternal() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
335 // Closes the open batch residing at the back of 'batches_', and inserts a
336 // fresh open batch behind it.
337 void StartNewBatch() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
339 // Split `input task` into `output_tasks` according to 'task_sizes'.
340 Status SplitInputBatchIntoSubtasks(
341 std::unique_ptr<TaskType>* input_task,
342 std::vector<std::unique_ptr<TaskType>>* output_tasks)
345 // Determines whether the open batch residing at the back of 'batches_' is
346 // currently schedulable.
347 bool IsOpenBatchSchedulable() const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
349 const typename SharedBatchScheduler<TaskType>::QueueOptions options_;
351 // The environment to use.
352 Env* env_;
354 // A callback invoked to processes a batch of work units. Always invoked
355 // from a batch thread.
356 ProcessBatchCallback process_batch_callback_;
358 // A callback invoked to notify the scheduler that a new batch has become
359 // schedulable.
360 SchedulableBatchCallback schedulable_batch_callback_;
362 mutable mutex mu_;
364 // Whether this queue can accept new tasks. This variable is monotonic: it
365 // starts as false, and then at some point gets set to true and remains true
366 // for the duration of this object's life.
TF_GUARDED_BY(mu_)367 std::atomic<bool> closed_ TF_GUARDED_BY(mu_){false};
369 // The enqueued batches. See the invariants in the class comments above.
370 std::deque<std::unique_ptr<Batch<TaskType>>> batches_ TF_GUARDED_BY(mu_);
372 // The counter of the TraceMe context ids.
373 uint64 traceme_context_id_counter_ TF_GUARDED_BY(mu_) = 0;
375 // The time at which the first task was added to the open (back-most) batch
376 // in 'batches_'. Valid iff that batch contains at least one task.
377 uint64 open_batch_start_time_micros_ TF_GUARDED_BY(mu_);
379 // Whether this queue contains a batch that is eligible to be scheduled.
380 // Used to keep track of when to call 'schedulable_batch_callback_'.
381 bool schedulable_batch_ TF_GUARDED_BY(mu_) = false;
383 // The number of batches currently being processed by batch threads.
384 // Incremented in ScheduleBatch() and decremented in ProcessBatch().
385 int num_batches_being_processed_ TF_GUARDED_BY(mu_) = 0;
387 // Used by CloseAndWaitUntilEmpty() to wait until the queue is empty, for
388 // the case in which the queue is not empty when CloseAndWaitUntilEmpty()
389 // starts. When ProcessBatch() dequeues the last batch and makes the queue
390 // empty, if 'empty_notification_' is non-null it calls
391 // 'empty_notification_->Notify()'.
392 Notification* empty_notification_ TF_GUARDED_BY(mu_) = nullptr;
395 };
397 // A RAII-style object that points to a Queue and implements
398 // the BatchScheduler API. To be handed out to clients who call AddQueue().
399 template <typename TaskType>
400 class QueueHandle : public BatchScheduler<TaskType> {
401 public:
402 QueueHandle(std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
403 Queue<TaskType>* queue);
404 ~QueueHandle() override;
406 Status Schedule(std::unique_ptr<TaskType>* task) override;
407 size_t NumEnqueuedTasks() const override;
408 size_t SchedulingCapacity() const override;
max_task_size()410 size_t max_task_size() const override { return queue_->max_task_size(); }
412 private:
413 // The scheduler that owns 'queue_'.
414 std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler_;
416 // The queue this handle wraps. Owned by 'scheduler_', which keeps it alive at
417 // least until this class's destructor closes it.
418 Queue<TaskType>* queue_;
421 };
423 } // namespace internal
425 template <typename TaskType>
Create(const Options & options,std::shared_ptr<SharedBatchScheduler<TaskType>> * scheduler)426 Status SharedBatchScheduler<TaskType>::Create(
427 const Options& options,
428 std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler) {
429 if (options.num_batch_threads < 1) {
430 return errors::InvalidArgument("num_batch_threads must be positive; was ",
431 options.num_batch_threads);
432 }
433 scheduler->reset(new SharedBatchScheduler<TaskType>(options));
434 return Status::OK();
435 }
437 template <typename TaskType>
~SharedBatchScheduler()438 SharedBatchScheduler<TaskType>::~SharedBatchScheduler() {
439 // Wait until the batch threads finish clearing out and deleting the closed
440 // queues.
441 for (;;) {
442 {
443 mutex_lock l(mu_);
444 if (queues_.empty()) {
445 break;
446 }
447 }
448 const int64 kSleepTimeMicros = 100;
449 options_.env->SleepForMicroseconds(kSleepTimeMicros);
450 }
451 // Delete the batch threads before allowing state the threads may access (e.g.
452 // 'mu_') to be deleted.
453 batch_threads_.clear();
454 }
456 template <typename TaskType>
AddQueue(const QueueOptions & options,std::function<void (std::unique_ptr<Batch<TaskType>>)> process_batch_callback,std::unique_ptr<BatchScheduler<TaskType>> * queue)457 Status SharedBatchScheduler<TaskType>::AddQueue(
458 const QueueOptions& options,
459 std::function<void(std::unique_ptr<Batch<TaskType>>)>
460 process_batch_callback,
461 std::unique_ptr<BatchScheduler<TaskType>>* queue) {
462 if (options.input_batch_size_limit == 0) {
463 return errors::InvalidArgument(
464 "input_batch_size_limit must be positive; was ",
465 options.input_batch_size_limit);
466 }
467 if (options.batch_timeout_micros < 0) {
468 return errors::InvalidArgument(
469 "batch_timeout_micros must be non-negative; was ",
470 options.batch_timeout_micros);
471 }
472 if (options.max_enqueued_batches < 0) {
473 return errors::InvalidArgument(
474 "max_enqueued_batches must be non-negative; was ",
475 options.max_enqueued_batches);
476 }
478 if (options.enable_large_batch_splitting &&
479 options.split_input_task_func == nullptr) {
480 return errors::InvalidArgument(
481 "split_input_task_func must be specified when split_input_task is "
482 "true: ",
483 options.enable_large_batch_splitting);
484 }
486 if (options.enable_large_batch_splitting &&
487 (options.input_batch_size_limit < options.max_execution_batch_size)) {
488 return errors::InvalidArgument(
489 "When enable_large_batch_splitting is true, input_batch_size_limit "
490 "must be "
491 "greater than or equal to max_execution_batch_size.",
492 options.enable_large_batch_splitting, options.input_batch_size_limit,
493 options.max_execution_batch_size);
494 }
496 auto schedulable_batch_callback = [this] {
497 mutex_lock l(mu_);
498 schedulable_batch_cv_.notify_one();
499 };
500 auto internal_queue =
501 std::unique_ptr<internal::Queue<TaskType>>(new internal::Queue<TaskType>(
502 options, options_.env, process_batch_callback,
503 schedulable_batch_callback));
504 auto handle = std::unique_ptr<BatchScheduler<TaskType>>(
505 new internal::QueueHandle<TaskType>(this->shared_from_this(),
506 internal_queue.get()));
507 {
508 mutex_lock l(mu_);
509 queues_.push_back(std::move(internal_queue));
510 if (next_queue_to_schedule_ == queues_.end()) {
511 next_queue_to_schedule_ = queues_.begin();
512 }
513 }
514 *queue = std::move(handle);
515 return Status::OK();
516 }
518 template <typename TaskType>
SharedBatchScheduler(const Options & options)519 SharedBatchScheduler<TaskType>::SharedBatchScheduler(const Options& options)
520 : options_(options), next_queue_to_schedule_(queues_.end()) {
521 // Kick off the batch threads.
522 PeriodicFunction::Options periodic_fn_options;
523 periodic_fn_options.thread_name_prefix =
524 strings::StrCat(options.thread_pool_name, "_");
525 for (int i = 0; i < options.num_batch_threads; ++i) {
526 std::unique_ptr<PeriodicFunction> thread(new PeriodicFunction(
527 [this] { this->ThreadLogic(); },
528 0 /* function invocation interval time */, periodic_fn_options));
529 batch_threads_.push_back(std::move(thread));
530 }
531 }
533 template <typename TaskType>
ThreadLogic()534 void SharedBatchScheduler<TaskType>::ThreadLogic() {
535 // A batch to process next (or nullptr if no work to do).
536 std::unique_ptr<Batch<TaskType>> batch_to_process;
537 // The queue with which 'batch_to_process' is associated.
538 internal::Queue<TaskType>* queue_for_batch = nullptr;
539 {
540 mutex_lock l(mu_);
542 const int num_queues = queues_.size();
543 for (int num_queues_tried = 0;
544 batch_to_process == nullptr && num_queues_tried < num_queues;
545 ++num_queues_tried) {
546 DCHECK(next_queue_to_schedule_ != queues_.end());
548 // If a closed queue responds to ScheduleBatch() with nullptr, the queue
549 // will never yield any further batches so we can drop it. To avoid a
550 // race, we take a snapshot of the queue's closedness state *before*
551 // calling ScheduleBatch().
552 const bool queue_closed = (*next_queue_to_schedule_)->closed();
554 // Ask '*next_queue_to_schedule_' if it wants us to process a batch.
555 batch_to_process = (*next_queue_to_schedule_)->ScheduleBatch();
556 if (batch_to_process != nullptr) {
557 queue_for_batch = next_queue_to_schedule_->get();
558 }
560 // Advance 'next_queue_to_schedule_'.
561 if (queue_closed && (*next_queue_to_schedule_)->IsEmpty() &&
562 batch_to_process == nullptr) {
563 // We've encountered a closed queue with no work to do. Drop it.
564 DCHECK_NE(queue_for_batch, next_queue_to_schedule_->get());
565 next_queue_to_schedule_ = queues_.erase(next_queue_to_schedule_);
566 } else {
567 ++next_queue_to_schedule_;
568 }
569 if (next_queue_to_schedule_ == queues_.end() && !queues_.empty()) {
570 // We've hit the end. Wrap to the first queue.
571 next_queue_to_schedule_ = queues_.begin();
572 }
573 }
575 if (batch_to_process == nullptr) {
576 // We couldn't find any work to do. Wait until a new batch becomes
577 // schedulable, or some time has elapsed, before checking again.
578 const int64 kTimeoutMillis = 1; // The smallest accepted granule of time.
579 WaitForMilliseconds(&l, &schedulable_batch_cv_, kTimeoutMillis);
580 return;
581 }
582 }
584 queue_for_batch->ProcessBatch(std::move(batch_to_process));
585 }
587 namespace internal {
589 template <typename TaskType>
Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions & options,Env * env,ProcessBatchCallback process_batch_callback,SchedulableBatchCallback schedulable_batch_callback)590 Queue<TaskType>::Queue(
591 const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
592 Env* env, ProcessBatchCallback process_batch_callback,
593 SchedulableBatchCallback schedulable_batch_callback)
594 : options_(options),
595 env_(env),
596 process_batch_callback_(process_batch_callback),
597 schedulable_batch_callback_(schedulable_batch_callback) {
598 // Set the higher 32 bits of traceme_context_id_counter_ to be the creation
599 // time of the queue. This prevents the batches in different queues to have
600 // the same traceme_context_id_counter_.
601 traceme_context_id_counter_ = absl::GetCurrentTimeNanos() << 32;
602 // Create an initial, open batch.
603 batches_.emplace_back(new Batch<TaskType>);
604 }
606 template <typename TaskType>
~Queue()607 Queue<TaskType>::~Queue() {
608 mutex_lock l(mu_);
609 DCHECK(IsEmptyInternal());
611 // Close the (empty) open batch, so its destructor doesn't block.
612 batches_.back()->Close();
613 }
615 template <typename TaskType>
Schedule(std::unique_ptr<TaskType> * task)616 Status Queue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
617 if (options_.enable_large_batch_splitting) {
618 return ScheduleWithSplit(std::move(task));
619 }
620 return ScheduleWithoutSplit(std::move(task));
621 }
623 template <typename TaskType>
ScheduleWithoutSplit(std::unique_ptr<TaskType> * task)624 Status Queue<TaskType>::ScheduleWithoutSplit(std::unique_ptr<TaskType>* task) {
625 if ((*task)->size() > options_.input_batch_size_limit) {
626 return errors::InvalidArgument("Task size ", (*task)->size(),
627 " is larger than maximum input batch size ",
628 options_.input_batch_size_limit);
629 }
631 bool notify_of_schedulable_batch = false;
632 {
633 mutex_lock l(mu_);
635 DCHECK(!closed_);
637 if (batches_.back()->size() + (*task)->size() >
638 options_.input_batch_size_limit) {
639 if (batches_.size() >= options_.max_enqueued_batches) {
640 return errors::Unavailable(
641 "The batch scheduling queue to which this task was submitted is "
642 "full");
643 }
644 StartNewBatch();
645 }
646 if (batches_.back()->empty()) {
647 open_batch_start_time_micros_ = env_->NowMicros();
648 }
649 profiler::TraceMeProducer trace_me(
650 [task] {
651 return profiler::TraceMeEncode(
652 "ScheduleWithoutSplit",
653 {{"batching_input_task_size", (*task)->size()}});
654 },
655 profiler::ContextType::kSharedBatchScheduler,
656 batches_.back()->traceme_context_id());
657 batches_.back()->AddTask(std::move(*task));
659 if (!schedulable_batch_) {
660 if (batches_.size() > 1 || IsOpenBatchSchedulable()) {
661 schedulable_batch_ = true;
662 notify_of_schedulable_batch = true;
663 }
664 }
665 }
667 if (notify_of_schedulable_batch) {
668 schedulable_batch_callback_();
669 }
671 return Status::OK();
672 }
674 // TODO(b/154140947):
675 // Merge `ScheduleWithSplit` and `ScheduleWithoutSplit` into `Schedule`.
676 // Two variants are created so original path (ScheduleWithoutSplit) is kept as
677 // it is.
678 template <typename TaskType>
ScheduleWithSplit(std::unique_ptr<TaskType> * task)679 Status Queue<TaskType>::ScheduleWithSplit(std::unique_ptr<TaskType>* task) {
680 profiler::TraceMe trace_me([task] {
681 return profiler::TraceMeEncode(
682 "ScheduleWithSplit", {{"batching_input_task_size", (*task)->size()}});
683 });
684 if ((*task)->size() > options_.input_batch_size_limit) {
685 return errors::InvalidArgument("Task size ", (*task)->size(),
686 " is larger than maximum input batch size ",
687 options_.input_batch_size_limit);
688 }
690 // The max size to be enqueued.
691 const int max_execution_batch_size = options_.max_execution_batch_size;
693 bool notify_of_schedulable_batch = false;
694 {
695 mutex_lock l(mu_);
697 DCHECK(!closed_);
699 const int num_new_batches_schedulable =
700 options_.max_enqueued_batches - batches_.size();
701 const int open_batch_capacity =
702 max_execution_batch_size - batches_.back()->size();
703 const int scheduling_capacity =
704 (num_new_batches_schedulable * max_execution_batch_size) +
705 open_batch_capacity;
707 // The scenario when concurrent incoming batches arrives and use up all
708 // queue capacity isn't covered by unit test.
709 // The coverage boils down to sepcify "function library" in a way that,
710 // one batch task can synchronize with another task, and then two tasks
711 // run concurrently. An integration test might be a better fit.
712 if ((*task)->size() > scheduling_capacity) {
713 return errors::Unavailable(
714 "The batch scheduling queue to which this task was submitted is "
715 "full");
716 }
718 const int64 open_batch_remaining_slot =
719 max_execution_batch_size - batches_.back()->size();
721 const int64 input_task_size = (*task)->size();
723 std::vector<std::unique_ptr<TaskType>> output_tasks;
725 if (input_task_size <= open_batch_remaining_slot) {
726 // This is the fast path when input doesn't need to be split.
727 output_tasks.push_back(std::move(*task));
728 } else {
729 TF_RETURN_IF_ERROR(SplitInputBatchIntoSubtasks(task, &output_tasks));
730 }
732 for (int i = 0; i < output_tasks.size(); ++i) {
733 if (batches_.back()->size() + output_tasks[i]->size() >
734 options_.max_execution_batch_size) {
735 StartNewBatch();
736 }
737 if (batches_.back()->empty()) {
738 open_batch_start_time_micros_ = env_->NowMicros();
739 }
740 profiler::TraceMeProducer trace_me(
741 [&output_tasks, i] {
742 return profiler::TraceMeEncode("ScheduleOutputTask",
743 {{"size", output_tasks[i]->size()}});
744 },
745 profiler::ContextType::kSharedBatchScheduler,
746 batches_.back()->traceme_context_id());
747 batches_.back()->AddTask(std::move(output_tasks[i]));
748 }
750 if (!schedulable_batch_) {
751 if (batches_.size() > 1 || IsOpenBatchSchedulable()) {
752 schedulable_batch_ = true;
753 notify_of_schedulable_batch = true;
754 }
755 }
756 }
758 if (notify_of_schedulable_batch) {
759 schedulable_batch_callback_();
760 }
762 return Status::OK();
763 }
765 template <typename TaskType>
NumEnqueuedTasks()766 size_t Queue<TaskType>::NumEnqueuedTasks() const {
767 mutex_lock l(mu_);
768 size_t num_enqueued_tasks = 0;
769 for (const auto& batch : batches_) {
770 num_enqueued_tasks += batch->num_tasks();
771 }
772 return num_enqueued_tasks;
773 }
775 template <typename TaskType>
SchedulingCapacity()776 size_t Queue<TaskType>::SchedulingCapacity() const {
777 mutex_lock l(mu_);
778 const int num_new_batches_schedulable =
779 options_.max_enqueued_batches - batches_.size();
780 const int open_batch_capacity =
781 max_execution_batch_size() - batches_.back()->size();
782 return (num_new_batches_schedulable * max_execution_batch_size()) +
783 open_batch_capacity;
784 }
786 template <typename TaskType>
ScheduleBatch()787 std::unique_ptr<Batch<TaskType>> Queue<TaskType>::ScheduleBatch() {
788 // The batch to schedule, which we may populate below. (If left as nullptr,
789 // that means we are electing not to schedule a batch at this time.)
790 std::unique_ptr<Batch<TaskType>> batch_to_schedule;
792 {
793 mutex_lock l(mu_);
795 // Consider closing the open batch at this time, to schedule it.
796 if (batches_.size() == 1 && IsOpenBatchSchedulable()) {
797 StartNewBatch();
798 }
800 if (batches_.size() >= 2) {
801 // There is at least one closed batch that is ready to be scheduled.
802 ++num_batches_being_processed_;
803 batch_to_schedule = std::move(batches_.front());
804 batches_.pop_front();
805 } else {
806 schedulable_batch_ = false;
807 }
808 }
810 return batch_to_schedule;
811 }
813 template <typename TaskType>
ProcessBatch(std::unique_ptr<Batch<TaskType>> batch)814 void Queue<TaskType>::ProcessBatch(std::unique_ptr<Batch<TaskType>> batch) {
815 profiler::TraceMeConsumer trace_me(
816 [&] {
817 return profiler::TraceMeEncode(
818 "ProcessBatch", {{"batch_size_before_padding", batch->size()}});
819 },
820 profiler::ContextType::kSharedBatchScheduler,
821 batch->traceme_context_id());
822 process_batch_callback_(std::move(batch));
824 {
825 mutex_lock l(mu_);
826 --num_batches_being_processed_;
827 if (empty_notification_ != nullptr && IsEmptyInternal()) {
828 empty_notification_->Notify();
829 }
830 }
831 }
833 template <typename TaskType>
IsEmpty()834 bool Queue<TaskType>::IsEmpty() const {
835 mutex_lock l(mu_);
836 return IsEmptyInternal();
837 }
839 template <typename TaskType>
CloseAndWaitUntilEmpty()840 void Queue<TaskType>::CloseAndWaitUntilEmpty() {
841 Notification empty;
842 {
843 mutex_lock l(mu_);
844 closed_ = true;
845 if (IsEmptyInternal()) {
846 empty.Notify();
847 } else {
848 // Arrange for ProcessBatch() to notify when the queue becomes empty.
849 empty_notification_ = ∅
850 }
851 }
852 empty.WaitForNotification();
853 }
855 template <typename TaskType>
IsEmptyInternal()856 bool Queue<TaskType>::IsEmptyInternal() const {
857 return num_batches_being_processed_ == 0 && batches_.size() == 1 &&
858 batches_.back()->empty();
859 }
861 template <typename TaskType>
StartNewBatch()862 void Queue<TaskType>::StartNewBatch() {
863 batches_.back()->Close();
864 batches_.emplace_back(new Batch<TaskType>(++traceme_context_id_counter_));
865 }
867 template <typename TaskType>
SplitInputBatchIntoSubtasks(std::unique_ptr<TaskType> * input_task,std::vector<std::unique_ptr<TaskType>> * output_tasks)868 Status Queue<TaskType>::SplitInputBatchIntoSubtasks(
869 std::unique_ptr<TaskType>* input_task,
870 std::vector<std::unique_ptr<TaskType>>* output_tasks) {
871 const int open_batch_remaining_slot =
872 max_execution_batch_size() - batches_.back()->size();
873 return options_.split_input_task_func(
874 std::move(input_task), open_batch_remaining_slot,
875 max_execution_batch_size(), std::move(output_tasks));
876 }
878 template <typename TaskType>
IsOpenBatchSchedulable()879 bool Queue<TaskType>::IsOpenBatchSchedulable() const {
880 Batch<TaskType>* open_batch = batches_.back().get();
881 if (open_batch->empty()) {
882 return false;
883 }
884 return closed_ || open_batch->size() >= max_execution_batch_size() ||
885 env_->NowMicros() >=
886 open_batch_start_time_micros_ + options_.batch_timeout_micros;
887 }
889 template <typename TaskType>
QueueHandle(std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,Queue<TaskType> * queue)890 QueueHandle<TaskType>::QueueHandle(
891 std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
892 Queue<TaskType>* queue)
893 : scheduler_(scheduler), queue_(queue) {}
895 template <typename TaskType>
~QueueHandle()896 QueueHandle<TaskType>::~QueueHandle() {
897 queue_->CloseAndWaitUntilEmpty();
898 }
900 template <typename TaskType>
Schedule(std::unique_ptr<TaskType> * task)901 Status QueueHandle<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
902 return queue_->Schedule(task);
903 }
905 template <typename TaskType>
NumEnqueuedTasks()906 size_t QueueHandle<TaskType>::NumEnqueuedTasks() const {
907 return queue_->NumEnqueuedTasks();
908 }
910 template <typename TaskType>
SchedulingCapacity()911 size_t QueueHandle<TaskType>::SchedulingCapacity() const {
912 return queue_->SchedulingCapacity();
913 }
915 } // namespace internal
917 } // namespace serving
918 } // namespace tensorflow