• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>

#include "common/libs/concurrency/semaphore.h"
26 
27 namespace cuttlefish {
/**
 * Move-based, bounded, concurrent FIFO queue.
 *
 * Items are only ever moved in (Push) and moved out (Pop); copying an item
 * into the queue is rejected at compile time. All public member functions
 * take the internal mutex, so they may be called from several threads.
 *
 * The queue itself is neither copyable nor movable.
 */
template<typename T>
class ScreenConnectorQueue {
 public:
  // Checked with the standard traits so that this header does not depend on
  // a helper trait declared in some other header.
  static_assert(std::is_move_constructible<T>::value &&
                    std::is_move_assignable<T>::value,
                "Items in ScreenConnectorQueue should be std::mov-able");

  /**
   * @param q_max_size maximum number of buffered items. Values below 1 are
   *                   treated as 1 so that the queue is always bounded.
   */
  ScreenConnectorQueue(const int q_max_size = 2)
      : q_max_size_{q_max_size > 0 ? static_cast<std::size_t>(q_max_size)
                                   : std::size_t{1}} {}
  ScreenConnectorQueue(ScreenConnectorQueue&& cq) = delete;
  ScreenConnectorQueue(const ScreenConnectorQueue& cq) = delete;
  ScreenConnectorQueue& operator=(const ScreenConnectorQueue& cq) = delete;
  ScreenConnectorQueue& operator=(ScreenConnectorQueue&& cq) = delete;

  /** @return true when no item is buffered at the time of the call. */
  bool IsEmpty() const {
    const std::lock_guard<std::mutex> lock(q_mutex_);
    return buffer_.empty();
  }

  /** @return the number of items buffered at the time of the call. */
  auto Size() const {
    const std::lock_guard<std::mutex> lock(q_mutex_);
    return buffer_.size();
  }

  /** Blocks the caller until the queue holds no items. */
  void WaitEmpty() {
    std::unique_lock<std::mutex> lock(q_mutex_);
    q_empty_.wait(lock, [this] { return buffer_.empty(); });
  }

  /*
   * Push( std::move(src) );
   *
   * Note: this queue is supposed to be used only by ScreenConnector-
   * related components such as ScreenConnectorSource.
   *
   * The traditional assumption is that when WebRTC asks for the next
   * frame, that call blocks until one frame can be returned.
   *
   * Thus, the producers of this queue must not produce frames much
   * faster than the consumer, WebRTC, consumes them. Therefore, when
   * the small buffer is full, the producer stops adding items: Push()
   * blocks until the consumer has drained the queue completely.
   */
  void Push(T&& item) {
    std::unique_lock<std::mutex> lock(q_mutex_);
    if (Full()) {
      // Deliberately waits for "empty", not merely "not full".
      q_empty_.wait(lock, [this] { return buffer_.empty(); });
    }
    buffer_.push_back(std::move(item));
    // Wake one consumer that may be blocked in Pop().
    q_not_empty_.notify_one();
  }
  void Push(T& item) = delete;
  void Push(const T& item) = delete;

  /**
   * Removes and returns the oldest item.
   *
   * Blocks while the queue is empty instead of reading from an empty
   * container, which would be undefined behaviour.
   */
  T Pop() {
    std::unique_lock<std::mutex> lock(q_mutex_);
    q_not_empty_.wait(lock, [this] { return !buffer_.empty(); });
    T item = std::move(buffer_.front());
    buffer_.pop_front();
    if (buffer_.empty()) {
      // Release producers blocked in Push() and callers of WaitEmpty().
      q_empty_.notify_all();
    }
    return item;
  }

 private:
  // Must be called with q_mutex_ held.
  bool Full() const { return buffer_.size() >= q_max_size_; }

  std::deque<T> buffer_;
  // mutable so that the const observers can lock it.
  mutable std::mutex q_mutex_;
  // Signalled when the buffer becomes empty.
  std::condition_variable q_empty_;
  // Signalled when an item has been added.
  std::condition_variable q_not_empty_;
  const std::size_t q_max_size_;
};
109 
110 } // namespace cuttlefish
111