1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/promise/for_each.h"
16
17 #include <grpc/event_engine/memory_allocator.h>
18
19 #include <memory>
20
21 #include "gmock/gmock.h"
22 #include "gtest/gtest.h"
23 #include "src/core/lib/promise/activity.h"
24 #include "src/core/lib/promise/inter_activity_pipe.h"
25 #include "src/core/lib/promise/join.h"
26 #include "src/core/lib/promise/map.h"
27 #include "src/core/lib/promise/pipe.h"
28 #include "src/core/lib/promise/seq.h"
29 #include "src/core/lib/promise/try_seq.h"
30 #include "src/core/lib/resource_quota/arena.h"
31 #include "src/core/lib/resource_quota/memory_quota.h"
32 #include "src/core/lib/resource_quota/resource_quota.h"
33 #include "src/core/util/ref_counted_ptr.h"
34 #include "test/core/promise/test_wakeup_schedulers.h"
35
36 using testing::Mock;
37 using testing::MockFunction;
38 using testing::StrictMock;
39
40 namespace grpc_core {
41
// Drives a Pipe<int> from inside a single activity: one arm of a Join pushes
// 1, 2 and 3 and then resets the sender, while the other arm runs ForEach over
// the receiver and checks that the values arrive in that order.
TEST(ForEachTest, SendThriceWithPipe) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));

  auto make_promise = [&num_received] {
    Pipe<int> pipe;
    // Each step of the writer below copies this shared_ptr; the last step
    // resets the unique_ptr inside it, destroying the PipeSender.
    auto tx = std::make_shared<std::unique_ptr<PipeSender<int>>>(
        std::make_unique<PipeSender<int>>(std::move(pipe.sender)));

    // Writer arm: push 1, 2, then 3 into the pipe, then close it.
    auto writer = Seq(
        (*tx)->Push(1), [tx] { return (*tx)->Push(2); },
        [tx] { return (*tx)->Push(3); },
        [tx] {
          tx->reset();
          return absl::OkStatus();
        });

    // Reader arm: the n-th value handed to the ForEach action must equal n.
    auto reader =
        ForEach(std::move(pipe.receiver), [&num_received](int i) {
          ++num_received;
          EXPECT_EQ(num_received, i);
          return absl::OkStatus();
        });

    // The activity resolves to whatever JustElem<1>() extracts from the
    // joined result.
    return Map(Join(std::move(writer), std::move(reader)), JustElem<1>());
  };

  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };

  MakeActivity(std::move(make_promise), NoWakeupScheduler(),
               std::move(report_done), SimpleArenaAllocator()->MakeArena());

  // The expectation is verified immediately after MakeActivity returns, so
  // on_done must already have been called by this point.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 3);
}
76
// Same three-value exchange as SendThriceWithPipe, but the producer and the
// consumer are separate activities connected by an InterActivityPipe<int, 1>.
TEST(ForEachTest, SendThriceWithInterActivityPipe) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done_sender;
  StrictMock<MockFunction<void(absl::Status)>> on_done_receiver;
  EXPECT_CALL(on_done_sender, Call(absl::OkStatus()));
  EXPECT_CALL(on_done_receiver, Call(absl::OkStatus()));
  InterActivityPipe<int, 1> pipe;

  // Producer: push 1, 2, then 3 into the pipe, then close it.
  auto push_all = Seq(
      pipe.sender.Push(1), [&pipe] { return pipe.sender.Push(2); },
      [&pipe] { return pipe.sender.Push(3); },
      [&pipe] {
        // Moving the sender into a local destroys it when this step returns.
        auto dropped = std::move(pipe.sender);
        return absl::OkStatus();
      });
  auto report_sender_done = [&on_done_sender](absl::Status status) {
    on_done_sender.Call(std::move(status));
  };
  // The handle is kept so the sending activity stays alive while the
  // receiving activity below runs.
  auto send_activity =
      MakeActivity(std::move(push_all), InlineWakeupScheduler{},
                   std::move(report_sender_done));

  // Consumer: loop over the receiving end with ForEach and check that the
  // n-th value seen equals n.
  auto make_receiver = [&num_received, &pipe] {
    return ForEach(std::move(pipe.receiver), [&num_received](int i) {
      ++num_received;
      EXPECT_EQ(num_received, i);
      return absl::OkStatus();
    });
  };
  auto report_receiver_done = [&on_done_receiver](absl::Status status) {
    on_done_receiver.Call(std::move(status));
  };
  MakeActivity(std::move(make_receiver), NoWakeupScheduler(),
               std::move(report_receiver_done));

  // Both completion callbacks must have been called by now.
  Mock::VerifyAndClearExpectations(&on_done_sender);
  Mock::VerifyAndClearExpectations(&on_done_receiver);
  EXPECT_EQ(num_received, 3);
}
114
115 // Pollable type that stays movable until it's polled, then causes the test to
116 // fail if it's moved again.
117 // Promises have the property that they can be moved until polled, and this
118 // helps us check that the internals of ForEach respect this rule.
119 class MoveableUntilPolled {
120 public:
121 MoveableUntilPolled() = default;
122 MoveableUntilPolled(const MoveableUntilPolled&) = delete;
123 MoveableUntilPolled& operator=(const MoveableUntilPolled&) = delete;
MoveableUntilPolled(MoveableUntilPolled && other)124 MoveableUntilPolled(MoveableUntilPolled&& other) noexcept : polls_(0) {
125 EXPECT_EQ(other.polls_, 0);
126 }
operator =(MoveableUntilPolled && other)127 MoveableUntilPolled& operator=(MoveableUntilPolled&& other) noexcept {
128 EXPECT_EQ(other.polls_, 0);
129 polls_ = 0;
130 return *this;
131 }
132
operator ()()133 Poll<absl::Status> operator()() {
134 GetContext<Activity>()->ForceImmediateRepoll();
135 ++polls_;
136 if (polls_ == 10) return absl::OkStatus();
137 return Pending();
138 }
139
140 private:
141 int polls_ = 0;
142 };
143
// Checks that ForEach never moves the promise returned by its action once
// that promise has been polled, by having the action return a
// MoveableUntilPolled.
TEST(ForEachTest, NoMoveAfterPoll) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));

  auto make_promise = [&num_received] {
    Pipe<int> pipe;
    // The writer's second step copies this shared_ptr and resets the
    // unique_ptr inside it, destroying the PipeSender.
    auto tx = std::make_shared<std::unique_ptr<PipeSender<int>>>(
        std::make_unique<PipeSender<int>>(std::move(pipe.sender)));

    // Writer arm: push a single value into the pipe, then close it.
    auto writer = Seq((*tx)->Push(1), [tx] {
      tx->reset();
      return absl::OkStatus();
    });

    // Reader arm: read the value back out with ForEach and check it. The
    // action returns a MoveableUntilPolled, which fails the test if it is
    // moved after having been polled.
    auto reader =
        ForEach(std::move(pipe.receiver), [&num_received](int i) {
          ++num_received;
          EXPECT_EQ(num_received, i);
          return MoveableUntilPolled();
        });

    return Map(Join(std::move(writer), std::move(reader)), JustElem<1>());
  };

  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };

  MakeActivity(std::move(make_promise), NoWakeupScheduler(),
               std::move(report_done), SimpleArenaAllocator()->MakeArena());

  // on_done must already have been called by the time MakeActivity returns.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 1);
}
179
// Checks that while ForEach is running its action for a value, a further
// Push on the same pipe stays pending, both at the start of the action and
// after the action's promise has stalled for several polls.
TEST(ForEachTest, NextResultHeldThroughCallback) {
  int num_received = 0;
  StrictMock<MockFunction<void(absl::Status)>> on_done;
  EXPECT_CALL(on_done, Call(absl::OkStatus()));

  auto make_promise = [&num_received] {
    Pipe<int> pipe;
    // Shared between the writer arm and the ForEach action so the action can
    // attempt extra pushes on the same sender.
    auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
        std::make_unique<PipeSender<int>>(std::move(pipe.sender)));

    // Writer arm: push a single value into the pipe, then close it.
    auto writer = Seq((*sender)->Push(1), [sender] {
      sender->reset();
      return absl::OkStatus();
    });

    // Reader arm: read the value back out with ForEach and check it.
    auto reader = ForEach(
        std::move(pipe.receiver), [&num_received, sender](int i) {
          // While a value is being processed the NextResult should still be
          // held, which disallows new items from being pushed. The writer's
          // sender->reset() step should not have run yet either, since its
          // Push() is expected to block until this action completes.
          EXPECT_TRUE((*sender)->Push(2)().pending());
          ++num_received;
          EXPECT_EQ(num_received, i);
          // Runs after the stalling promise below finishes and repeats the
          // check: the same properties should still hold at that point.
          auto recheck = [sender] {
            EXPECT_TRUE((*sender)->Push(2)().pending());
            return absl::OkStatus();
          };
          // MoveableUntilPolled is used here for its side effect of stalling
          // for some iterations before resolving.
          return TrySeq(MoveableUntilPolled(), std::move(recheck));
        });

    return Map(Join(std::move(writer), std::move(reader)), JustElem<1>());
  };

  auto report_done = [&on_done](absl::Status status) {
    on_done.Call(std::move(status));
  };

  MakeActivity(std::move(make_promise), NoWakeupScheduler(),
               std::move(report_done), SimpleArenaAllocator()->MakeArena());

  // on_done must already have been called by the time MakeActivity returns.
  Mock::VerifyAndClearExpectations(&on_done);
  EXPECT_EQ(num_received, 1);
}
230
231 } // namespace grpc_core
232
main(int argc,char ** argv)233 int main(int argc, char** argv) {
234 ::testing::InitGoogleTest(&argc, argv);
235 return RUN_ALL_TESTS();
236 }
237