• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stdint.h>
20 
21 #include <array>
22 #include <utility>
23 
24 #include "absl/base/thread_annotations.h"
25 #include "absl/types/optional.h"
26 #include "src/core/lib/promise/activity.h"
27 #include "src/core/lib/promise/poll.h"
28 #include "src/core/util/ref_counted.h"
29 #include "src/core/util/ref_counted_ptr.h"
30 #include "src/core/util/sync.h"
31 
32 namespace grpc_core {
33 
34 template <typename T, uint8_t kQueueSize>
35 class InterActivityPipe {
36  public:
37   class NextResult {
38    public:
39     template <typename... Args>
NextResult(Args &&...args)40     explicit NextResult(Args&&... args) : value_(std::forward<Args>(args)...) {}
41     using value_type = T;
reset()42     void reset() { value_.reset(); }
cancelled()43     bool cancelled() const { return false; }
has_value()44     bool has_value() const { return value_.has_value(); }
value()45     const T& value() const { return value_.value(); }
value()46     T& value() { return value_.value(); }
47     const T& operator*() const { return *value_; }
48     T& operator*() { return *value_; }
49 
50    private:
51     absl::optional<T> value_;
52   };
53 
54  private:
55   class Center : public RefCounted<Center, NonPolymorphicRefCount> {
56    public:
Push(T & value)57     Poll<bool> Push(T& value) {
58       ReleasableMutexLock lock(&mu_);
59       if (closed_) return false;
60       if (count_ == kQueueSize) {
61         on_available_ = GetContext<Activity>()->MakeNonOwningWaker();
62         return Pending{};
63       }
64       queue_[(first_ + count_) % kQueueSize] = std::move(value);
65       ++count_;
66       if (count_ == 1) {
67         auto on_occupied = std::move(on_occupied_);
68         lock.Release();
69         on_occupied.Wakeup();
70       }
71       return true;
72     }
73 
Next()74     Poll<NextResult> Next() {
75       ReleasableMutexLock lock(&mu_);
76       if (count_ == 0) {
77         if (closed_) return absl::nullopt;
78         on_occupied_ = GetContext<Activity>()->MakeNonOwningWaker();
79         return Pending{};
80       }
81       auto value = std::move(queue_[first_]);
82       first_ = (first_ + 1) % kQueueSize;
83       --count_;
84       if (count_ == kQueueSize - 1) {
85         auto on_available = std::move(on_available_);
86         lock.Release();
87         on_available.Wakeup();
88       }
89       return std::move(value);
90     }
91 
MarkClosed()92     void MarkClosed() {
93       ReleasableMutexLock lock(&mu_);
94       if (std::exchange(closed_, true)) return;
95       auto on_occupied = std::move(on_occupied_);
96       auto on_available = std::move(on_available_);
97       lock.Release();
98       on_occupied.Wakeup();
99       on_available.Wakeup();
100     }
101 
IsClosed()102     bool IsClosed() {
103       MutexLock lock(&mu_);
104       return closed_;
105     }
106 
107    private:
108     Mutex mu_;
109     std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
110     bool closed_ ABSL_GUARDED_BY(mu_) = false;
111     uint8_t first_ ABSL_GUARDED_BY(mu_) = 0;
112     uint8_t count_ ABSL_GUARDED_BY(mu_) = 0;
113     Waker on_occupied_ ABSL_GUARDED_BY(mu_);
114     Waker on_available_ ABSL_GUARDED_BY(mu_);
115   };
116   RefCountedPtr<Center> center_{MakeRefCounted<Center>()};
117 
118  public:
119   class Sender {
120    public:
Sender(RefCountedPtr<Center> center)121     explicit Sender(RefCountedPtr<Center> center)
122         : center_(std::move(center)) {}
123     Sender(const Sender&) = delete;
124     Sender& operator=(const Sender&) = delete;
125     Sender(Sender&&) noexcept = default;
126     Sender& operator=(Sender&&) noexcept = default;
127 
~Sender()128     ~Sender() {
129       if (center_ != nullptr) center_->MarkClosed();
130     }
131 
IsClosed()132     bool IsClosed() { return center_->IsClosed(); }
133 
MarkClosed()134     void MarkClosed() {
135       if (center_ != nullptr) center_->MarkClosed();
136     }
137 
Push(T value)138     auto Push(T value) {
139       return [center = center_, value = std::move(value)]() mutable {
140         return center->Push(value);
141       };
142     }
143 
144    private:
145     RefCountedPtr<Center> center_;
146   };
147 
148   class Receiver {
149    public:
Receiver(RefCountedPtr<Center> center)150     explicit Receiver(RefCountedPtr<Center> center)
151         : center_(std::move(center)) {}
152     Receiver(const Receiver&) = delete;
153     Receiver& operator=(const Receiver&) = delete;
154     Receiver(Receiver&&) noexcept = default;
155     Receiver& operator=(Receiver&&) noexcept = default;
156 
~Receiver()157     ~Receiver() {
158       if (center_ != nullptr) center_->MarkClosed();
159     }
160 
Next()161     auto Next() {
162       return [center = center_]() { return center->Next(); };
163     }
164 
IsClose()165     bool IsClose() { return center_->IsClosed(); }
166 
MarkClose()167     void MarkClose() {
168       if (center_ != nullptr) center_->MarkClosed();
169     }
170 
171    private:
172     RefCountedPtr<Center> center_;
173   };
174 
175   Sender sender{center_};
176   Receiver receiver{center_};
177 };
178 
179 }  // namespace grpc_core
180 
181 #endif  // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H
182