1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/activity.h"
16
17 #include <stdlib.h>
18
19 #include <functional>
20 #include <tuple>
21 #include <variant>
22
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25 #include "src/core/lib/promise/join.h"
26 #include "src/core/lib/promise/poll.h"
27 #include "src/core/lib/promise/promise.h"
28 #include "src/core/lib/promise/seq.h"
29 #include "src/core/lib/promise/wait_set.h"
30 #include "test/core/promise/test_wakeup_schedulers.h"
31
32 using testing::_;
33 using testing::Mock;
34 using testing::MockFunction;
35 using testing::SaveArg;
36 using testing::StrictMock;
37
38 namespace grpc_core {
39
40 // A simple Barrier type: stalls progress until it is 'cleared'.
41 class Barrier {
42 public:
43 struct Result {};
44
Wait()45 Promise<Result> Wait() {
46 return [this]() -> Poll<Result> {
47 MutexLock lock(&mu_);
48 if (cleared_) {
49 return Result{};
50 } else {
51 return wait_set_.AddPending(GetContext<Activity>()->MakeOwningWaker());
52 }
53 };
54 }
55
Clear()56 void Clear() {
57 mu_.Lock();
58 cleared_ = true;
59 auto wakeup = wait_set_.TakeWakeupSet();
60 mu_.Unlock();
61 wakeup.Wakeup();
62 }
63
64 private:
65 Mutex mu_;
66 WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
67 bool cleared_ ABSL_GUARDED_BY(mu_) = false;
68 };
69
70 // A simple Barrier type: stalls progress until it is 'cleared'.
71 // This variant supports only a single waiter.
72 class SingleBarrier {
73 public:
74 struct Result {};
75
Wait()76 Promise<Result> Wait() {
77 return [this]() -> Poll<Result> {
78 MutexLock lock(&mu_);
79 if (cleared_) {
80 return Result{};
81 } else {
82 waker_ = GetContext<Activity>()->MakeOwningWaker();
83 return Pending();
84 }
85 };
86 }
87
Clear()88 void Clear() {
89 mu_.Lock();
90 cleared_ = true;
91 auto waker = std::move(waker_);
92 mu_.Unlock();
93 waker.Wakeup();
94 }
95
96 private:
97 Mutex mu_;
98 Waker waker_ ABSL_GUARDED_BY(mu_);
99 bool cleared_ ABSL_GUARDED_BY(mu_) = false;
100 };
101
// A promise that returns absl::OkStatus() must cause on_done to be invoked
// with OK.
TEST(ActivityTest, ImmediatelyCompleteWithSuccess) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Factory producing a promise that returns OK when invoked.
  auto promise_factory = [] { return [] { return absl::OkStatus(); }; };
  // Forwards the final status to the mock.
  auto report_done = [&on_done](absl::Status result) {
    on_done.Call(std::move(result));
  };
  MakeActivity(std::move(promise_factory), NoWakeupScheduler(),
               std::move(report_done));
}
109
// A promise that returns absl::CancelledError() must cause on_done to be
// invoked with that same status.
TEST(ActivityTest, ImmediatelyCompleteWithFailure) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::CancelledError()));
  // Factory producing a promise that returns a cancelled status when invoked.
  auto promise_factory = [] { return [] { return absl::CancelledError(); }; };
  // Forwards the final status to the mock.
  auto report_done = [&on_done](absl::Status result) {
    on_done.Call(std::move(result));
  };
  MakeActivity(std::move(promise_factory), NoWakeupScheduler(),
               std::move(report_done));
}
117
// The promise here always reports Pending and the value returned by
// MakeActivity is discarded right away; the test expects on_done to receive a
// cancelled status.
TEST(ActivityTest, DropImmediately) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::CancelledError()));
  // Factory producing a promise that never becomes ready.
  auto never_ready_factory = [] {
    return []() -> Poll<absl::Status> { return Pending(); };
  };
  // Forwards the final status to the mock.
  auto report_done = [&on_done](absl::Status result) {
    on_done.Call(std::move(result));
  };
  // The result is intentionally not stored.
  MakeActivity(std::move(never_ready_factory), NoWakeupScheduler(),
               std::move(report_done));
}
126
127 template <typename B>
128 class BarrierTest : public ::testing::Test {
129 public:
130 using Type = B;
131 };
132
133 using BarrierTestTypes = ::testing::Types<Barrier, SingleBarrier>;
134 TYPED_TEST_SUITE(BarrierTest, BarrierTestTypes);
135
// An activity waiting on a barrier is expected to finish with OK once the
// barrier is cleared.
TYPED_TEST(BarrierTest, Barrier) {
  using BarrierType = typename TestFixture::Type;
  using BarrierResult = typename BarrierType::Result;

  BarrierType barrier;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  auto activity = MakeActivity(
      [&barrier] {
        // Wait for the barrier, then finish with OK.
        return Seq(barrier.Wait(),
                   [](BarrierResult) { return absl::OkStatus(); });
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); });
  // The expectation is installed only now: on_done is a StrictMock, so any
  // call before this point would fail the test. Clearing the barrier should
  // let the activity proceed to return a result.
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  barrier.Clear();
}
151
// Two activities, each waiting on its own barrier. The first activity clears
// the second barrier from inside its own promise; wakeups are delivered
// through mock schedulers so the test can run the captured callbacks one at a
// time.
TYPED_TEST(BarrierTest, BarrierPing) {
  using BarrierType = typename TestFixture::Type;
  using BarrierResult = typename BarrierType::Result;

  BarrierType first_barrier;
  BarrierType second_barrier;
  StrictMock<MockFunction<void(absl::Status)>> on_done1;
  StrictMock<MockFunction<void(absl::Status)>> on_done2;
  MockCallbackScheduler scheduler1;
  MockCallbackScheduler scheduler2;
  auto activity1 = MakeActivity(
      [&first_barrier, &second_barrier] {
        return Seq(first_barrier.Wait(), [&second_barrier](BarrierResult) {
          // Clear the other barrier whilst executing inside this activity.
          second_barrier.Clear();
          return absl::OkStatus();
        });
      },
      UseMockCallbackScheduler{&scheduler1},
      [&on_done1](absl::Status result) { on_done1.Call(std::move(result)); });
  auto activity2 = MakeActivity(
      [&second_barrier] {
        return Seq(second_barrier.Wait(),
                   [](BarrierResult) { return absl::OkStatus(); });
      },
      UseMockCallbackScheduler{&scheduler2},
      [&on_done2](absl::Status result) { on_done2.Call(std::move(result)); });
  // Because the second barrier is cleared from inside activity1's promise,
  // activity2's wakeup is expected to arrive as a scheduled callback.
  std::function<void()> scheduled_wakeup1;
  std::function<void()> scheduled_wakeup2;
  // Step 1: clearing the first barrier schedules a wakeup for activity1.
  EXPECT_CALL(scheduler1, Schedule(_)).WillOnce(SaveArg<0>(&scheduled_wakeup1));
  first_barrier.Clear();
  Mock::VerifyAndClearExpectations(&scheduler1);
  // Step 2: running that wakeup completes activity1 and schedules a wakeup
  // for activity2.
  EXPECT_CALL(on_done1, Call(absl::OkStatus()));
  EXPECT_CALL(scheduler2, Schedule(_)).WillOnce(SaveArg<0>(&scheduled_wakeup2));
  scheduled_wakeup1();
  Mock::VerifyAndClearExpectations(&on_done1);
  // Step 3: running the second wakeup completes activity2.
  EXPECT_CALL(on_done2, Call(absl::OkStatus()));
  scheduled_wakeup2();
}
191
// The activity joins a wait on the barrier with a promise that clears that
// same barrier; the test expects it to finish with OK.
TYPED_TEST(BarrierTest, WakeSelf) {
  using BarrierType = typename TestFixture::Type;
  using BarrierResult = typename BarrierType::Result;

  BarrierType barrier;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  MakeActivity(
      [&barrier] {
        // Second branch of the join: clears the barrier the first branch is
        // waiting on.
        auto clear_barrier = [&barrier] {
          barrier.Clear();
          return 1;
        };
        return Seq(Join(barrier.Wait(), std::move(clear_barrier)),
                   [](std::tuple<BarrierResult, int>) {
                     return absl::OkStatus();
                   });
      },
      NoWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); });
}
210
// Clears the barrier only after the activity waiting on it has been dropped.
// The activity is expected to report a cancelled status inside the inner
// scope.
TYPED_TEST(BarrierTest, WakeAfterDestruction) {
  using BarrierType = typename TestFixture::Type;
  using BarrierResult = typename BarrierType::Result;

  BarrierType barrier;
  {
    StrictMock<MockFunction<void(absl::Status)>> on_done;
    EXPECT_CALL(on_done, Call(absl::CancelledError()));
    // The value returned by MakeActivity is not stored.
    MakeActivity(
        [&barrier] {
          return Seq(barrier.Wait(),
                     [](BarrierResult) { return absl::OkStatus(); });
        },
        InlineWakeupScheduler(),
        [&on_done](absl::Status result) { on_done.Call(std::move(result)); });
  }
  // on_done is out of scope here; Clear() runs with no live activity.
  barrier.Clear();
}
227
// The promise is Pending on its first run and OK on its second; a third run
// aborts the process. ForceWakeup() is expected to drive the second run.
TEST(ActivityTest, ForceWakeup) {
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  int run_count = 0;
  auto activity = MakeActivity(
      [&run_count]() -> Poll<absl::Status> {
        ++run_count;
        if (run_count == 1) {
          return Pending{};
        }
        if (run_count == 2) {
          return absl::OkStatus();
        }
        // Any further run is unexpected.
        abort();
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); });
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  activity->ForceWakeup();
}
248
249 struct TestContext {
250 bool* done;
251 };
252 template <>
253 struct ContextType<TestContext> {};
254
// A TestContext passed to MakeActivity must be reachable through
// GetContext<TestContext>() inside the promise factory.
TEST(ActivityTest, WithContext) {
  bool flag_was_set = false;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  // Sets the flag referenced by the context, then completes with OK.
  auto set_flag_factory = [] {
    bool* const flag = GetContext<TestContext>()->done;
    *flag = true;
    return Immediate(absl::OkStatus());
  };
  // Forwards the final status to the mock.
  auto report_done = [&on_done](absl::Status result) {
    on_done.Call(std::move(result));
  };
  MakeActivity(std::move(set_flag_factory), NoWakeupScheduler(),
               std::move(report_done), TestContext{&flag_was_set});
  EXPECT_TRUE(flag_was_set);
}
269
// On its second run the promise resets the ActivityPtr that owns the running
// activity and then reports Pending; the test expects on_done to receive a
// cancelled status.
TEST(ActivityTest, CanCancelDuringExecution) {
  ActivityPtr activity;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  int run_count = 0;

  activity = MakeActivity(
      [&activity, &run_count]() -> Poll<absl::Status> {
        ++run_count;
        if (run_count == 1) {
          return Pending{};
        }
        if (run_count == 2) {
          // Drop the owning pointer from inside the promise.
          activity.reset();
          return Pending{};
        }
        // Any further run is unexpected.
        abort();
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); });

  EXPECT_CALL(on_done, Call(absl::CancelledError()));
  activity->ForceWakeup();
}
294
// On its second run the promise resets the ActivityPtr that owns the running
// activity and then returns OK; the test expects on_done to receive OK rather
// than a cancelled status.
TEST(ActivityTest, CanCancelDuringSuccessfulExecution) {
  ActivityPtr activity;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  int run_count = 0;

  activity = MakeActivity(
      [&activity, &run_count]() -> Poll<absl::Status> {
        ++run_count;
        if (run_count == 1) {
          return Pending{};
        }
        if (run_count == 2) {
          // Drop the owning pointer, then still complete successfully.
          activity.reset();
          return absl::OkStatus();
        }
        // Any further run is unexpected.
        abort();
      },
      InlineWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); });

  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  activity->ForceWakeup();
}
319
// Calling Wakeup() on a default-constructed Waker must be harmless.
TEST(WakerTest, CanWakeupEmptyWaker) {
  // No activity is attached to this waker, so nothing should happen.
  Waker{}.Wakeup();
}
324
325 } // namespace grpc_core
326
main(int argc,char ** argv)327 int main(int argc, char** argv) {
328 ::testing::InitGoogleTest(&argc, argv);
329 return RUN_ALL_TESTS();
330 }
331