1 /* 2 * Copyright (C) 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef ART_RUNTIME_BASE_MESSAGE_QUEUE_H_ 18 #define ART_RUNTIME_BASE_MESSAGE_QUEUE_H_ 19 20 #include <deque> 21 #include <optional> 22 #include <variant> 23 24 #include "base/time_utils.h" 25 #include "mutex.h" 26 #include "thread.h" 27 28 #pragma clang diagnostic push 29 #pragma clang diagnostic error "-Wconversion" 30 31 namespace art { 32 33 struct TimeoutExpiredMessage {}; 34 35 // MessageQueue is an unbounded multiple producer, multiple consumer (MPMC) queue that can be 36 // specialized to send messages between threads. The queue is parameterized by a set of types that 37 // serve as the message types. Note that messages are passed by value, so smaller messages should be 38 // used when possible. 39 // 40 // Example: 41 // 42 // struct IntMessage { int value; }; 43 // struct DoubleMessage { double value; }; 44 // 45 // MessageQueue<IntMessage, DoubleMessage> queue; 46 // 47 // queue.SendMessage(IntMessage{42}); 48 // queue.SendMessage(DoubleMessage{42.0}); 49 // 50 // auto message = queue.ReceiveMessage(); // message is a std::variant of the different 51 // // message types. 52 // 53 // if (std::holds_alternative<IntMessage>(message)) { 54 // cout << "Received int message with value " << std::get<IntMessage>(message) << "\n"; 55 // } 56 // 57 // The message queue also supports a special timeout message. This is scheduled to be sent by the 58 // SetTimeout method, which will cause the MessageQueue to deliver a TimeoutExpiredMessage after the 59 // time period has elapsed. Note that only one timeout can be active can be active at a time, and 60 // subsequent calls to SetTimeout will overwrite any existing timeout. 61 // 62 // Example: 63 // 64 // queue.SetTimeout(5000); // request to send TimeoutExpiredMessage in 5000ms. 65 // 66 // auto message = queue.ReceiveMessage(); // blocks for 5000ms and returns 67 // // TimeoutExpiredMessage 68 // 69 // Note additional messages can be sent in the meantime and a ReceiveMessage call will wake up to 70 // return that message. The TimeoutExpiredMessage will still be sent at the right time. 71 // 72 // Finally, MessageQueue has a SwitchReceive method that can be used to run different code depending 73 // on the type of message received. SwitchReceive takes a set of lambda expressions that take one 74 // argument of one of the allowed message types. An additional lambda expression that takes a single 75 // auto argument can be used to serve as a catch-all case. 76 // 77 // Example: 78 // 79 // queue.SwitchReceive( 80 // [&](IntMessage message) { 81 // cout << "Received int: " << message.value << "\n"; 82 // }, 83 // [&](DoubleMessage message) { 84 // cout << "Received double: " << message.value << "\n"; 85 // }, 86 // [&](auto other_message) { 87 // // Another message was received. In this case, it's TimeoutExpiredMessage. 88 // } 89 // ) 90 // 91 // For additional examples, see message_queue_test.cc. 92 template <typename... MessageTypes> 93 class MessageQueue { 94 public: 95 using Message = std::variant<TimeoutExpiredMessage, MessageTypes...>; 96 97 // Adds a message to the message queue, which can later be received with ReceiveMessage. See class 98 // comment for more details. SendMessage(Message message)99 void SendMessage(Message message) { 100 // TimeoutExpiredMessage should not be sent manually. 101 DCHECK(!std::holds_alternative<TimeoutExpiredMessage>(message)); 102 Thread* self = Thread::Current(); 103 MutexLock lock{self, mutex_}; 104 messages_.push_back(message); 105 cv_.Signal(self); 106 } 107 108 // Schedule a TimeoutExpiredMessage to be delivered in timeout_milliseconds. See class comment for 109 // more details. SetTimeout(uint64_t timeout_milliseconds)110 void SetTimeout(uint64_t timeout_milliseconds) { 111 Thread* self = Thread::Current(); 112 MutexLock lock{self, mutex_}; 113 deadline_milliseconds_ = timeout_milliseconds + MilliTime(); 114 cv_.Signal(self); 115 } 116 117 // Remove and return a message from the queue. If no message is available, ReceiveMessage will 118 // block until one becomes available. See class comment for more details. ReceiveMessage()119 Message ReceiveMessage() { 120 Thread* self = Thread::Current(); 121 MutexLock lock{self, mutex_}; 122 123 // Loop until we receive a message 124 while (true) { 125 uint64_t const current_time = MilliTime(); 126 // First check if the deadline has passed. 127 if (deadline_milliseconds_.has_value() && deadline_milliseconds_.value() < current_time) { 128 deadline_milliseconds_.reset(); 129 return TimeoutExpiredMessage{}; 130 } 131 132 // Check if there is a message in the queue. 133 if (messages_.size() > 0) { 134 Message message = messages_.front(); 135 messages_.pop_front(); 136 return message; 137 } 138 139 // Otherwise, wait until we have a message or a timeout. 140 if (deadline_milliseconds_.has_value()) { 141 DCHECK_LE(current_time, deadline_milliseconds_.value()); 142 int64_t timeout = static_cast<int64_t>(deadline_milliseconds_.value() - current_time); 143 cv_.TimedWait(self, timeout, /*ns=*/0); 144 } else { 145 cv_.Wait(self); 146 } 147 } 148 } 149 150 // Waits for a message and applies the appropriate function argument to the received message. See 151 // class comment for more details. 152 template <typename ReturnType = void, typename... Fn> SwitchReceive(Fn...case_fn)153 ReturnType SwitchReceive(Fn... case_fn) { 154 struct Matcher : Fn... { 155 using Fn::operator()...; 156 } matcher{case_fn...}; 157 return std::visit(matcher, ReceiveMessage()); 158 } 159 160 private: 161 Mutex mutex_{"MessageQueue Mutex"}; 162 ConditionVariable cv_{"MessageQueue ConditionVariable", mutex_}; 163 164 std::deque<Message> messages_ GUARDED_BY(mutex_); 165 std::optional<uint64_t> deadline_milliseconds_ GUARDED_BY(mutex_); 166 }; 167 168 } // namespace art 169 170 #pragma clang diagnostic pop // -Wconversion 171 172 #endif // ART_RUNTIME_BASE_MESSAGE_QUEUE_H_ 173