1 /* 2 * Copyright (C) 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <deque> 20 #include <memory> 21 #include <thread> 22 #include <mutex> 23 #include <condition_variable> 24 #include <chrono> 25 26 #include "common/libs/concurrency/semaphore.h" 27 28 namespace cuttlefish { 29 // move-based concurrent queue 30 template<typename T> 31 class ScreenConnectorQueue { 32 33 public: 34 static const int kQSize = 2; 35 36 static_assert( is_movable<T>::value, 37 "Items in ScreenConnectorQueue should be std::mov-able"); 38 ScreenConnectorQueue(Semaphore & sc_sem)39 ScreenConnectorQueue(Semaphore& sc_sem) 40 : q_mutex_(std::make_unique<std::mutex>()), sc_semaphore_(sc_sem) {} 41 ScreenConnectorQueue(ScreenConnectorQueue&& cq) = delete; 42 ScreenConnectorQueue(const ScreenConnectorQueue& cq) = delete; 43 ScreenConnectorQueue& operator=(const ScreenConnectorQueue& cq) = delete; 44 ScreenConnectorQueue& operator=(ScreenConnectorQueue&& cq) = delete; 45 Empty()46 bool Empty() const { 47 const std::lock_guard<std::mutex> lock(*q_mutex_); 48 return buffer_.empty(); 49 } 50 Size()51 auto Size() const { 52 const std::lock_guard<std::mutex> lock(*q_mutex_); 53 return buffer_.size(); 54 } 55 WaitEmpty()56 void WaitEmpty() { 57 auto is_empty = [this](void) { return buffer_.empty(); }; 58 std::unique_lock<std::mutex> lock(*q_mutex_); 59 q_empty_.wait(lock, is_empty); 60 } 61 62 /* 63 * PushBack( std::move(src) ); 64 * 65 * Note: this queue is suppoed to be used only by ScreenConnector- 66 * related components such as ScreenConnectorSource 67 * 68 * The traditional assumption was that when webRTC or VNC calls 69 * OnFrameAfter, the call should be block until it could return 70 * one frame. 71 * 72 * Thus, the producers of this queue must not produce frames 73 * much faster than the consumer, VNC or WebRTC consumes. 74 * Therefore, when the small buffer is full -- which means 75 * VNC or WebRTC would not call OnFrameAfter --, the producer 76 * should stop adding itmes to the queue. 77 * 78 */ PushBack(T && item)79 void PushBack(T&& item) { 80 std::unique_lock<std::mutex> lock(*q_mutex_); 81 if (Full()) { 82 auto is_empty = 83 [this](void){ return buffer_.empty(); }; 84 q_empty_.wait(lock, is_empty); 85 } 86 buffer_.push_back(std::move(item)); 87 /* Whether the total number of items in ALL queus is 0 or not 88 * is tracked via a semaphore shared by all queues 89 * 90 * This is NOT intended to block queue from pushing an item 91 * This IS intended to awake the screen_connector consumer thread 92 * when one or more items are available at least in one queue 93 */ 94 sc_semaphore_.SemPost(); 95 } 96 void PushBack(T& item) = delete; 97 void PushBack(const T& item) = delete; 98 99 /* 100 * PopFront must be preceded by sc_semaphore_.SemWaitItem() 101 * 102 */ PopFront()103 T PopFront() { 104 const std::lock_guard<std::mutex> lock(*q_mutex_); 105 auto item = std::move(buffer_.front()); 106 buffer_.pop_front(); 107 if (buffer_.empty()) { 108 q_empty_.notify_all(); 109 } 110 return item; 111 } 112 113 private: Full()114 bool Full() const { 115 // call this in a critical section 116 // after acquiring q_mutex_ 117 return kQSize == buffer_.size(); 118 } 119 std::deque<T> buffer_; 120 std::unique_ptr<std::mutex> q_mutex_; 121 std::condition_variable q_empty_; 122 Semaphore& sc_semaphore_; 123 }; 124 125 } // namespace cuttlefish 126