• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17template <typename T>
18Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){};
19
20template <typename T>
21Queue<T>::~Queue() {
22  ASSERT(enqueue_.handler_ == nullptr);
23  ASSERT(dequeue_.handler_ == nullptr);
24};
25
26template <typename T>
27void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
28  std::lock_guard<std::mutex> lock(mutex_);
29  ASSERT(enqueue_.handler_ == nullptr);
30  ASSERT(enqueue_.reactable_ == nullptr);
31  enqueue_.handler_ = handler;
32  enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
33      enqueue_.reactive_semaphore_.GetFd(),
34      base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this), std::move(callback)),
35      base::Closure());
36}
37
38template <typename T>
39void Queue<T>::UnregisterEnqueue() {
40  std::lock_guard<std::mutex> lock(mutex_);
41  ASSERT(enqueue_.reactable_ != nullptr);
42  enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_);
43  enqueue_.reactable_ = nullptr;
44  enqueue_.handler_ = nullptr;
45}
46
47template <typename T>
48void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
49  std::lock_guard<std::mutex> lock(mutex_);
50  ASSERT(dequeue_.handler_ == nullptr);
51  ASSERT(dequeue_.reactable_ == nullptr);
52  dequeue_.handler_ = handler;
53  dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(dequeue_.reactive_semaphore_.GetFd(),
54                                                                           callback, base::Closure());
55}
56
57template <typename T>
58void Queue<T>::UnregisterDequeue() {
59  std::lock_guard<std::mutex> lock(mutex_);
60  ASSERT(dequeue_.reactable_ != nullptr);
61  dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_);
62  dequeue_.reactable_ = nullptr;
63  dequeue_.handler_ = nullptr;
64}
65
66template <typename T>
67std::unique_ptr<T> Queue<T>::TryDequeue() {
68  std::lock_guard<std::mutex> lock(mutex_);
69
70  if (queue_.empty()) {
71    return nullptr;
72  }
73
74  dequeue_.reactive_semaphore_.Decrease();
75
76  std::unique_ptr<T> data = std::move(queue_.front());
77  queue_.pop();
78
79  enqueue_.reactive_semaphore_.Increase();
80
81  return data;
82}
83
84template <typename T>
85void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) {
86  std::unique_ptr<T> data = callback.Run();
87  ASSERT(data != nullptr);
88  std::lock_guard<std::mutex> lock(mutex_);
89  enqueue_.reactive_semaphore_.Decrease();
90  queue_.push(std::move(data));
91  dequeue_.reactive_semaphore_.Increase();
92}
93