1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/pipe.h"
16
17 #include <grpc/event_engine/memory_allocator.h>
18 #include <grpc/grpc.h>
19
20 #include <memory>
21 #include <tuple>
22 #include <utility>
23
24 #include "absl/functional/function_ref.h"
25 #include "absl/status/status.h"
26 #include "gmock/gmock.h"
27 #include "gtest/gtest.h"
28 #include "src/core/lib/promise/activity.h"
29 #include "src/core/lib/promise/join.h"
30 #include "src/core/lib/promise/map.h"
31 #include "src/core/lib/promise/seq.h"
32 #include "src/core/lib/resource_quota/memory_quota.h"
33 #include "src/core/lib/resource_quota/resource_quota.h"
34 #include "src/core/util/crash.h"
35 #include "src/core/util/ref_counted_ptr.h"
36 #include "test/core/promise/test_wakeup_schedulers.h"
37
38 using testing::MockFunction;
39 using testing::StrictMock;
40
41 namespace grpc_core {
42
// A value pushed into a pipe is delivered to a concurrently polled receiver,
// and the push itself reports success.
TEST(PipeTest, CanSendAndReceive) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Runs once both halves of the join have finished: the push result
        // must be true and the value read back must be 42.
        auto check = [](std::tuple<bool, int> outcome) {
          EXPECT_TRUE(std::get<0>(outcome));
          EXPECT_EQ(42, std::get<1>(outcome));
          return absl::OkStatus();
        };
        // Push 42 and read from the pipe concurrently, then check the results.
        return Seq(
            Join(p->sender.Push(42),
                 Map(p->receiver.Next(),
                     [](NextResult<int> next) { return next.value(); })),
            check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
66
// A mapping interceptor installed on the sender side transforms the value
// before the receiver observes it (42 is halved to 21).
TEST(PipeTest, CanInterceptAndMapAtSender) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->sender.InterceptAndMap([](int v) { return v / 2; });
        // Runs once both halves of the join have finished: the push result
        // must be true and the value read back must be 21.
        auto check = [](std::tuple<bool, int> outcome) {
          EXPECT_TRUE(std::get<0>(outcome));
          EXPECT_EQ(21, std::get<1>(outcome));
          return absl::OkStatus();
        };
        // Push 42 and read from the pipe concurrently, then check the results.
        return Seq(
            Join(p->sender.Push(42),
                 Map(p->receiver.Next(),
                     [](NextResult<int> next) { return next.value(); })),
            check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
91
// A mapping interceptor installed on the receiver side transforms the value
// before the receiver observes it (42 is halved to 21).
TEST(PipeTest, CanInterceptAndMapAtReceiver) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->receiver.InterceptAndMap([](int v) { return v / 2; });
        // Runs once both halves of the join have finished: the push result
        // must be true and the value read back must be 21.
        auto check = [](std::tuple<bool, int> outcome) {
          EXPECT_TRUE(std::get<0>(outcome));
          EXPECT_EQ(21, std::get<1>(outcome));
          return absl::OkStatus();
        };
        // Push 42 and read from the pipe concurrently, then check the results.
        return Seq(
            Join(p->sender.Push(42),
                 Map(p->receiver.Next(),
                     [](NextResult<int> next) { return next.value(); })),
            check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
116
// Installs four appending interceptors, alternating between receiver and
// sender, and checks the order in which they are applied to a pushed value.
TEST(PipeTest, InterceptionOrderingIsCorrect) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<std::string>>();
        // Builds an interceptor that appends `suffix` to the passing string.
        auto make_appender = [](char suffix) {
          return [suffix](std::string text) {
            text.push_back(suffix);
            return text;
          };
        };
        // Interceptors are added outwardly from the center and run from
        // sender to receiver, so this registration order should yield "abcd".
        p->receiver.InterceptAndMap(make_appender('c'));
        p->sender.InterceptAndMap(make_appender('b'));
        p->receiver.InterceptAndMap(make_appender('d'));
        p->sender.InterceptAndMap(make_appender('a'));
        // Runs once both halves of the join have finished: the push result
        // must be true and the received string must be "abcd".
        auto check = [](std::tuple<bool, std::string> outcome) {
          EXPECT_TRUE(std::get<0>(outcome));
          EXPECT_EQ("abcd", std::get<1>(outcome));
          return absl::OkStatus();
        };
        // Push "" and read from the pipe concurrently, then check the results.
        return Seq(Join(p->sender.Push(""),
                        Map(p->receiver.Next(),
                            [](NextResult<std::string> next) {
                              return next.value();
                            })),
                   check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
152
// Same exchange as CanSendAndReceive, but with the receive listed before the
// send in the join.
TEST(PipeTest, CanReceiveAndSend) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Runs once both halves of the join have finished: the value read
        // back must be 42 and the push result must be true.
        auto check = [](std::tuple<int, bool> outcome) {
          EXPECT_EQ(std::get<0>(outcome), 42);
          EXPECT_TRUE(std::get<1>(outcome));
          return absl::OkStatus();
        };
        // Read from the pipe and push 42 concurrently, then check the results.
        return Seq(
            Join(Map(p->receiver.Next(),
                     [](NextResult<int> next) { return next.value(); }),
                 p->sender.Push(42)),
            check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
176
// Destroying the receiver while a push is pending makes the push report
// failure.
TEST(PipeTest, CanSeeClosedOnSend) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        Pipe<int> local_pipe;
        auto tx = std::move(local_pipe.sender);
        // The receiver lives behind a shared handle so that the closing step
        // below can destroy it by resetting the inner unique_ptr.
        auto rx_holder = std::make_shared<std::unique_ptr<PipeReceiver<int>>>(
            std::make_unique<PipeReceiver<int>>(
                std::move(local_pipe.receiver)));
        // Second half of the join: drop the receiver.
        auto drop_receiver = [rx_holder] {
          rx_holder->reset();
          return absl::OkStatus();
        };
        // The push must have failed and the closing step must have run.
        auto check = [](const std::tuple<bool, absl::Status>& outcome) {
          EXPECT_EQ(outcome, std::make_tuple(false, absl::OkStatus()));
          return absl::OkStatus();
        };
        // Concurrently:
        // - push 43, which stalls because nothing reads from the pipe;
        // - destroy the receiver, which fails the pending push.
        return Seq(Join(tx.Push(43), std::move(drop_receiver)), check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
206
// Destroying the sender while a receive is pending makes the receive resolve
// without a value.
TEST(PipeTest, CanSeeClosedOnReceive) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        Pipe<int> local_pipe;
        // The sender lives behind a shared handle so that the closing step
        // below can destroy it by resetting the inner unique_ptr.
        auto tx_holder = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(local_pipe.sender)));
        auto rx = std::move(local_pipe.receiver);
        // Second half of the join: drop the sender.
        auto drop_sender = [tx_holder] {
          tx_holder->reset();
          return absl::OkStatus();
        };
        // The receive must carry no value and the closing step must have run.
        auto check = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          EXPECT_FALSE(std::get<0>(outcome).has_value());
          EXPECT_EQ(std::get<1>(outcome), absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently:
        // - wait for a value, which stalls because nothing is ever pushed;
        // - destroy the sender, which ends the pending receive.
        return Seq(Join(rx.Next(), std::move(drop_sender)), check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
238
// Calling Close() on the sender ends a pending receive with no value and
// without marking it cancelled.
TEST(PipeTest, CanCloseSend) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Second half of the join: close the sending end.
        auto close_sender = [p]() mutable {
          p->sender.Close();
          return absl::OkStatus();
        };
        // The receive must carry no value, must not be cancelled, and the
        // closing step must have run.
        auto check = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          EXPECT_FALSE(std::get<0>(outcome).has_value());
          EXPECT_FALSE(std::get<0>(outcome).cancelled());
          EXPECT_EQ(std::get<1>(outcome), absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently:
        // - wait for a value, which stalls because nothing is ever pushed;
        // - close the sender, which ends the pending receive.
        return Seq(Join(p->receiver.Next(), std::move(close_sender)), check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
268
// Calling CloseWithError() on the sender ends a pending receive with no value
// and marks it cancelled.
TEST(PipeTest, CanCloseWithErrorSend) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Second half of the join: close the sending end with an error.
        auto fail_sender = [p]() mutable {
          p->sender.CloseWithError();
          return absl::OkStatus();
        };
        // The receive must carry no value, must be cancelled, and the closing
        // step must have run.
        auto check = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          EXPECT_FALSE(std::get<0>(outcome).has_value());
          EXPECT_TRUE(std::get<0>(outcome).cancelled());
          EXPECT_EQ(std::get<1>(outcome), absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently:
        // - wait for a value, which stalls because nothing is ever pushed;
        // - close the sender with an error, which cancels the pending
        //   receive.
        return Seq(Join(p->receiver.Next(), std::move(fail_sender)), check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
298
// Calling CloseWithError() on the receiver ends its own pending receive with
// no value and marks it cancelled.
TEST(PipeTest, CanCloseWithErrorRecv) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        // Second half of the join: close the receiving end with an error.
        auto fail_receiver = [p]() mutable {
          p->receiver.CloseWithError();
          return absl::OkStatus();
        };
        // The receive must carry no value, must be cancelled, and the closing
        // step must have run.
        auto check = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          EXPECT_FALSE(std::get<0>(outcome).has_value());
          EXPECT_TRUE(std::get<0>(outcome).cancelled());
          EXPECT_EQ(std::get<1>(outcome), absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently:
        // - wait for a value, which stalls because nothing is ever pushed;
        // - close the receiver with an error, which cancels the pending
        //   receive.
        return Seq(Join(p->receiver.Next(), std::move(fail_receiver)), check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
328
// Calling Close() on the sender still ends a pending receive cleanly when a
// mapping interceptor is installed on the sender.
TEST(PipeTest, CanCloseSendWithInterceptor) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->sender.InterceptAndMap([](int v) { return v + 1; });
        // Second half of the join: close the sending end.
        auto close_sender = [p]() mutable {
          p->sender.Close();
          return absl::OkStatus();
        };
        // The receive must carry no value, must not be cancelled, and the
        // closing step must have run.
        auto check = [](std::tuple<NextResult<int>, absl::Status> outcome) {
          EXPECT_FALSE(std::get<0>(outcome).has_value());
          EXPECT_FALSE(std::get<0>(outcome).cancelled());
          EXPECT_EQ(std::get<1>(outcome), absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently:
        // - wait for a value, which stalls because nothing is ever pushed;
        // - close the sender, which ends the pending receive.
        return Seq(Join(p->receiver.Next(), std::move(close_sender)), check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
359
// A sender-side interceptor that returns absl::nullopt cancels the transfer:
// the receive resolves as cancelled and the push reports failure.
TEST(PipeTest, CanCancelSendWithInterceptor) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->sender.InterceptAndMap([](int) { return absl::nullopt; });
        // The receive must carry no value and be cancelled, and the push
        // result must be false.
        auto check = [](std::tuple<NextResult<int>, bool> outcome) {
          EXPECT_FALSE(std::get<0>(outcome).has_value());
          EXPECT_TRUE(std::get<0>(outcome).cancelled());
          EXPECT_FALSE(std::get<1>(outcome));
          return absl::OkStatus();
        };
        // Concurrently:
        // - wait for a value from the pipe;
        // - push 3, which the interceptor above discards.
        return Seq(Join(p->receiver.Next(), p->sender.Push(3)), check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
387
// Pushes a value through three chained pipes and checks that flow control
// spans the whole chain: the first push must not complete until the value has
// been read out of the last pipe.
TEST(PipeTest, CanFlowControlThroughManyStages) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  // Set to true once the push into the first pipe has completed.
  auto first_push_done = std::make_shared<bool>(false);
  MakeActivity(
      [first_push_done] {
        auto* stage1 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto* stage2 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto* stage3 = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        auto* tx1 = &stage1->sender;
        auto* rx1 = &stage1->receiver;
        auto* tx2 = &stage2->sender;
        auto* rx2 = &stage2->receiver;
        auto* tx3 = &stage3->sender;
        auto* rx3 = &stage3->receiver;
        // Expected join results: 1 from the first branch, true from each
        // forwarding push, and 2 from the final reader.
        auto check = [](std::tuple<int, bool, bool, int> outcome) {
          EXPECT_EQ(outcome, std::make_tuple(1, true, true, 2));
          return absl::OkStatus();
        };
        return Seq(
            Join(
                // Push 1 into the first pipe and record when that completes.
                Seq(tx1->Push(1),
                    [first_push_done] {
                      *first_push_done = true;
                      return 1;
                    }),
                // Forward from the first pipe into the second.
                Seq(rx1->Next(),
                    [tx2](NextResult<int> next) mutable {
                      return tx2->Push(next.value());
                    }),
                // Forward from the second pipe into the third.
                Seq(rx2->Next(),
                    [tx3](NextResult<int> next) mutable {
                      return tx3->Push(next.value());
                    }),
                // Read from the third pipe; at this point the first push must
                // not have completed yet.
                Seq(rx3->Next(),
                    [first_push_done](NextResult<int> next) {
                      EXPECT_EQ(next.value(), 1);
                      EXPECT_FALSE(*first_push_done);
                      return 2;
                    })),
            check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
  ASSERT_TRUE(*first_push_done);
}
436
// AwaitClosed() on both ends of a pipe resolves once the sender is closed.
TEST(PipeTest, AwaitClosedWorks) {
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [] {
        auto* p = GetContext<Arena>()->ManagedNew<Pipe<int>>();
        p->sender.InterceptAndMap([](int v) { return v + 1; });
        // Third branch of the join: close the sending end.
        auto close_sender = [p]() mutable {
          p->sender.Close();
          return absl::OkStatus();
        };
        // Both AwaitClosed() results must be false and the closing step must
        // have run.
        // NOTE(review): false presumably denotes a close without error;
        // confirm against pipe.h.
        auto check = [](std::tuple<bool, bool, absl::Status> outcome) {
          EXPECT_FALSE(std::get<0>(outcome));
          EXPECT_FALSE(std::get<1>(outcome));
          EXPECT_EQ(std::get<2>(outcome), absl::OkStatus());
          return absl::OkStatus();
        };
        // Concurrently:
        // - wait for closure on the receiver and on the sender;
        // - close the sender.
        return Seq(Join(p->receiver.AwaitClosed(), p->sender.AwaitClosed(),
                        std::move(close_sender)),
                   check);
      },
      NoWakeupScheduler(),
      [&completion](absl::Status s) { completion.Call(std::move(s)); },
      SimpleArenaAllocator()->MakeArena());
}
466
467 class FakeActivity final : public Activity {
468 public:
Orphan()469 void Orphan() override {}
ForceImmediateRepoll(WakeupMask)470 void ForceImmediateRepoll(WakeupMask) override {}
MakeOwningWaker()471 Waker MakeOwningWaker() override { Crash("Not implemented"); }
MakeNonOwningWaker()472 Waker MakeNonOwningWaker() override { Crash("Not implemented"); }
Run(absl::FunctionRef<void ()> f)473 void Run(absl::FunctionRef<void()> f) {
474 ScopedActivity activity(this);
475 f();
476 }
477 };
478
// After a value is pushed into a center and the center is then marked closed,
// PollAck() must still report pending.
TEST(PipeTest, PollAckWaitsForReadyClosed) {
  auto scenario = []() {
    pipe_detail::Center<int> center;
    int payload = 1;
    // The push itself completes immediately with success.
    EXPECT_EQ(center.Push(&payload), Poll<bool>(true));
    center.MarkClosed();
    // The pushed value has not been acknowledged, so the ack stays pending.
    EXPECT_EQ(center.PollAck(), Poll<bool>(Pending{}));
  };
  FakeActivity().Run(scenario);
}
488
489 } // namespace grpc_core
490
main(int argc,char ** argv)491 int main(int argc, char** argv) {
492 ::testing::InitGoogleTest(&argc, argv);
493 grpc_init();
494 int r = RUN_ALL_TESTS();
495 grpc_shutdown();
496 return r;
497 }
498