1 /* 2 * Copyright (C) 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <condition_variable> 20 #include <memory> 21 #include <vector> 22 23 #include "common/libs/concurrency/semaphore.h" 24 #include "common/libs/concurrency/thread_safe_queue.h" 25 26 namespace cuttlefish { 27 namespace confui { 28 template <typename T> 29 class Multiplexer { 30 public: Multiplexer(int n_qs,int max_elements)31 Multiplexer(int n_qs, int max_elements) : sem_items_{0}, next_{0} { 32 auto drop_new = [](typename ThreadSafeQueue<T>::QueueImpl* internal_q) { 33 internal_q->pop_front(); 34 }; 35 for (int i = 0; i < n_qs; i++) { 36 auto queue = std::make_unique<ThreadSafeQueue<T>>(max_elements, drop_new); 37 queues_.push_back(std::move(queue)); 38 } 39 } 40 GetNewQueueId()41 int GetNewQueueId() { 42 CHECK(next_ < queues_.size()) 43 << "can't get more queues than " << queues_.size(); 44 return next_++; 45 } 46 Push(const int idx,T && t)47 void Push(const int idx, T&& t) { 48 CheckIdx(idx); 49 queues_[idx]->Push(t); 50 sem_items_.SemPost(); 51 } 52 Pop()53 T Pop() { 54 // the idx must have an item! 55 // no waiting in fn()! 56 sem_items_.SemWait(); 57 for (auto& q : queues_) { 58 if (q->IsEmpty()) { 59 continue; 60 } 61 return q->Pop(); 62 } 63 CHECK(false) << "Multiplexer.Pop() should be able to return an item"; 64 // must not reach here 65 return T{}; 66 } 67 68 private: CheckIdx(const int idx)69 void CheckIdx(const int idx) { 70 CHECK(idx >= 0 && idx < queues_.size()) << "queues_ array out of bound"; 71 } 72 // total items across the queues 73 Semaphore sem_items_; 74 std::vector<std::unique_ptr<ThreadSafeQueue<T>>> queues_; 75 int next_; 76 }; 77 } // end of namespace confui 78 } // end of namespace cuttlefish 79