1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <grpc/support/port_platform.h> 20 21 #include "src/core/lib/gprpp/mpscq.h" 22 23 namespace grpc_core { 24 25 // 26 // MultiProducerSingleConsumerQueue 27 // 28 Push(Node * node)29bool MultiProducerSingleConsumerQueue::Push(Node* node) { 30 node->next.Store(nullptr, MemoryOrder::RELAXED); 31 Node* prev = head_.Exchange(node, MemoryOrder::ACQ_REL); 32 prev->next.Store(node, MemoryOrder::RELEASE); 33 return prev == &stub_; 34 } 35 36 MultiProducerSingleConsumerQueue::Node* Pop()37MultiProducerSingleConsumerQueue::Pop() { 38 bool empty; 39 return PopAndCheckEnd(&empty); 40 } 41 42 MultiProducerSingleConsumerQueue::Node* PopAndCheckEnd(bool * empty)43MultiProducerSingleConsumerQueue::PopAndCheckEnd(bool* empty) { 44 Node* tail = tail_; 45 Node* next = tail_->next.Load(MemoryOrder::ACQUIRE); 46 if (tail == &stub_) { 47 // indicates the list is actually (ephemerally) empty 48 if (next == nullptr) { 49 *empty = true; 50 return nullptr; 51 } 52 tail_ = next; 53 tail = next; 54 next = tail->next.Load(MemoryOrder::ACQUIRE); 55 } 56 if (next != nullptr) { 57 *empty = false; 58 tail_ = next; 59 return tail; 60 } 61 Node* head = head_.Load(MemoryOrder::ACQUIRE); 62 if (tail != head) { 63 *empty = false; 64 // indicates a retry is in order: we're still adding 65 return nullptr; 66 } 67 Push(&stub_); 68 next = tail->next.Load(MemoryOrder::ACQUIRE); 69 if (next != nullptr) { 70 *empty = false; 71 tail_ = next; 72 return tail; 73 } 74 // indicates a retry is in order: we're still adding 75 *empty = false; 76 return nullptr; 77 } 78 79 // 80 // LockedMultiProducerSingleConsumerQueue 81 // 82 Push(Node * node)83bool LockedMultiProducerSingleConsumerQueue::Push(Node* node) { 84 return queue_.Push(node); 85 } 86 87 LockedMultiProducerSingleConsumerQueue::Node* TryPop()88LockedMultiProducerSingleConsumerQueue::TryPop() { 89 if (gpr_mu_trylock(mu_.get())) { 90 Node* node = queue_.Pop(); 91 gpr_mu_unlock(mu_.get()); 92 return node; 93 } 94 return nullptr; 95 } 96 97 LockedMultiProducerSingleConsumerQueue::Node* Pop()98LockedMultiProducerSingleConsumerQueue::Pop() { 99 MutexLock lock(&mu_); 100 bool empty = false; 101 Node* node; 102 do { 103 node = queue_.PopAndCheckEnd(&empty); 104 } while (node == nullptr && !empty); 105 return node; 106 } 107 108 } // namespace grpc_core 109