1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/inter_activity_pipe.h"
16
17 #include <memory>
18
19 #include "absl/status/status.h"
20 #include "gtest/gtest.h"
21 #include "src/core/lib/promise/seq.h"
22 #include "test/core/promise/test_wakeup_schedulers.h"
23
24 namespace grpc_core {
25
26 template <typename F>
TestActivity(F f)27 ActivityPtr TestActivity(F f) {
28 return MakeActivity(std::move(f), InlineWakeupScheduler{},
29 [](absl::Status status) { EXPECT_TRUE(status.ok()); });
30 }
31
TEST(InterActivityPipe,CanSendAndReceive)32 TEST(InterActivityPipe, CanSendAndReceive) {
33 InterActivityPipe<int, 1> pipe;
34 bool done = false;
35 auto a = TestActivity(Seq(pipe.sender.Push(3), [](bool b) {
36 EXPECT_TRUE(b);
37 return absl::OkStatus();
38 }));
39 EXPECT_FALSE(done);
40 auto b = TestActivity(Seq(pipe.receiver.Next(),
41 [&done](InterActivityPipe<int, 1>::NextResult n) {
42 EXPECT_EQ(n.value(), 3);
43 done = true;
44 return absl::OkStatus();
45 }));
46 EXPECT_TRUE(done);
47 }
48
TEST(InterActivityPipe,CanSendTwiceAndReceive)49 TEST(InterActivityPipe, CanSendTwiceAndReceive) {
50 InterActivityPipe<int, 1> pipe;
51 bool done = false;
52 auto a = TestActivity(Seq(
53 pipe.sender.Push(3),
54 [&](bool b) {
55 EXPECT_TRUE(b);
56 return pipe.sender.Push(4);
57 },
58 [](bool b) {
59 EXPECT_TRUE(b);
60 return absl::OkStatus();
61 }));
62 EXPECT_FALSE(done);
63 auto b = TestActivity(Seq(
64 pipe.receiver.Next(),
65 [&pipe](InterActivityPipe<int, 1>::NextResult n) {
66 EXPECT_EQ(n.value(), 3);
67 return pipe.receiver.Next();
68 },
69 [&done](InterActivityPipe<int, 1>::NextResult n) {
70 EXPECT_EQ(n.value(), 4);
71 done = true;
72 return absl::OkStatus();
73 }));
74 EXPECT_TRUE(done);
75 }
76
TEST(InterActivityPipe,CanReceiveAndSend)77 TEST(InterActivityPipe, CanReceiveAndSend) {
78 InterActivityPipe<int, 1> pipe;
79 bool done = false;
80 auto b = TestActivity(Seq(pipe.receiver.Next(),
81 [&done](InterActivityPipe<int, 1>::NextResult n) {
82 EXPECT_EQ(n.value(), 3);
83 done = true;
84 return absl::OkStatus();
85 }));
86 EXPECT_FALSE(done);
87 auto a = TestActivity(Seq(pipe.sender.Push(3), [](bool b) {
88 EXPECT_TRUE(b);
89 return absl::OkStatus();
90 }));
91 EXPECT_TRUE(done);
92 }
93
TEST(InterActivityPipe,CanClose)94 TEST(InterActivityPipe, CanClose) {
95 InterActivityPipe<int, 1> pipe;
96 bool done = false;
97 auto b = TestActivity(Seq(pipe.receiver.Next(),
98 [&done](InterActivityPipe<int, 1>::NextResult n) {
99 EXPECT_FALSE(n.has_value());
100 done = true;
101 return absl::OkStatus();
102 }));
103 EXPECT_FALSE(done);
104 // Drop the sender
105 {
106 auto x = std::move(pipe.sender);
107 }
108 EXPECT_TRUE(done);
109 }
110
111 } // namespace grpc_core
112
main(int argc,char ** argv)113 int main(int argc, char** argv) {
114 ::testing::InitGoogleTest(&argc, argv);
115 return RUN_ALL_TESTS();
116 }
117