1 // Copyright 2014 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #pragma once 16 17 #include "android/base/Optional.h" 18 #include "android/base/synchronization/AndroidConditionVariable.h" 19 #include "android/base/synchronization/AndroidLock.h" 20 21 #include <utility> 22 #include <stddef.h> 23 24 namespace android { 25 namespace base { 26 namespace guest { 27 28 // Base non-templated class used to reduce the amount of template 29 // specialization. 30 class MessageChannelBase { 31 public: 32 // Get the current channel size 33 size_t size() const; 34 35 // Abort the currently pending operations and don't allow any other ones 36 void stop(); 37 38 // Check if the channel is stopped. 39 bool isStopped() const; 40 41 // Block until the channel has no pending messages. 42 void waitForEmpty(); 43 44 protected: 45 // Constructor. |capacity| is the buffer capacity in messages. 46 MessageChannelBase(size_t capacity); 47 48 // Destructor. 49 ~MessageChannelBase() = default; 50 51 // Call this method in the sender thread before writing a new message. 52 // This returns the position of the available slot in the message array 53 // where to copy the new fixed-size message. After the copy, call 54 // afterWrite(). 55 // If the channel is stopped, return value is undefined. 56 size_t beforeWrite(); 57 58 // Same as beforeWrite(), but returns an empty optional if there was 59 // no room to write to instead of waiting for it. 60 // One still needs to call afterWrite() anyway. 61 Optional<size_t> beforeTryWrite(); 62 63 // To be called after trying to write a new fixed-size message (which should 64 // happen after beforeWrite() or beforeTryWrite()). 65 // |success| must be true to indicate that a new item was added to the 66 // channel, or false otherwise (i.e. if the channel is stopped, or if 67 // beforeTryWrite() returned an empty optional). 68 void afterWrite(bool success); 69 70 // Call this method in the receiver thread before reading a new message. 71 // This returns the position in the message array where the new message 72 // can be read. Caller must process the message, then call afterRead(). 73 // If the channel is stopped, return value is undefined. 74 size_t beforeRead(); 75 76 // Same as beforeRead(), but returns an empty optional if there was 77 // no data to read instead of waiting for it. 78 // One still needs to call afterWrite() anyway. 79 Optional<size_t> beforeTryRead(); 80 81 // Same as beforeRead(), but returns an empty optional if no data arrived 82 // by the |wallTimeUs| absolute time. One still needs to call 83 // afterWrite() anyway. 84 Optional<size_t> beforeTimedRead(uint64_t wallTimeUs); 85 86 // To be called after reading a fixed-size message from the channel (which 87 // must happen after beforeRead() or beforeTryRead()). 88 // |success| must be true to indicate that a message was read, or false 89 // otherwise (i.e. if the channel is stopped or if beforeTryRead() returned 90 // an empty optional). 91 void afterRead(bool success); 92 93 // A version of isStopped() that doesn't lock the channel but expects it 94 // to be locked by the caller. isStoppedLocked()95 bool isStoppedLocked() const { return mStopped; } 96 97 private: 98 size_t mPos = 0; 99 size_t mCapacity; 100 size_t mCount = 0; 101 bool mStopped = false; 102 mutable Lock mLock; // Mutable to allow const members to lock it. 103 ConditionVariable mCanRead; 104 ConditionVariable mCanWrite; 105 }; 106 107 // Helper class used to implement an uni-directional IPC channel between 108 // two threads. The channel can be used to send fixed-size messages of type 109 // |T|, with an internal buffer size of |CAPACITY| items. All calls are 110 // blocking. 111 // 112 // Usage is pretty straightforward: 113 // 114 // - From the sender thread, call send(msg); 115 // - From the receiver thread, call receive(&msg); 116 // - If you want to stop the IPC, call stop(); 117 template <typename T, size_t CAPACITY> 118 class MessageChannel : public MessageChannelBase { 119 public: MessageChannel()120 MessageChannel() : MessageChannelBase(CAPACITY) {} 121 send(const T & msg)122 bool send(const T& msg) { 123 const size_t pos = beforeWrite(); 124 const bool res = !isStoppedLocked(); 125 if (res) { 126 mItems[pos] = msg; 127 } 128 afterWrite(res); 129 return res; 130 } 131 send(T && msg)132 bool send(T&& msg) { 133 const size_t pos = beforeWrite(); 134 const bool res = !isStoppedLocked(); 135 if (res) { 136 mItems[pos] = std::move(msg); 137 } 138 afterWrite(res); 139 return res; 140 } 141 trySend(const T & msg)142 bool trySend(const T& msg) { 143 const auto pos = beforeTryWrite(); 144 if (pos) { 145 mItems[*pos] = msg; 146 } 147 afterWrite(pos); 148 return pos; 149 } 150 trySend(T && msg)151 bool trySend(T&& msg) { 152 const auto pos = beforeTryWrite(); 153 if (pos) { 154 mItems[*pos] = std::move(msg); 155 } 156 afterWrite(pos); 157 return pos; 158 } 159 receive(T * msg)160 bool receive(T* msg) { 161 const size_t pos = beforeRead(); 162 const bool res = !isStoppedLocked(); 163 if (res) { 164 *msg = std::move(mItems[pos]); 165 } 166 afterRead(res); 167 return res; 168 } 169 receive()170 Optional<T> receive() { 171 const size_t pos = beforeRead(); 172 if (!isStoppedLocked()) { 173 Optional<T> msg(std::move(mItems[pos])); 174 afterRead(true); 175 return msg; 176 } else { 177 afterRead(false); 178 return {}; 179 } 180 } 181 tryReceive(T * msg)182 bool tryReceive(T* msg) { 183 const auto pos = beforeTryRead(); 184 if (pos) { 185 *msg = std::move(mItems[*pos]); 186 } 187 afterRead(pos); 188 return pos; 189 } 190 timedReceive(uint64_t wallTimeUs)191 Optional<T> timedReceive(uint64_t wallTimeUs) { 192 const auto pos = beforeTimedRead(wallTimeUs); 193 if (pos && !isStoppedLocked()) { 194 Optional<T> res(std::move(mItems[*pos])); 195 afterRead(true); 196 return res; 197 } 198 afterRead(false); 199 return {}; 200 } 201 capacity()202 constexpr size_t capacity() const { return CAPACITY; } 203 204 private: 205 T mItems[CAPACITY]; 206 }; 207 208 } // namespace guest 209 } // namespace base 210 } // namespace android 211