• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_
18 #define INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_
19 
20 #include <mutex>
21 #include <optional>
22 
23 #include "perfetto/base/compiler.h"
24 #include "perfetto/base/platform_handle.h"
25 #include "perfetto/ext/base/circular_queue.h"
26 #include "perfetto/ext/base/event_fd.h"
27 
28 namespace perfetto {
29 namespace base {
30 
31 // Unidirectional conduit used to send values between threads with a fixed-sized
32 // buffer in-between.
33 //
34 // When a channel is read from when empty or written to when full, the operation
35 // will not succeed and the caller can choose to a) abandon the operation,
36 // or b) use |read_fd| or |write_fd| (as appropriate) which will be become
37 // "ready" (i.e. base::TaskRunner watches will fire) when the operation would
38 // succeed.
39 //
40 // A channel is very similar to a Unix pipe except with the values being sent
41 // a) not needing to be serializable b) data does not go through the kernel.
42 template <typename T>
43 class Channel {
44  public:
45   struct ReadResult {
ReadResultReadResult46     ReadResult(std::optional<T> _item, bool _is_closed)
47         : item(std::move(_item)), is_closed(_is_closed) {}
48 
49     bool operator==(const ReadResult& res) const {
50       return item == res.item && is_closed == res.is_closed;
51     }
52 
53     // The item read from the channel or std::nullopt if the channel is empty.
54     // If so, callers can use |read_fd| to be notified when a read operation
55     // would succeed.
56     std::optional<T> item;
57 
58     // Indicates the channel is closed. Readers can continue to read from the
59     // channel and any buffered elements will be correctly returned. Moreover,
60     // any future reads will also have |is_closed| == true and |read_fd| will be
61     // ready forever.
62     //
63     // Once a ReadResult is returned with |item| == std::nullopt and
64     // |is_closed| == true, no further values will ever be returned.
65     bool is_closed;
66   };
67   struct WriteResult {
WriteResultWriteResult68     WriteResult(bool _success, bool _is_closed)
69         : success(std::move(_success)), is_closed(_is_closed) {}
70 
71     bool operator==(const WriteResult& res) const {
72       return success == res.success && is_closed == res.is_closed;
73     }
74 
75     // Returns whether the write to the channel was successful. If this is
76     // false, callers can use |write_fd| to be notified when future writes
77     // would succeed. Note that callers should also check |is_closed| as another
78     // writer may have closed the channel.
79     bool success;
80 
81     // Indicates that the channel is closed. If this value is true, |success|
82     // will be |false| Moreover, any further writes will continue to return
83     // |success| == false, |is_closed| == true and |write_fd| will be ready
84     // forever.
85     bool is_closed;
86   };
87 
88   // Creates a channel with a capacity at least as large as |capacity_hint|. The
89   // capacity *must* be greater than zero.
90   //
91   // Note that it's possible that a capacity > |capacity_hint| will be chosen:
92   // it is implementation defined when this might happen.
Channel(uint32_t capacity_hint)93   explicit Channel(uint32_t capacity_hint) : elements_(capacity_hint) {
94     PERFETTO_DCHECK(capacity_hint > 0);
95 
96     // It's very important that we make sure |write_fd| is ready to avoid
97     // deadlocks.
98     write_fd_.Notify();
99   }
100 
101   // Attempts to read from the channel and returns the result of the attempt.
102   // See |ReadResult| for more information on the result.
ReadNonBlocking()103   PERFETTO_WARN_UNUSED_RESULT ReadResult ReadNonBlocking() {
104     std::lock_guard<std::mutex> lock(mutex_);
105     if (elements_.empty()) {
106       return ReadResult(std::nullopt, is_closed_);
107     }
108     if (elements_.capacity() == elements_.size()) {
109       write_fd_.Notify();
110     }
111     T value = std::move(elements_.front());
112     elements_.pop_front();
113     if (!is_closed_ && elements_.empty()) {
114       read_fd_.Clear();
115     }
116     return ReadResult(std::move(value), is_closed_);
117   }
118 
119   // Attempts to write to the channel and returns the result of the attempt.
120   // See |WriteResult| for more information on the result.
121   //
122   // IMPORTANT: if this function returns |success| == false, |element| *will
123   // not* be modified. This allows the caller to try again with the same value.
WriteNonBlocking(T && element)124   PERFETTO_WARN_UNUSED_RESULT WriteResult WriteNonBlocking(T&& element) {
125     std::lock_guard<std::mutex> lock(mutex_);
126     if (is_closed_) {
127       return WriteResult{false, true};
128     }
129     if (elements_.size() == elements_.capacity()) {
130       return WriteResult{false, false};
131     }
132     if (elements_.empty()) {
133       read_fd_.Notify();
134     }
135     elements_.emplace_back(std::move(element));
136     if (elements_.size() == elements_.capacity()) {
137       write_fd_.Clear();
138     }
139     return WriteResult{true, false};
140   }
141 
142   // Closes the channel for to any further writes.
143   //
144   // Note: this function will make both |read_fd| and |write_fd| ready to
145   // avoid deadlocks. Callers should correctly handle |is_closed| being
146   // false from |ReadNonBlocking| and |WriteNonBlocking| to stop watching the
147   // fds to avoid poll returning immediately.
148   //
149   // We prefer this behaviour as it's a lot more obvious something is wrong when
150   // it spins and takes 100% CPU rather than silently deadlocking.
Close()151   void Close() {
152     std::lock_guard<std::mutex> lock(mutex_);
153     is_closed_ = true;
154 
155     // Make both fds ready to avoid deadlocks.
156     read_fd_.Notify();
157     write_fd_.Notify();
158   }
159 
160   // Notification FD for when |ReadNonBlocking| would succeed. Can be useful to
161   // pass to AddFileDescriptorWatch to read data from the channel.
read_fd()162   base::PlatformHandle read_fd() const { return read_fd_.fd(); }
163 
164   // Notification FD for when |WriteNonBlocking| would succeed. Can be useful to
165   // pass to AddFileDescriptorWatch to send data through the channel.
write_fd()166   base::PlatformHandle write_fd() const { return write_fd_.fd(); }
167 
168  private:
169   std::mutex mutex_;
170   base::CircularQueue<T> elements_;
171   bool is_closed_ = false;
172 
173   base::EventFd read_fd_;
174   base::EventFd write_fd_;
175 };
176 
177 }  // namespace base
178 }  // namespace perfetto
179 
180 #endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_CHANNEL_H_
181