1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/map_pipe.h"
16
17 #include <grpc/event_engine/memory_allocator.h>
18
19 #include <memory>
20 #include <utility>
21
22 #include "gmock/gmock.h"
23 #include "gtest/gtest.h"
24 #include "src/core/lib/promise/activity.h"
25 #include "src/core/lib/promise/for_each.h"
26 #include "src/core/lib/promise/join.h"
27 #include "src/core/lib/promise/map.h"
28 #include "src/core/lib/promise/pipe.h"
29 #include "src/core/lib/promise/poll.h"
30 #include "src/core/lib/promise/seq.h"
31 #include "src/core/lib/resource_quota/arena.h"
32 #include "src/core/lib/resource_quota/memory_quota.h"
33 #include "src/core/lib/resource_quota/resource_quota.h"
34 #include "src/core/util/ref_counted_ptr.h"
35 #include "test/core/promise/test_wakeup_schedulers.h"
36
37 using testing::Mock;
38 using testing::MockFunction;
39 using testing::StrictMock;
40
41 namespace grpc_core {
42
43 template <typename T>
44 class Delayed {
45 public:
Delayed(T x)46 explicit Delayed(T x) : x_(x) {}
47
operator ()()48 Poll<T> operator()() {
49 GetContext<Activity>()->ForceImmediateRepoll();
50 ++polls_;
51 if (polls_ == 10) return std::move(x_);
52 return Pending();
53 }
54
55 private:
56 int polls_ = 0;
57 T x_;
58 };
59
// Intercepts the *receiver* end of a Pipe<int> with a mapper that turns each
// value x into Delayed<int>(x + 1), pushes 0, 1 and 2 through the pipe, and
// checks that the reader sees 1, 2, 3 and that the activity completes with
// an OK status.
TEST(MapPipeTest, SendThriceWithPipeInterceptingReceive) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  MakeActivity(
      [&num_received] {
        Pipe<int> pipe;
        // Mapping stage attached to the receiving side of the pipe.
        auto mapper = PipeMapper<int>::Intercept(pipe.receiver)
                          .TakeAndRun([](int value) {
                            return Delayed<int>(value + 1);
                          });
        // Shared, resettable handle to the sender: every Seq stage below
        // captures a copy, and the final stage resets the inner unique_ptr.
        auto tx = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
        return Map(
            Join(
                std::move(mapper),
                // Writer: send 0, then 1, then 2, and finally drop the sender
                // to close the pipe.
                Seq(tx->get()->Push(0),
                    [tx] { return tx->get()->Push(1); },
                    [tx] { return tx->get()->Push(2); },
                    [tx] {
                      tx->reset();
                      return absl::OkStatus();
                    }),
                // Reader: drain the pipe, expecting the n-th value received
                // to equal n (the pushed value plus the mapper's +1).
                ForEach(std::move(pipe.receiver),
                        [&num_received](int i) {
                          ++num_received;
                          EXPECT_EQ(num_received, i);
                          return absl::OkStatus();
                        })),
            // Keep only element index 2 of the joined result (the reader's).
            JustElem<2>());
      },
      NoWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); },
      SimpleArenaAllocator()->MakeArena());
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 3);
}
99
// Intercepts the *sender* end of a Pipe<int> with a mapper that turns each
// value x into Delayed<int>(x + 1), pushes 0, 1 and 2 through the pipe, and
// checks that the reader sees 1, 2, 3 and that the activity completes with
// an OK status.
TEST(MapPipeTest, SendThriceWithPipeInterceptingSend) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));
  MakeActivity(
      [&num_received] {
        Pipe<int> pipe;
        // Mapping stage attached to the sending side of the pipe; this is set
        // up before the sender is moved into its shared holder below.
        auto mapper = PipeMapper<int>::Intercept(pipe.sender)
                          .TakeAndRun([](int value) {
                            return Delayed<int>(value + 1);
                          });
        // Shared, resettable handle to the sender: every Seq stage below
        // captures a copy, and the final stage resets the inner unique_ptr.
        auto tx = std::make_shared<std::unique_ptr<PipeSender<int>>>(
            std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
        return Map(
            Join(
                std::move(mapper),
                // Writer: send 0, then 1, then 2, and finally drop the sender
                // to close the pipe.
                Seq(tx->get()->Push(0),
                    [tx] { return tx->get()->Push(1); },
                    [tx] { return tx->get()->Push(2); },
                    [tx] {
                      tx->reset();
                      return absl::OkStatus();
                    }),
                // Reader: drain the pipe, expecting the n-th value received
                // to equal n (the pushed value plus the mapper's +1).
                ForEach(std::move(pipe.receiver),
                        [&num_received](int i) {
                          ++num_received;
                          EXPECT_EQ(num_received, i);
                          return absl::OkStatus();
                        })),
            // Keep only element index 2 of the joined result (the reader's).
            JustElem<2>());
      },
      NoWakeupScheduler(),
      [&on_done](absl::Status result) { on_done.Call(std::move(result)); },
      SimpleArenaAllocator()->MakeArena());
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 3);
}
139
140 } // namespace grpc_core
141
main(int argc,char ** argv)142 int main(int argc, char** argv) {
143 ::testing::InitGoogleTest(&argc, argv);
144 return RUN_ALL_TESTS();
145 }
146