/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_
#define TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_

#include <stddef.h>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "tensorflow/core/kernels/batching_util/batch_scheduler.h"
#include "tensorflow/core/kernels/batching_util/periodic_function.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {
namespace serving {
namespace internal {
template <typename TaskType>
class Queue;
}  // namespace internal
}  // namespace serving
}  // namespace tensorflow

namespace tensorflow {
namespace serving {

// A batch scheduler for server instances that service multiple request types
// (e.g. multiple machine-learned models, or multiple versions of a model
// served concurrently), or even multiple distinct tasks for a given request.
// The scheduler multiplexes batches of different kinds of tasks onto a
// fixed-size thread pool (each batch contains tasks of a single type), in a
// carefully controlled manner. A common configuration is to set the number of
// threads equal to the number of hardware accelerator units, in which case the
// scheduler takes care of multiplexing the task types onto the shared
// hardware, in a manner that is both fair and efficient.
//
// Semantically, SharedBatchScheduler behaves like having N instances of
// BasicBatchScheduler (see basic_batch_scheduler.h), one per task type. The
// difference is that under the covers there is a single shared thread pool,
// instead of N independent ones, with their sharing deliberately coordinated.
//
// SharedBatchScheduler does not implement the BatchScheduler API; rather, it
// presents an abstraction of "queues", where each queue corresponds to one
// type of task. Tasks submitted to a given queue are placed in their own
// batches, and cannot be mixed with other tasks. Queues can be added and
// deleted dynamically, to accommodate e.g. versions of a model being brought
// up and down over the lifetime of a server.
//
// The batch thread pool round-robins through the queues, running one batch
// from a queue and then moving to the next queue. Each queue behaves like a
// BasicBatchScheduler instance, in the sense that it has maximum batch size
// and timeout parameters, which govern when a batch is eligible to be
// processed.
//
// Each queue is independently configured with a maximum size (in terms of the
// maximum number of batches worth of enqueued tasks). For online serving, it
// is recommended that the queue sizes be configured such that the sum of the
// sizes of the active queues roughly equals the number of batch threads. (The
// idea is that if all threads become available at roughly the same time, there
// will be enough enqueued work for them to take on, but no more.)
//
// If queue sizes are configured in the manner suggested above, the maximum
// time a task can spend in a queue before being placed in a batch and
// assigned to a thread for processing is the greater of:
//  - the maximum time to process one batch of tasks from any active queue
//  - the configured timeout parameter for the task's queue (which can be 0)
//
// For bulk processing jobs and throughput-oriented benchmarks, you may want to
// set the maximum queue size to a large value.
//
// TODO(b/26539183): Support queue servicing policies other than round-robin.
// E.g. let each queue specify a "share" (an int >= 1), so e.g. with queues A
// and B having shares 1 and 2 respectively, the servicing pattern is ABBABB...
//
//
// PERFORMANCE TUNING: See README.md.
//
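// Example use, as a minimal sketch ('MyTask' is a hypothetical BatchTask
// subclass, and the parameter values are illustrative only):
//
//   std::shared_ptr<SharedBatchScheduler<MyTask>> scheduler;
//   SharedBatchScheduler<MyTask>::Options options;
//   options.num_batch_threads = 4;
//   TF_CHECK_OK(SharedBatchScheduler<MyTask>::Create(options, &scheduler));
//
//   SharedBatchScheduler<MyTask>::QueueOptions queue_options;
//   queue_options.max_batch_size = 100;
//   queue_options.batch_timeout_micros = 10 * 1000;  // 10 milliseconds
//   std::unique_ptr<BatchScheduler<MyTask>> queue;
//   TF_CHECK_OK(scheduler->AddQueue(
//       queue_options,
//       [](std::unique_ptr<Batch<MyTask>> batch) {
//         // Process the (closed) batch, e.g. by running it on an accelerator.
//       },
//       &queue));
//
//   std::unique_ptr<MyTask> task = ...;  // constructed by the application
//   TF_CHECK_OK(queue->Schedule(&task));
//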
template <typename TaskType>
class SharedBatchScheduler
    : public std::enable_shared_from_this<SharedBatchScheduler<TaskType>> {
 public:
  // TODO(b/25089730): Tune defaults based on best practices as they develop.
  struct Options {
    // The name to use for the pool of batch threads.
    string thread_pool_name = {"batch_threads"};

    // The number of threads to use to process batches.
    // Must be >= 1, and should be tuned carefully.
    int num_batch_threads = port::NumSchedulableCPUs();

    // The environment to use.
    // (Typically only overridden by test code.)
    Env* env = Env::Default();
  };
  // Ownership is shared between the caller of Create() and any queues created
  // via AddQueue().
  static Status Create(
      const Options& options,
      std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler);

  ~SharedBatchScheduler();

  // Adds a queue to which tasks may be submitted. The returned queue
  // implements the BatchScheduler API. Each queue has its own set of
  // scheduling options, and its own callback to process batches of tasks
  // submitted to the queue.
  //
  // The returned queue's destructor blocks until all tasks submitted to it
  // have been processed.
  struct QueueOptions {
    // The maximum size of each batch.
    //
    // The scheduler may form batches of any size between 1 and this number
    // (inclusive). If there is a need to quantize the batch sizes, i.e. only
    // submit batches whose size is in a small set of allowed sizes, that can
    // be done by adding padding in the process-batch callback (see the sketch
    // following this struct).
    size_t max_batch_size = 1000;

    // If a task has been enqueued for this amount of time (in microseconds),
    // and a thread is available, the scheduler will immediately form a batch
    // from enqueued tasks and assign the batch to the thread for processing,
    // even if the batch's size is below 'max_batch_size'.
    //
    // This parameter offers a way to bound queue latency, so that a task
    // isn't stuck in the queue indefinitely waiting for enough tasks to
    // arrive to make a full batch. (The latency bound is given in the class
    // documentation above.)
    //
    // The goal is to smooth out batch sizes under low request rates, and thus
    // avoid latency spikes.
    int64 batch_timeout_micros = 0;

    // The maximum allowable number of enqueued (accepted by Schedule() but
    // not yet being processed on a batch thread) tasks, in terms of batches.
    // If this limit is reached, Schedule() will return an UNAVAILABLE error.
    // See the class documentation above for guidelines on how to tune this
    // parameter.
    size_t max_enqueued_batches = 10;
  };
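
  // For illustration, here is a minimal sketch of a process-batch callback
  // that quantizes batch sizes by padding, assuming hypothetical allowed
  // sizes {16, 32, 64} (how the padding rows are built is task-specific and
  // elided):
  //
  //   [](std::unique_ptr<Batch<MyTask>> batch) {
  //     size_t padded_size = batch->size();
  //     for (const size_t allowed : {16, 32, 64}) {
  //       if (batch->size() <= allowed) {
  //         padded_size = allowed;
  //         break;
  //       }
  //     }
  //     // Build the model input from the batch's tasks, append
  //     // (padded_size - batch->size()) rows of zero padding, and run it.
  //   }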
  Status AddQueue(const QueueOptions& options,
                  std::function<void(std::unique_ptr<Batch<TaskType>>)>
                      process_batch_callback,
                  std::unique_ptr<BatchScheduler<TaskType>>* queue);

 private:
  explicit SharedBatchScheduler(const Options& options);

  // The code executed in 'batch_threads_'. Obtains a batch to process from
  // the queue pointed to by 'next_queue_to_schedule_', and processes it. If
  // that queue declines to provide a batch to process, moves on to the next
  // queue. If no queue provides a batch to process, just sleeps briefly and
  // exits.
  void ThreadLogic();

  const Options options_;

  mutex mu_;

  // A list of queues. (We use std::list instead of std::vector to ensure that
  // iterators are not invalidated by adding/removing elements. It also offers
  // efficient removal of elements from the middle.)
  using QueueList = std::list<std::unique_ptr<internal::Queue<TaskType>>>;

  // All "active" queues, i.e. ones that either:
  //  - have not been removed, or
  //  - have been removed but are not yet empty.
  QueueList queues_ GUARDED_BY(mu_);

  // An iterator over 'queues_', pointing to the queue from which the next
  // available batch thread should grab work.
  typename QueueList::iterator next_queue_to_schedule_ GUARDED_BY(mu_);

  // Used by idle batch threads to wait for work to enter the system. Notified
  // whenever a batch becomes schedulable.
  condition_variable schedulable_batch_cv_;

  // Threads that process batches obtained from the queues.
  std::vector<std::unique_ptr<PeriodicFunction>> batch_threads_;

  TF_DISALLOW_COPY_AND_ASSIGN(SharedBatchScheduler);
};

//////////
// Implementation details follow. API users need not read.

namespace internal {

// A task queue for SharedBatchScheduler. Accepts tasks and accumulates them
// into batches, and dispenses those batches to be processed via a "pull"
// interface. The queue's behavior is governed by maximum batch size, timeout
// and maximum queue length parameters; see their documentation in
// SharedBatchScheduler.
//
// The queue is implemented as a deque of batches, with these invariants:
//  - The number of batches is between 1 and 'options_.max_enqueued_batches'.
//  - The back-most batch is open; the rest are closed.
//
// Submitted tasks are added to the open batch. If that batch doesn't have
// room but the queue isn't full, then that batch is closed and a new open
// batch is started.
//
// Batch pull requests are handled by dequeuing the front-most batch if it is
// closed. If the front-most batch is open (i.e. the queue contains only one
// batch) and has reached the timeout, it is immediately closed and returned;
// otherwise no batch is returned for the request.
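//
// For example, suppose 'max_batch_size' is 4 and the open batch holds 3 units
// of work. A newly submitted task of size 2 does not fit, so the open batch
// is closed (becoming eligible to be pulled) and a fresh open batch is
// started to hold the new task.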
template <typename TaskType>
class Queue {
 public:
  using ProcessBatchCallback =
      std::function<void(std::unique_ptr<Batch<TaskType>>)>;
  using SchedulableBatchCallback = std::function<void()>;
  Queue(const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
        Env* env, ProcessBatchCallback process_batch_callback,
        SchedulableBatchCallback schedulable_batch_callback);

  // Illegal to destruct unless the queue is empty.
  ~Queue();

  // Submits a task to the queue, with the same semantics as
  // BatchScheduler::Schedule().
  Status Schedule(std::unique_ptr<TaskType>* task);

  // Returns the number of enqueued tasks, with the same semantics as
  // BatchScheduler::NumEnqueuedTasks().
  size_t NumEnqueuedTasks() const;

  // Returns the queue capacity, with the same semantics as
  // BatchScheduler::SchedulingCapacity().
  size_t SchedulingCapacity() const;

  // Returns the maximum allowed size of tasks submitted to the queue.
  size_t max_task_size() const { return options_.max_batch_size; }

  // Called by a thread that is ready to process a batch, to request one from
  // this queue. Either returns a batch that is ready to be processed, or
  // nullptr if the queue declines to schedule a batch at this time. If it
  // returns a batch, the batch is guaranteed to be closed.
  std::unique_ptr<Batch<TaskType>> ScheduleBatch();

  // Processes a batch that has been returned earlier by ScheduleBatch().
  void ProcessBatch(std::unique_ptr<Batch<TaskType>> batch);

  // Determines whether the queue is empty, i.e. has no tasks waiting or being
  // processed.
  bool IsEmpty() const;

  // Marks the queue closed, and waits until it is empty.
  void CloseAndWaitUntilEmpty();

  bool closed() const {
    mutex_lock l(mu_);
    return closed_;
  }

 private:
  // Same as IsEmpty(), but assumes the caller already holds a lock on 'mu_'.
  bool IsEmptyInternal() const EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Closes the open batch residing at the back of 'batches_', and inserts a
  // fresh open batch behind it.
  void StartNewBatch() EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Determines whether the open batch residing at the back of 'batches_' is
  // currently schedulable.
  bool IsOpenBatchSchedulable() const EXCLUSIVE_LOCKS_REQUIRED(mu_);

  const typename SharedBatchScheduler<TaskType>::QueueOptions options_;

  // The environment to use.
  Env* env_;

  // A callback invoked to process a batch of work units. Always invoked from
  // a batch thread.
  ProcessBatchCallback process_batch_callback_;

  // A callback invoked to notify the scheduler that a new batch has become
  // schedulable.
  SchedulableBatchCallback schedulable_batch_callback_;

  mutable mutex mu_;

  // Whether this queue is closed to new tasks. This variable is monotonic: it
  // starts as false, and then at some point gets set to true and remains true
  // for the duration of this object's life.
  bool closed_ GUARDED_BY(mu_) = false;

  // The enqueued batches. See the invariants in the class comments above.
  std::deque<std::unique_ptr<Batch<TaskType>>> batches_ GUARDED_BY(mu_);

  // The time at which the first task was added to the open (back-most) batch
  // in 'batches_'. Valid iff that batch contains at least one task.
  uint64 open_batch_start_time_micros_ GUARDED_BY(mu_);

  // Whether this queue contains a batch that is eligible to be scheduled.
  // Used to keep track of when to call 'schedulable_batch_callback_'.
  bool schedulable_batch_ GUARDED_BY(mu_) = false;

  // The number of batches currently being processed by batch threads.
  // Incremented in ScheduleBatch() and decremented in ProcessBatch().
  int num_batches_being_processed_ GUARDED_BY(mu_) = 0;

  // Used by CloseAndWaitUntilEmpty() to wait until the queue is empty, for
  // the case in which the queue is not empty when CloseAndWaitUntilEmpty()
  // starts. When ProcessBatch() dequeues the last batch and makes the queue
  // empty, if 'empty_notification_' is non-null it calls
  // 'empty_notification_->Notify()'.
  Notification* empty_notification_ GUARDED_BY(mu_) = nullptr;

  TF_DISALLOW_COPY_AND_ASSIGN(Queue);
};

// A RAII-style object that points to a Queue and implements
// the BatchScheduler API. To be handed out to clients who call AddQueue().
template <typename TaskType>
class QueueHandle : public BatchScheduler<TaskType> {
 public:
  QueueHandle(std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
              Queue<TaskType>* queue);
  ~QueueHandle() override;

  Status Schedule(std::unique_ptr<TaskType>* task) override;
  size_t NumEnqueuedTasks() const override;
  size_t SchedulingCapacity() const override;

  size_t max_task_size() const override { return queue_->max_task_size(); }

 private:
  // The scheduler that owns 'queue_'.
  std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler_;

  // The queue this handle wraps. Owned by 'scheduler_', which keeps it alive
  // at least until this class's destructor closes it.
  Queue<TaskType>* queue_;

  TF_DISALLOW_COPY_AND_ASSIGN(QueueHandle);
};

}  // namespace internal

template <typename TaskType>
Status SharedBatchScheduler<TaskType>::Create(
    const Options& options,
    std::shared_ptr<SharedBatchScheduler<TaskType>>* scheduler) {
  if (options.num_batch_threads < 1) {
    return errors::InvalidArgument("num_batch_threads must be positive; was ",
                                   options.num_batch_threads);
  }
  scheduler->reset(new SharedBatchScheduler<TaskType>(options));
  return Status::OK();
}

template <typename TaskType>
SharedBatchScheduler<TaskType>::~SharedBatchScheduler() {
  // Wait until the batch threads finish clearing out and deleting the closed
  // queues.
  for (;;) {
    {
      mutex_lock l(mu_);
      if (queues_.empty()) {
        break;
      }
    }
    const int64 kSleepTimeMicros = 100;
    options_.env->SleepForMicroseconds(kSleepTimeMicros);
  }
  // Delete the batch threads before allowing state the threads may access
  // (e.g. 'mu_') to be deleted.
  batch_threads_.clear();
}

template <typename TaskType>
Status SharedBatchScheduler<TaskType>::AddQueue(
    const QueueOptions& options,
    std::function<void(std::unique_ptr<Batch<TaskType>>)>
        process_batch_callback,
    std::unique_ptr<BatchScheduler<TaskType>>* queue) {
  if (options.max_batch_size == 0) {
    return errors::InvalidArgument("max_batch_size must be positive; was ",
                                   options.max_batch_size);
  }
  if (options.batch_timeout_micros < 0) {
    return errors::InvalidArgument(
        "batch_timeout_micros must be non-negative; was ",
        options.batch_timeout_micros);
  }
  if (options.max_enqueued_batches == 0) {
    // Note: 'max_enqueued_batches' is a size_t, so it cannot be negative; the
    // meaningful requirement is that it leave room for at least one batch.
    return errors::InvalidArgument(
        "max_enqueued_batches must be positive; was ",
        options.max_enqueued_batches);
  }

  auto schedulable_batch_callback = [this] {
    mutex_lock l(mu_);
    schedulable_batch_cv_.notify_one();
  };
  auto internal_queue =
      std::unique_ptr<internal::Queue<TaskType>>(new internal::Queue<TaskType>(
          options, options_.env, process_batch_callback,
          schedulable_batch_callback));
  auto handle = std::unique_ptr<BatchScheduler<TaskType>>(
      new internal::QueueHandle<TaskType>(this->shared_from_this(),
                                          internal_queue.get()));
  {
    mutex_lock l(mu_);
    queues_.push_back(std::move(internal_queue));
    if (next_queue_to_schedule_ == queues_.end()) {
      next_queue_to_schedule_ = queues_.begin();
    }
  }
  *queue = std::move(handle);
  return Status::OK();
}

template <typename TaskType>
SharedBatchScheduler<TaskType>::SharedBatchScheduler(const Options& options)
    : options_(options), next_queue_to_schedule_(queues_.end()) {
  // Kick off the batch threads.
  PeriodicFunction::Options periodic_fn_options;
  periodic_fn_options.thread_name_prefix =
      strings::StrCat(options.thread_pool_name, "_");
  for (int i = 0; i < options.num_batch_threads; ++i) {
    std::unique_ptr<PeriodicFunction> thread(new PeriodicFunction(
        [this] { this->ThreadLogic(); },
        0 /* function invocation interval time */, periodic_fn_options));
    batch_threads_.push_back(std::move(thread));
  }
}

template <typename TaskType>
void SharedBatchScheduler<TaskType>::ThreadLogic() {
  // A batch to process next (or nullptr if no work to do).
  std::unique_ptr<Batch<TaskType>> batch_to_process;
  // The queue with which 'batch_to_process' is associated.
  internal::Queue<TaskType>* queue_for_batch = nullptr;
  {
    mutex_lock l(mu_);

    const int num_queues = queues_.size();
    for (int num_queues_tried = 0;
         batch_to_process == nullptr && num_queues_tried < num_queues;
         ++num_queues_tried) {
      DCHECK(next_queue_to_schedule_ != queues_.end());

      // If a closed queue responds to ScheduleBatch() with nullptr, the queue
      // will never yield any further batches so we can drop it. To avoid a
      // race, we take a snapshot of the queue's closedness state *before*
      // calling ScheduleBatch().
      const bool queue_closed = (*next_queue_to_schedule_)->closed();

      // Ask '*next_queue_to_schedule_' if it wants us to process a batch.
      batch_to_process = (*next_queue_to_schedule_)->ScheduleBatch();
      if (batch_to_process != nullptr) {
        queue_for_batch = next_queue_to_schedule_->get();
      }

      // Advance 'next_queue_to_schedule_'.
      if (queue_closed && (*next_queue_to_schedule_)->IsEmpty() &&
          batch_to_process == nullptr) {
        // We've encountered a closed queue with no work to do. Drop it.
        DCHECK_NE(queue_for_batch, next_queue_to_schedule_->get());
        next_queue_to_schedule_ = queues_.erase(next_queue_to_schedule_);
      } else {
        ++next_queue_to_schedule_;
      }
      if (next_queue_to_schedule_ == queues_.end() && !queues_.empty()) {
        // We've hit the end. Wrap to the first queue.
        next_queue_to_schedule_ = queues_.begin();
      }
    }

    if (batch_to_process == nullptr) {
      // We couldn't find any work to do. Wait until a new batch becomes
      // schedulable, or some time has elapsed, before checking again.
      const int64 kTimeoutMillis = 1;  // The smallest accepted granule of time.
      WaitForMilliseconds(&l, &schedulable_batch_cv_, kTimeoutMillis);
      return;
    }
  }

  queue_for_batch->ProcessBatch(std::move(batch_to_process));
}

namespace internal {

template <typename TaskType>
Queue<TaskType>::Queue(
    const typename SharedBatchScheduler<TaskType>::QueueOptions& options,
    Env* env, ProcessBatchCallback process_batch_callback,
    SchedulableBatchCallback schedulable_batch_callback)
    : options_(options),
      env_(env),
      process_batch_callback_(process_batch_callback),
      schedulable_batch_callback_(schedulable_batch_callback) {
  // Create an initial, open batch.
  batches_.emplace_back(new Batch<TaskType>);
}

template <typename TaskType>
Queue<TaskType>::~Queue() {
  mutex_lock l(mu_);
  DCHECK(IsEmptyInternal());

  // Close the (empty) open batch, so its destructor doesn't block.
  batches_.back()->Close();
}

template <typename TaskType>
Status Queue<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
  if ((*task)->size() > options_.max_batch_size) {
    return errors::InvalidArgument("Task size ", (*task)->size(),
                                   " is larger than maximum batch size ",
                                   options_.max_batch_size);
  }

  bool notify_of_schedulable_batch = false;
  {
    mutex_lock l(mu_);

    DCHECK(!closed_);

    if (batches_.back()->size() + (*task)->size() > options_.max_batch_size) {
      if (batches_.size() >= options_.max_enqueued_batches) {
        return errors::Unavailable(
            "The batch scheduling queue to which this task was submitted is "
            "full");
      }
      StartNewBatch();
    }
    if (batches_.back()->empty()) {
      open_batch_start_time_micros_ = env_->NowMicros();
    }
    batches_.back()->AddTask(std::move(*task));

    if (!schedulable_batch_) {
      if (batches_.size() > 1 || IsOpenBatchSchedulable()) {
        schedulable_batch_ = true;
        notify_of_schedulable_batch = true;
      }
    }
  }

  if (notify_of_schedulable_batch) {
    schedulable_batch_callback_();
  }

  return Status::OK();
}

template <typename TaskType>
size_t Queue<TaskType>::NumEnqueuedTasks() const {
  mutex_lock l(mu_);
  size_t num_enqueued_tasks = 0;
  for (const auto& batch : batches_) {
    num_enqueued_tasks += batch->num_tasks();
  }
  return num_enqueued_tasks;
}

template <typename TaskType>
size_t Queue<TaskType>::SchedulingCapacity() const {
  mutex_lock l(mu_);
  // Capacity is the room for whole new batches, plus the remaining room in
  // the open batch at the back of the deque.
  const int num_new_batches_schedulable =
      options_.max_enqueued_batches - batches_.size();
  const int open_batch_capacity =
      options_.max_batch_size - batches_.back()->size();
  return (num_new_batches_schedulable * options_.max_batch_size) +
         open_batch_capacity;
}

template <typename TaskType>
std::unique_ptr<Batch<TaskType>> Queue<TaskType>::ScheduleBatch() {
  // The batch to schedule, which we may populate below. (If left as nullptr,
  // that means we are electing not to schedule a batch at this time.)
  std::unique_ptr<Batch<TaskType>> batch_to_schedule;

  {
    mutex_lock l(mu_);

    // Consider closing the open batch at this time, to schedule it.
    if (batches_.size() == 1 && IsOpenBatchSchedulable()) {
      StartNewBatch();
    }

    if (batches_.size() >= 2) {
      // There is at least one closed batch that is ready to be scheduled.
      ++num_batches_being_processed_;
      batch_to_schedule = std::move(batches_.front());
      batches_.pop_front();
    } else {
      schedulable_batch_ = false;
    }
  }

  return batch_to_schedule;
}

template <typename TaskType>
void Queue<TaskType>::ProcessBatch(std::unique_ptr<Batch<TaskType>> batch) {
  process_batch_callback_(std::move(batch));

  {
    mutex_lock l(mu_);
    --num_batches_being_processed_;
    if (empty_notification_ != nullptr && IsEmptyInternal()) {
      empty_notification_->Notify();
    }
  }
}

template <typename TaskType>
bool Queue<TaskType>::IsEmpty() const {
  mutex_lock l(mu_);
  return IsEmptyInternal();
}

template <typename TaskType>
void Queue<TaskType>::CloseAndWaitUntilEmpty() {
  Notification empty;
  {
    mutex_lock l(mu_);
    closed_ = true;
    if (IsEmptyInternal()) {
      empty.Notify();
    } else {
      // Arrange for ProcessBatch() to notify when the queue becomes empty.
      empty_notification_ = &empty;
    }
  }
  empty.WaitForNotification();
}

template <typename TaskType>
bool Queue<TaskType>::IsEmptyInternal() const {
  return num_batches_being_processed_ == 0 && batches_.size() == 1 &&
         batches_.back()->empty();
}

template <typename TaskType>
void Queue<TaskType>::StartNewBatch() {
  batches_.back()->Close();
  batches_.emplace_back(new Batch<TaskType>);
}

template <typename TaskType>
bool Queue<TaskType>::IsOpenBatchSchedulable() const {
  Batch<TaskType>* open_batch = batches_.back().get();
  if (open_batch->empty()) {
    return false;
  }
  return closed_ || open_batch->size() >= options_.max_batch_size ||
         env_->NowMicros() >=
             open_batch_start_time_micros_ + options_.batch_timeout_micros;
}

template <typename TaskType>
QueueHandle<TaskType>::QueueHandle(
    std::shared_ptr<SharedBatchScheduler<TaskType>> scheduler,
    Queue<TaskType>* queue)
    : scheduler_(scheduler), queue_(queue) {}

template <typename TaskType>
QueueHandle<TaskType>::~QueueHandle() {
  queue_->CloseAndWaitUntilEmpty();
}

template <typename TaskType>
Status QueueHandle<TaskType>::Schedule(std::unique_ptr<TaskType>* task) {
  return queue_->Schedule(task);
}

template <typename TaskType>
size_t QueueHandle<TaskType>::NumEnqueuedTasks() const {
  return queue_->NumEnqueuedTasks();
}

template <typename TaskType>
size_t QueueHandle<TaskType>::SchedulingCapacity() const {
  return queue_->SchedulingCapacity();
}

}  // namespace internal

}  // namespace serving
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_KERNELS_BATCHING_UTIL_SHARED_BATCH_SCHEDULER_H_