/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_
#define INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_

#include <mutex>
#include <optional>

#include "perfetto/base/compiler.h"
#include "perfetto/base/platform_handle.h"
#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/ext/base/event_fd.h"

namespace perfetto {
namespace base {

// Unidirectional conduit used to send values between threads with a fixed-sized
// buffer in-between.
//
// When a channel is read from when empty or written to when full, the operation
// will not succeed and the caller can choose to a) abandon the operation,
// or b) use |read_fd| or |write_fd| (as appropriate) which will become
// "ready" (i.e. base::TaskRunner watches will fire) when the operation would
// succeed.
//
// A channel is very similar to a Unix pipe except with the values being sent
// a) not needing to be serializable b) data does not go through the kernel.
42 template <typename T> 43 class Channel { 44 public: 45 struct ReadResult { ReadResultReadResult46 ReadResult(std::optional<T> _item, bool _is_closed) 47 : item(std::move(_item)), is_closed(_is_closed) {} 48 49 bool operator==(const ReadResult& res) const { 50 return item == res.item && is_closed == res.is_closed; 51 } 52 53 // The item read from the channel or std::nullopt if the channel is empty. 54 // If so, callers can use |read_fd| to be notified when a read operation 55 // would succeed. 56 std::optional<T> item; 57 58 // Indicates the channel is closed. Readers can continue to read from the 59 // channel and any buffered elements will be correctly returned. Moreover, 60 // any future reads will also have |is_closed| == true and |read_fd| will be 61 // ready forever. 62 // 63 // Once a ReadResult is returned with |item| == std::nullopt and 64 // |is_closed| == true, no further values will ever be returned. 65 bool is_closed; 66 }; 67 struct WriteResult { WriteResultWriteResult68 WriteResult(bool _success, bool _is_closed) 69 : success(std::move(_success)), is_closed(_is_closed) {} 70 71 bool operator==(const WriteResult& res) const { 72 return success == res.success && is_closed == res.is_closed; 73 } 74 75 // Returns whether the write to the channel was successful. If this is 76 // false, callers can use |write_fd| to be notified when future writes 77 // would succeed. Note that callers should also check |is_closed| as another 78 // writer may have closed the channel. 79 bool success; 80 81 // Indicates that the channel is closed. If this value is true, |success| 82 // will be |false| Moreover, any further writes will continue to return 83 // |success| == false, |is_closed| == true and |write_fd| will be ready 84 // forever. 85 bool is_closed; 86 }; 87 88 // Creates a channel with a capacity at least as large as |capacity_hint|. The 89 // capacity *must* be greater than zero. 
90 // 91 // Note that it's possible that a capacity > |capacity_hint| will be chosen: 92 // it is implementation defined when this might happen. Channel(uint32_t capacity_hint)93 explicit Channel(uint32_t capacity_hint) : elements_(capacity_hint) { 94 PERFETTO_DCHECK(capacity_hint > 0); 95 96 // It's very important that we make sure |write_fd| is ready to avoid 97 // deadlocks. 98 write_fd_.Notify(); 99 } 100 101 // Attempts to read from the channel and returns the result of the attempt. 102 // See |ReadResult| for more information on the result. ReadNonBlocking()103 PERFETTO_WARN_UNUSED_RESULT ReadResult ReadNonBlocking() { 104 std::lock_guard<std::mutex> lock(mutex_); 105 if (elements_.empty()) { 106 return ReadResult(std::nullopt, is_closed_); 107 } 108 if (elements_.capacity() == elements_.size()) { 109 write_fd_.Notify(); 110 } 111 T value = std::move(elements_.front()); 112 elements_.pop_front(); 113 if (!is_closed_ && elements_.empty()) { 114 read_fd_.Clear(); 115 } 116 return ReadResult(std::move(value), is_closed_); 117 } 118 119 // Attempts to write to the channel and returns the result of the attempt. 120 // See |WriteResult| for more information on the result. 121 // 122 // IMPORTANT: if this function returns |success| == false, |element| *will 123 // not* be modified. This allows the caller to try again with the same value. WriteNonBlocking(T && element)124 PERFETTO_WARN_UNUSED_RESULT WriteResult WriteNonBlocking(T&& element) { 125 std::lock_guard<std::mutex> lock(mutex_); 126 if (is_closed_) { 127 return WriteResult{false, true}; 128 } 129 if (elements_.size() == elements_.capacity()) { 130 return WriteResult{false, false}; 131 } 132 if (elements_.empty()) { 133 read_fd_.Notify(); 134 } 135 elements_.emplace_back(std::move(element)); 136 if (elements_.size() == elements_.capacity()) { 137 write_fd_.Clear(); 138 } 139 return WriteResult{true, false}; 140 } 141 142 // Closes the channel for to any further writes. 
143 // 144 // Note: this function will make both |read_fd| and |write_fd| ready to 145 // avoid deadlocks. Callers should correctly handle |is_closed| being 146 // false from |ReadNonBlocking| and |WriteNonBlocking| to stop watching the 147 // fds to avoid poll returning immediately. 148 // 149 // We prefer this behaviour as it's a lot more obvious something is wrong when 150 // it spins and takes 100% CPU rather than silently deadlocking. Close()151 void Close() { 152 std::lock_guard<std::mutex> lock(mutex_); 153 is_closed_ = true; 154 155 // Make both fds ready to avoid deadlocks. 156 read_fd_.Notify(); 157 write_fd_.Notify(); 158 } 159 160 // Notification FD for when |ReadNonBlocking| would succeed. Can be useful to 161 // pass to AddFileDescriptorWatch to read data from the channel. read_fd()162 base::PlatformHandle read_fd() const { return read_fd_.fd(); } 163 164 // Notification FD for when |WriteNonBlocking| would succeed. Can be useful to 165 // pass to AddFileDescriptorWatch to send data through the channel. write_fd()166 base::PlatformHandle write_fd() const { return write_fd_.fd(); } 167 168 private: 169 std::mutex mutex_; 170 base::CircularQueue<T> elements_; 171 bool is_closed_ = false; 172 173 base::EventFd read_fd_; 174 base::EventFd write_fd_; 175 }; 176 177 } // namespace base 178 } // namespace perfetto 179 180 #endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_ 181