• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <deque>
20 #include <memory>
21 #include <thread>
22 #include <mutex>
23 #include <condition_variable>
24 #include <chrono>
25 
26 #include "common/libs/concurrency/semaphore.h"
27 
28 namespace cuttlefish {
29 // move-based concurrent queue
30 template<typename T>
31 class ScreenConnectorQueue {
32 
33  public:
34   static const int kQSize = 2;
35 
36   static_assert( is_movable<T>::value,
37                  "Items in ScreenConnectorQueue should be std::mov-able");
38 
ScreenConnectorQueue(Semaphore & sc_sem)39   ScreenConnectorQueue(Semaphore& sc_sem)
40       : q_mutex_(std::make_unique<std::mutex>()), sc_semaphore_(sc_sem) {}
41   ScreenConnectorQueue(ScreenConnectorQueue&& cq) = delete;
42   ScreenConnectorQueue(const ScreenConnectorQueue& cq) = delete;
43   ScreenConnectorQueue& operator=(const ScreenConnectorQueue& cq) = delete;
44   ScreenConnectorQueue& operator=(ScreenConnectorQueue&& cq) = delete;
45 
Empty()46   bool Empty() const {
47     const std::lock_guard<std::mutex> lock(*q_mutex_);
48     return buffer_.empty();
49   }
50 
Size()51   auto Size() const {
52     const std::lock_guard<std::mutex> lock(*q_mutex_);
53     return buffer_.size();
54   }
55 
WaitEmpty()56   void WaitEmpty() {
57     auto is_empty = [this](void) { return buffer_.empty(); };
58     std::unique_lock<std::mutex> lock(*q_mutex_);
59     q_empty_.wait(lock, is_empty);
60   }
61 
62   /*
63    * PushBack( std::move(src) );
64    *
65    * Note: this queue is suppoed to be used only by ScreenConnector-
66    * related components such as ScreenConnectorSource
67    *
68    * The traditional assumption was that when webRTC or VNC calls
69    * OnFrameAfter, the call should be block until it could return
70    * one frame.
71    *
72    * Thus, the producers of this queue must not produce frames
73    * much faster than the consumer, VNC or WebRTC consumes.
74    * Therefore, when the small buffer is full -- which means
75    * VNC or WebRTC would not call OnFrameAfter --, the producer
76    * should stop adding itmes to the queue.
77    *
78    */
PushBack(T && item)79   void PushBack(T&& item) {
80     std::unique_lock<std::mutex> lock(*q_mutex_);
81     if (Full()) {
82       auto is_empty =
83           [this](void){ return buffer_.empty(); };
84       q_empty_.wait(lock, is_empty);
85     }
86     buffer_.push_back(std::move(item));
87     /* Whether the total number of items in ALL queus is 0 or not
88      * is tracked via a semaphore shared by all queues
89      *
90      * This is NOT intended to block queue from pushing an item
91      * This IS intended to awake the screen_connector consumer thread
92      * when one or more items are available at least in one queue
93      */
94     sc_semaphore_.SemPost();
95   }
96   void PushBack(T& item) = delete;
97   void PushBack(const T& item) = delete;
98 
99   /*
100    * PopFront must be preceded by sc_semaphore_.SemWaitItem()
101    *
102    */
PopFront()103   T PopFront() {
104     const std::lock_guard<std::mutex> lock(*q_mutex_);
105     auto item = std::move(buffer_.front());
106     buffer_.pop_front();
107     if (buffer_.empty()) {
108       q_empty_.notify_all();
109     }
110     return item;
111   }
112 
113  private:
Full()114   bool Full() const {
115     // call this in a critical section
116     // after acquiring q_mutex_
117     return kQSize == buffer_.size();
118   }
119   std::deque<T> buffer_;
120   std::unique_ptr<std::mutex> q_mutex_;
121   std::condition_variable q_empty_;
122   Semaphore& sc_semaphore_;
123 };
124 
125 } // namespace cuttlefish
126