• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/activity.h"
16 
17 #include <stdlib.h>
18 
19 #include <functional>
20 #include <tuple>
21 #include <variant>
22 
23 #include "gmock/gmock.h"
24 #include "gtest/gtest.h"
25 #include "src/core/lib/promise/join.h"
26 #include "src/core/lib/promise/poll.h"
27 #include "src/core/lib/promise/promise.h"
28 #include "src/core/lib/promise/seq.h"
29 #include "src/core/lib/promise/wait_set.h"
30 #include "test/core/promise/test_wakeup_schedulers.h"
31 
32 using testing::_;
33 using testing::Mock;
34 using testing::MockFunction;
35 using testing::SaveArg;
36 using testing::StrictMock;
37 
38 namespace grpc_core {
39 
40 // A simple Barrier type: stalls progress until it is 'cleared'.
41 class Barrier {
42  public:
43   struct Result {};
44 
Wait()45   Promise<Result> Wait() {
46     return [this]() -> Poll<Result> {
47       MutexLock lock(&mu_);
48       if (cleared_) {
49         return Result{};
50       } else {
51         return wait_set_.AddPending(GetContext<Activity>()->MakeOwningWaker());
52       }
53     };
54   }
55 
Clear()56   void Clear() {
57     mu_.Lock();
58     cleared_ = true;
59     auto wakeup = wait_set_.TakeWakeupSet();
60     mu_.Unlock();
61     wakeup.Wakeup();
62   }
63 
64  private:
65   Mutex mu_;
66   WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
67   bool cleared_ ABSL_GUARDED_BY(mu_) = false;
68 };
69 
70 // A simple Barrier type: stalls progress until it is 'cleared'.
71 // This variant supports only a single waiter.
72 class SingleBarrier {
73  public:
74   struct Result {};
75 
Wait()76   Promise<Result> Wait() {
77     return [this]() -> Poll<Result> {
78       MutexLock lock(&mu_);
79       if (cleared_) {
80         return Result{};
81       } else {
82         waker_ = GetContext<Activity>()->MakeOwningWaker();
83         return Pending();
84       }
85     };
86   }
87 
Clear()88   void Clear() {
89     mu_.Lock();
90     cleared_ = true;
91     auto waker = std::move(waker_);
92     mu_.Unlock();
93     waker.Wakeup();
94   }
95 
96  private:
97   Mutex mu_;
98   Waker waker_ ABSL_GUARDED_BY(mu_);
99   bool cleared_ ABSL_GUARDED_BY(mu_) = false;
100 };
101 
TEST(ActivityTest,ImmediatelyCompleteWithSuccess)102 TEST(ActivityTest, ImmediatelyCompleteWithSuccess) {
103   StrictMock<MockFunction<void(absl::Status)>> on_done;
104   EXPECT_CALL(on_done, Call(absl::OkStatus()));
105   MakeActivity(
106       [] { return [] { return absl::OkStatus(); }; }, NoWakeupScheduler(),
107       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
108 }
109 
TEST(ActivityTest,ImmediatelyCompleteWithFailure)110 TEST(ActivityTest, ImmediatelyCompleteWithFailure) {
111   StrictMock<MockFunction<void(absl::Status)>> on_done;
112   EXPECT_CALL(on_done, Call(absl::CancelledError()));
113   MakeActivity(
114       [] { return [] { return absl::CancelledError(); }; }, NoWakeupScheduler(),
115       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
116 }
117 
TEST(ActivityTest,DropImmediately)118 TEST(ActivityTest, DropImmediately) {
119   StrictMock<MockFunction<void(absl::Status)>> on_done;
120   EXPECT_CALL(on_done, Call(absl::CancelledError()));
121   MakeActivity(
122       [] { return []() -> Poll<absl::Status> { return Pending(); }; },
123       NoWakeupScheduler(),
124       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
125 }
126 
127 template <typename B>
128 class BarrierTest : public ::testing::Test {
129  public:
130   using Type = B;
131 };
132 
133 using BarrierTestTypes = ::testing::Types<Barrier, SingleBarrier>;
134 TYPED_TEST_SUITE(BarrierTest, BarrierTestTypes);
135 
TYPED_TEST(BarrierTest,Barrier)136 TYPED_TEST(BarrierTest, Barrier) {
137   typename TestFixture::Type b;
138   StrictMock<MockFunction<void(absl::Status)>> on_done;
139   auto activity = MakeActivity(
140       [&b] {
141         return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
142           return absl::OkStatus();
143         });
144       },
145       InlineWakeupScheduler(),
146       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
147   // Clearing the barrier should let the activity proceed to return a result.
148   EXPECT_CALL(on_done, Call(absl::OkStatus()));
149   b.Clear();
150 }
151 
TYPED_TEST(BarrierTest,BarrierPing)152 TYPED_TEST(BarrierTest, BarrierPing) {
153   typename TestFixture::Type b1;
154   typename TestFixture::Type b2;
155   StrictMock<MockFunction<void(absl::Status)>> on_done1;
156   StrictMock<MockFunction<void(absl::Status)>> on_done2;
157   MockCallbackScheduler scheduler1;
158   MockCallbackScheduler scheduler2;
159   auto activity1 = MakeActivity(
160       [&b1, &b2] {
161         return Seq(b1.Wait(), [&b2](typename TestFixture::Type::Result) {
162           // Clear the barrier whilst executing an activity
163           b2.Clear();
164           return absl::OkStatus();
165         });
166       },
167       UseMockCallbackScheduler{&scheduler1},
168       [&on_done1](absl::Status status) { on_done1.Call(std::move(status)); });
169   auto activity2 = MakeActivity(
170       [&b2] {
171         return Seq(b2.Wait(), [](typename TestFixture::Type::Result) {
172           return absl::OkStatus();
173         });
174       },
175       UseMockCallbackScheduler{&scheduler2},
176       [&on_done2](absl::Status status) { on_done2.Call(std::move(status)); });
177   // Since barrier triggers inside activity1 promise, activity2 wakeup will be
178   // scheduled from a callback.
179   std::function<void()> cb1;
180   std::function<void()> cb2;
181   EXPECT_CALL(scheduler1, Schedule(_)).WillOnce(SaveArg<0>(&cb1));
182   b1.Clear();
183   Mock::VerifyAndClearExpectations(&scheduler1);
184   EXPECT_CALL(on_done1, Call(absl::OkStatus()));
185   EXPECT_CALL(scheduler2, Schedule(_)).WillOnce(SaveArg<0>(&cb2));
186   cb1();
187   Mock::VerifyAndClearExpectations(&on_done1);
188   EXPECT_CALL(on_done2, Call(absl::OkStatus()));
189   cb2();
190 }
191 
TYPED_TEST(BarrierTest,WakeSelf)192 TYPED_TEST(BarrierTest, WakeSelf) {
193   typename TestFixture::Type b;
194   StrictMock<MockFunction<void(absl::Status)>> on_done;
195   EXPECT_CALL(on_done, Call(absl::OkStatus()));
196   MakeActivity(
197       [&b] {
198         return Seq(Join(b.Wait(),
199                         [&b] {
200                           b.Clear();
201                           return 1;
202                         }),
203                    [](std::tuple<typename TestFixture::Type::Result, int>) {
204                      return absl::OkStatus();
205                    });
206       },
207       NoWakeupScheduler(),
208       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
209 }
210 
TYPED_TEST(BarrierTest,WakeAfterDestruction)211 TYPED_TEST(BarrierTest, WakeAfterDestruction) {
212   typename TestFixture::Type b;
213   {
214     StrictMock<MockFunction<void(absl::Status)>> on_done;
215     EXPECT_CALL(on_done, Call(absl::CancelledError()));
216     MakeActivity(
217         [&b] {
218           return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
219             return absl::OkStatus();
220           });
221         },
222         InlineWakeupScheduler(),
223         [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
224   }
225   b.Clear();
226 }
227 
TEST(ActivityTest,ForceWakeup)228 TEST(ActivityTest, ForceWakeup) {
229   StrictMock<MockFunction<void(absl::Status)>> on_done;
230   int run_count = 0;
231   auto activity = MakeActivity(
232       [&run_count]() -> Poll<absl::Status> {
233         ++run_count;
234         switch (run_count) {
235           case 1:
236             return Pending{};
237           case 2:
238             return absl::OkStatus();
239           default:
240             abort();
241         }
242       },
243       InlineWakeupScheduler(),
244       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
245   EXPECT_CALL(on_done, Call(absl::OkStatus()));
246   activity->ForceWakeup();
247 }
248 
249 struct TestContext {
250   bool* done;
251 };
252 template <>
253 struct ContextType<TestContext> {};
254 
TEST(ActivityTest,WithContext)255 TEST(ActivityTest, WithContext) {
256   bool done = false;
257   StrictMock<MockFunction<void(absl::Status)>> on_done;
258   EXPECT_CALL(on_done, Call(absl::OkStatus()));
259   MakeActivity(
260       [] {
261         *GetContext<TestContext>()->done = true;
262         return Immediate(absl::OkStatus());
263       },
264       NoWakeupScheduler(),
265       [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
266       TestContext{&done});
267   EXPECT_TRUE(done);
268 }
269 
TEST(ActivityTest,CanCancelDuringExecution)270 TEST(ActivityTest, CanCancelDuringExecution) {
271   ActivityPtr activity;
272   StrictMock<MockFunction<void(absl::Status)>> on_done;
273   int run_count = 0;
274 
275   activity = MakeActivity(
276       [&activity, &run_count]() -> Poll<absl::Status> {
277         ++run_count;
278         switch (run_count) {
279           case 1:
280             return Pending{};
281           case 2:
282             activity.reset();
283             return Pending{};
284           default:
285             abort();
286         }
287       },
288       InlineWakeupScheduler(),
289       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
290 
291   EXPECT_CALL(on_done, Call(absl::CancelledError()));
292   activity->ForceWakeup();
293 }
294 
TEST(ActivityTest,CanCancelDuringSuccessfulExecution)295 TEST(ActivityTest, CanCancelDuringSuccessfulExecution) {
296   ActivityPtr activity;
297   StrictMock<MockFunction<void(absl::Status)>> on_done;
298   int run_count = 0;
299 
300   activity = MakeActivity(
301       [&activity, &run_count]() -> Poll<absl::Status> {
302         ++run_count;
303         switch (run_count) {
304           case 1:
305             return Pending{};
306           case 2:
307             activity.reset();
308             return absl::OkStatus();
309           default:
310             abort();
311         }
312       },
313       InlineWakeupScheduler(),
314       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
315 
316   EXPECT_CALL(on_done, Call(absl::OkStatus()));
317   activity->ForceWakeup();
318 }
319 
TEST(WakerTest,CanWakeupEmptyWaker)320 TEST(WakerTest, CanWakeupEmptyWaker) {
321   // Empty wakers should not do anything upon wakeup.
322   Waker().Wakeup();
323 }
324 
325 }  // namespace grpc_core
326 
main(int argc,char ** argv)327 int main(int argc, char** argv) {
328   ::testing::InitGoogleTest(&argc, argv);
329   return RUN_ALL_TESTS();
330 }
331