• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/pipe.h"
16 
17 #include <grpc/event_engine/memory_allocator.h>
18 #include <grpc/grpc.h>
19 
20 #include <memory>
21 #include <tuple>
22 #include <utility>
23 
24 #include "absl/functional/function_ref.h"
25 #include "absl/status/status.h"
26 #include "gmock/gmock.h"
27 #include "gtest/gtest.h"
28 #include "src/core/lib/promise/activity.h"
29 #include "src/core/lib/promise/join.h"
30 #include "src/core/lib/promise/map.h"
31 #include "src/core/lib/promise/seq.h"
32 #include "src/core/lib/resource_quota/memory_quota.h"
33 #include "src/core/lib/resource_quota/resource_quota.h"
34 #include "src/core/util/crash.h"
35 #include "src/core/util/ref_counted_ptr.h"
36 #include "test/core/promise/test_wakeup_schedulers.h"
37 
38 using testing::MockFunction;
39 using testing::StrictMock;
40 
41 namespace grpc_core {
42 
// Pushes one value into a pipe while concurrently reading from it, and checks
// that the push reports success and the reader sees the pushed value.
TEST(PipeTest, CanSendAndReceive) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Pulls the payload out of the receiver's NextResult.
        auto unwrap = [](NextResult<int> r) { return r.value(); };
        // Runs once both halves of the Join are done: element 0 is the push
        // result, element 1 is the value read from the pipe (expected: 42).
        auto verify = [](std::tuple<bool, int> result) {
          EXPECT_TRUE(std::get<0>(result));
          EXPECT_EQ(42, std::get<1>(result));
          return absl::OkStatus();
        };
        // Send 42 and receive concurrently, then verify.
        return Seq(Join(p->sender.Push(42),
                        Map(p->receiver.Next(), std::move(unwrap))),
                   std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
66 
// Installs a halving interceptor on the sender side and checks that a pushed
// 42 is observed by the reader as 21.
TEST(PipeTest, CanInterceptAndMapAtSender) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->sender.InterceptAndMap([](int value) { return value / 2; });
        // Pulls the payload out of the receiver's NextResult.
        auto unwrap = [](NextResult<int> r) { return r.value(); };
        // Element 0 is the push result, element 1 the mapped value that was
        // received (42 / 2 == 21).
        auto verify = [](std::tuple<bool, int> result) {
          EXPECT_TRUE(std::get<0>(result));
          EXPECT_EQ(21, std::get<1>(result));
          return absl::OkStatus();
        };
        // Send 42 and receive concurrently, then verify.
        return Seq(Join(p->sender.Push(42),
                        Map(p->receiver.Next(), std::move(unwrap))),
                   std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
91 
// Installs a halving interceptor on the receiver side and checks that a
// pushed 42 is observed by the reader as 21.
TEST(PipeTest, CanInterceptAndMapAtReceiver) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->receiver.InterceptAndMap([](int value) { return value / 2; });
        // Pulls the payload out of the receiver's NextResult.
        auto unwrap = [](NextResult<int> r) { return r.value(); };
        // Element 0 is the push result, element 1 the mapped value that was
        // received (42 / 2 == 21).
        auto verify = [](std::tuple<bool, int> result) {
          EXPECT_TRUE(std::get<0>(result));
          EXPECT_EQ(21, std::get<1>(result));
          return absl::OkStatus();
        };
        // Send 42 and receive concurrently, then verify.
        return Seq(Join(p->sender.Push(42),
                        Map(p->receiver.Next(), std::move(unwrap))),
                   std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
116 
// Registers four appending interceptors, alternating between the receiver and
// sender ends, and checks the order in which they are applied to a pushed
// value.
TEST(PipeTest, InterceptionOrderingIsCorrect) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<std::string>>();
        // Builds an interceptor that appends the character `c` to the string
        // passing through the pipe.
        auto append_char = [](char c) {
          return [c](std::string value) {
            value += c;
            return value;
          };
        };
        // Interceptors are added outward from the center and run from the
        // sender end to the receiver end, so this registration order must
        // yield "abcd" when starting from an empty string.
        p->receiver.InterceptAndMap(append_char('c'));
        p->sender.InterceptAndMap(append_char('b'));
        p->receiver.InterceptAndMap(append_char('d'));
        p->sender.InterceptAndMap(append_char('a'));
        // Pulls the payload out of the receiver's NextResult.
        auto unwrap = [](NextResult<std::string> r) { return r.value(); };
        // Element 0 is the push result, element 1 the received string, which
        // must be "abcd".
        auto verify = [](std::tuple<bool, std::string> result) {
          EXPECT_TRUE(std::get<0>(result));
          EXPECT_EQ("abcd", std::get<1>(result));
          return absl::OkStatus();
        };
        // Send "" and receive concurrently, then verify.
        return Seq(Join(p->sender.Push(""),
                        Map(p->receiver.Next(), std::move(unwrap))),
                   std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
152 
// Same exchange as CanSendAndReceive, but with the receive listed before the
// send inside the Join.
TEST(PipeTest, CanReceiveAndSend) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Pulls the payload out of the receiver's NextResult.
        auto unwrap = [](NextResult<int> r) { return r.value(); };
        // Element 0 is the received value (expected: 42), element 1 is the
        // push result.
        auto verify = [](std::tuple<int, bool> result) {
          EXPECT_EQ(std::get<0>(result), 42);
          EXPECT_TRUE(std::get<1>(result));
          return absl::OkStatus();
        };
        // Receive and send 42 concurrently, then verify.
        return Seq(Join(Map(p->receiver.Next(), std::move(unwrap)),
                        p->sender.Push(42)),
                   std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
176 
// Starts a push with no reader and then destroys the receiver; the pending
// push is expected to resolve to false.
TEST(PipeTest, CanSeeClosedOnSend) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        Pipe<int> local_pipe;
        auto tx = std::move(local_pipe.sender);
        // The receiver lives behind a shared_ptr<unique_ptr<...>> so that the
        // closing step below can destroy it on demand via reset().
        auto rx_holder = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
            std::make_unique<PipeReceiver<int>>(
                std::move(local_pipe.receiver)));
        // Both steps must have run: the send failed (false) and the closing
        // step returned OK.
        auto verify = [](const std::tuple<bool, absl::Status>& result) {
          EXPECT_EQ(result, std::make_tuple(false, absl::OkStatus()));
          return absl::OkStatus();
        };
        return Seq(
            // Concurrently:
            // - push 43, which stalls because nothing reads from the pipe
            // - destroy the receiver, which fails the pending push.
            Join(tx.Push(43),
                 [rx_holder] {
                   rx_holder->reset();
                   return absl::OkStatus();
                 }),
            std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
206 
// Waits on an empty pipe and then destroys the sender; the pending read is
// expected to resolve without a value.
TEST(PipeTest, CanSeeClosedOnReceive) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        Pipe<int> local_pipe;
        // The sender lives behind a shared_ptr<unique_ptr<...>> so that the
        // closing step below can destroy it on demand via reset().
        auto tx_holder = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(local_pipe.sender)));
        auto rx = std::move(local_pipe.receiver);
        // Element 0 is the read result (must carry no value), element 1 is
        // the status returned by the closing step.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> result) {
          EXPECT_FALSE(std::get<0>(result).has_value());
          EXPECT_EQ(std::get<1>(result), absl::OkStatus());
          return absl::OkStatus();
        };
        return Seq(
            // Concurrently:
            // - wait for a value (none is ever pushed)
            // - destroy the sender, which lets the pending read finish
            //   without a value.
            Join(rx.Next(),
                 [tx_holder] {
                   tx_holder->reset();
                   return absl::OkStatus();
                 }),
            std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
238 
// Closes the sender explicitly while a read is pending; the read is expected
// to finish with no value and without being flagged as cancelled.
TEST(PipeTest, CanCloseSend) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Element 0 is the read result (no value, not cancelled), element 1
        // is the status returned by the closing step.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> result) {
          EXPECT_FALSE(std::get<0>(result).has_value());
          EXPECT_FALSE(std::get<0>(result).cancelled());
          EXPECT_EQ(std::get<1>(result), absl::OkStatus());
          return absl::OkStatus();
        };
        return Seq(
            // Concurrently:
            // - wait for a value (none is ever pushed)
            // - Close() the sender, which ends the pending read.
            Join(p->receiver.Next(),
                 [p]() mutable {
                   p->sender.Close();
                   return absl::OkStatus();
                 }),
            std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
268 
// Closes the sender with an error while a read is pending; the read is
// expected to finish with no value and to be flagged as cancelled.
TEST(PipeTest, CanCloseWithErrorSend) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Element 0 is the read result (no value, cancelled), element 1 is
        // the status returned by the closing step.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> result) {
          EXPECT_FALSE(std::get<0>(result).has_value());
          EXPECT_TRUE(std::get<0>(result).cancelled());
          EXPECT_EQ(std::get<1>(result), absl::OkStatus());
          return absl::OkStatus();
        };
        return Seq(
            // Concurrently:
            // - wait for a value (none is ever pushed)
            // - CloseWithError() on the sender, which ends the pending read
            //   as cancelled.
            Join(p->receiver.Next(),
                 [p]() mutable {
                   p->sender.CloseWithError();
                   return absl::OkStatus();
                 }),
            std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
298 
// Closes the receiver with an error while a read is pending on it; the read
// is expected to finish with no value and to be flagged as cancelled.
TEST(PipeTest, CanCloseWithErrorRecv) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Element 0 is the read result (no value, cancelled), element 1 is
        // the status returned by the closing step.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> result) {
          EXPECT_FALSE(std::get<0>(result).has_value());
          EXPECT_TRUE(std::get<0>(result).cancelled());
          EXPECT_EQ(std::get<1>(result), absl::OkStatus());
          return absl::OkStatus();
        };
        return Seq(
            // Concurrently:
            // - wait for a value (none is ever pushed)
            // - CloseWithError() on the receiver, which ends the pending
            //   read as cancelled.
            Join(p->receiver.Next(),
                 [p]() mutable {
                   p->receiver.CloseWithError();
                   return absl::OkStatus();
                 }),
            std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
328 
// Same as CanCloseSend, but with an interceptor installed on the sender: the
// pending read must still finish with no value and not be cancelled.
TEST(PipeTest, CanCloseSendWithInterceptor) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->sender.InterceptAndMap([](int value) { return value + 1; });
        // Element 0 is the read result (no value, not cancelled), element 1
        // is the status returned by the closing step.
        auto verify = [](std::tuple<NextResult<int>, absl::Status> result) {
          EXPECT_FALSE(std::get<0>(result).has_value());
          EXPECT_FALSE(std::get<0>(result).cancelled());
          EXPECT_EQ(std::get<1>(result), absl::OkStatus());
          return absl::OkStatus();
        };
        return Seq(
            // Concurrently:
            // - wait for a value (none is ever pushed)
            // - Close() the sender, which ends the pending read.
            Join(p->receiver.Next(),
                 [p]() mutable {
                   p->sender.Close();
                   return absl::OkStatus();
                 }),
            std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
359 
// Installs a sender-side interceptor that returns absl::nullopt for every
// value, then pushes a value: the read must finish with no value and be
// flagged as cancelled, and the push must report failure.
TEST(PipeTest, CanCancelSendWithInterceptor) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->sender.InterceptAndMap([](int) { return absl::nullopt; });
        // Element 0 is the read result (no value, cancelled), element 1 is
        // the push result (false: the send did not succeed).
        auto verify = [](std::tuple<NextResult<int>, bool> result) {
          EXPECT_FALSE(std::get<0>(result).has_value());
          EXPECT_TRUE(std::get<0>(result).cancelled());
          EXPECT_FALSE(std::get<1>(result));
          return absl::OkStatus();
        };
        // Concurrently: wait for a value, and push 3 (which the interceptor
        // drops). Then verify.
        return Seq(Join(p->receiver.Next(), p->sender.Push(3)),
                   std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
387 
// Forwards a value through three chained pipes and checks flow control across
// the whole chain: the initial push must not have completed by the time the
// final stage receives the value.
TEST(PipeTest, CanFlowControlThroughManyStages) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  // Set to true by the step that runs right after the first push completes.
  auto push_completed = std::make_shared<bool>(false);
  MakeActivity(
      [push_completed] {
        auto* first = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto* second = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto* third = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Expected per-stage results, in Join order: 1 from the first stage,
        // true from each forwarding push, 2 from the final stage.
        auto verify = [](std::tuple<int, bool, bool, int> result) {
          EXPECT_EQ(result, std::make_tuple(1, true, true, 2));
          return absl::OkStatus();
        };
        return Seq(
            Join(
                // Stage 0: push into the first pipe, then record that the
                // push has completed.
                Seq(first->sender.Push(1),
                    [push_completed] {
                      *push_completed = true;
                      return 1;
                    }),
                // Stage 1: forward from the first pipe into the second.
                Seq(first->receiver.Next(),
                    [second](NextResult<int> r) mutable {
                      return second->sender.Push(r.value());
                    }),
                // Stage 2: forward from the second pipe into the third.
                Seq(second->receiver.Next(),
                    [third](NextResult<int> r) mutable {
                      return third->sender.Push(r.value());
                    }),
                // Stage 3: consume from the third pipe; at this point the
                // stage-0 push must still be outstanding.
                Seq(third->receiver.Next(),
                    [push_completed](NextResult<int> r) {
                      EXPECT_EQ(r.value(), 1);
                      EXPECT_FALSE(*push_completed);
                      return 2;
                    })),
            std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
  // By the time MakeActivity returns, the first push must have completed.
  ASSERT_TRUE(*push_completed);
}
436 
// Waits on AwaitClosed() from both ends of a pipe while the sender is closed,
// and checks that both waits resolve to false.
TEST(PipeTest, AwaitClosedWorks) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->sender.InterceptAndMap([](int value) { return value + 1; });
        // Elements 0 and 1 are the receiver-side and sender-side
        // AwaitClosed() results (both expected false); element 2 is the
        // status returned by the closing step.
        auto verify = [](std::tuple<bool, bool, absl::Status> result) {
          EXPECT_FALSE(std::get<0>(result));
          EXPECT_FALSE(std::get<1>(result));
          EXPECT_EQ(std::get<2>(result), absl::OkStatus());
          return absl::OkStatus();
        };
        return Seq(
            // Concurrently:
            // - wait for closure on the receiver and on the sender
            // - Close() the sender, which resolves both waits.
            Join(p->receiver.AwaitClosed(), p->sender.AwaitClosed(),
                 [p]() mutable {
                   p->sender.Close();
                   return absl::OkStatus();
                 }),
            std::move(verify));
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
466 
467 class FakeActivity final : public Activity {
468  public:
Orphan()469   void Orphan() override {}
ForceImmediateRepoll(WakeupMask)470   void ForceImmediateRepoll(WakeupMask) override {}
MakeOwningWaker()471   Waker MakeOwningWaker() override { Crash("Not implemented"); }
MakeNonOwningWaker()472   Waker MakeNonOwningWaker() override { Crash("Not implemented"); }
Run(absl::FunctionRef<void ()> f)473   void Run(absl::FunctionRef<void()> f) {
474     ScopedActivity activity(this);
475     f();
476   }
477 };
478 
// Drives a pipe Center directly: after a successful Push followed by
// MarkClosed, PollAck must still report Pending.
TEST(PipeTest, PollAckWaitsForReadyClosed) {
  FakeActivity activity;
  activity.Run([] {
    pipe_detail::Center<int> center;
    int payload = 1;
    EXPECT_EQ(center.Push(&payload), Poll<bool>(true));
    center.MarkClosed();
    EXPECT_EQ(center.PollAck(), Poll<bool>(Pending{}));
  });
}
488 
489 }  // namespace grpc_core
490 
main(int argc,char ** argv)491 int main(int argc, char** argv) {
492   ::testing::InitGoogleTest(&argc, argv);
493   grpc_init();
494   int r = RUN_ALL_TESTS();
495   grpc_shutdown();
496   return r;
497 }
498