• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/for_each.h"
16 
17 #include <grpc/event_engine/memory_allocator.h>
18 
19 #include <memory>
20 
21 #include "gmock/gmock.h"
22 #include "gtest/gtest.h"
23 #include "src/core/lib/promise/activity.h"
24 #include "src/core/lib/promise/inter_activity_pipe.h"
25 #include "src/core/lib/promise/join.h"
26 #include "src/core/lib/promise/map.h"
27 #include "src/core/lib/promise/pipe.h"
28 #include "src/core/lib/promise/seq.h"
29 #include "src/core/lib/promise/try_seq.h"
30 #include "src/core/lib/resource_quota/arena.h"
31 #include "src/core/lib/resource_quota/memory_quota.h"
32 #include "src/core/lib/resource_quota/resource_quota.h"
33 #include "src/core/util/ref_counted_ptr.h"
34 #include "test/core/promise/test_wakeup_schedulers.h"
35 
36 using testing::Mock;
37 using testing::MockFunction;
38 using testing::StrictMock;
39 
40 namespace grpc_core {
41 
TEST(ForEachTest,SendThriceWithPipe)42 TEST(ForEachTest, SendThriceWithPipe) {
43   int num_received = 0;
44   StrictMock<MockFunction<void(absl::Status)>> on_done;
45   EXPECT_CALL(on_done, Call(absl::OkStatus()));
46   MakeActivity(
47       [&num_received] {
48         Pipe<int> pipe;
49         auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
50             std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
51         return Map(
52             Join(
53                 // Push 3 things into a pipe -- 1, 2, then 3 -- then close.
54                 Seq((*sender)->Push(1), [sender] { return (*sender)->Push(2); },
55                     [sender] { return (*sender)->Push(3); },
56                     [sender] {
57                       sender->reset();
58                       return absl::OkStatus();
59                     }),
60                 // Use a ForEach loop to read them out and verify all values are
61                 // seen.
62                 ForEach(std::move(pipe.receiver),
63                         [&num_received](int i) {
64                           num_received++;
65                           EXPECT_EQ(num_received, i);
66                           return absl::OkStatus();
67                         })),
68             JustElem<1>());
69       },
70       NoWakeupScheduler(),
71       [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
72       SimpleArenaAllocator()->MakeArena());
73   Mock::VerifyAndClearExpectations(&on_done);
74   EXPECT_EQ(num_received, 3);
75 }
76 
TEST(ForEachTest,SendThriceWithInterActivityPipe)77 TEST(ForEachTest, SendThriceWithInterActivityPipe) {
78   int num_received = 0;
79   StrictMock<MockFunction<void(absl::Status)>> on_done_sender;
80   StrictMock<MockFunction<void(absl::Status)>> on_done_receiver;
81   EXPECT_CALL(on_done_sender, Call(absl::OkStatus()));
82   EXPECT_CALL(on_done_receiver, Call(absl::OkStatus()));
83   InterActivityPipe<int, 1> pipe;
84   auto send_activity = MakeActivity(
85       Seq(
86           // Push 3 things into a pipe -- 1, 2, then 3 -- then close.
87           pipe.sender.Push(1), [&pipe] { return pipe.sender.Push(2); },
88           [&pipe] { return pipe.sender.Push(3); },
89           [&pipe] {
90             auto x = std::move(pipe.sender);
91             return absl::OkStatus();
92           }),
93       InlineWakeupScheduler{}, [&on_done_sender](absl::Status status) {
94         on_done_sender.Call(std::move(status));
95       });
96   MakeActivity(
97       [&num_received, &pipe] {
98         // Use a ForEach loop to read them out and verify
99         // all values are seen.
100         return ForEach(std::move(pipe.receiver), [&num_received](int i) {
101           num_received++;
102           EXPECT_EQ(num_received, i);
103           return absl::OkStatus();
104         });
105       },
106       NoWakeupScheduler(),
107       [&on_done_receiver](absl::Status status) {
108         on_done_receiver.Call(std::move(status));
109       });
110   Mock::VerifyAndClearExpectations(&on_done_sender);
111   Mock::VerifyAndClearExpectations(&on_done_receiver);
112   EXPECT_EQ(num_received, 3);
113 }
114 
115 // Pollable type that stays movable until it's polled, then causes the test to
116 // fail if it's moved again.
117 // Promises have the property that they can be moved until polled, and this
118 // helps us check that the internals of ForEach respect this rule.
119 class MoveableUntilPolled {
120  public:
121   MoveableUntilPolled() = default;
122   MoveableUntilPolled(const MoveableUntilPolled&) = delete;
123   MoveableUntilPolled& operator=(const MoveableUntilPolled&) = delete;
MoveableUntilPolled(MoveableUntilPolled && other)124   MoveableUntilPolled(MoveableUntilPolled&& other) noexcept : polls_(0) {
125     EXPECT_EQ(other.polls_, 0);
126   }
operator =(MoveableUntilPolled && other)127   MoveableUntilPolled& operator=(MoveableUntilPolled&& other) noexcept {
128     EXPECT_EQ(other.polls_, 0);
129     polls_ = 0;
130     return *this;
131   }
132 
operator ()()133   Poll<absl::Status> operator()() {
134     GetContext<Activity>()->ForceImmediateRepoll();
135     ++polls_;
136     if (polls_ == 10) return absl::OkStatus();
137     return Pending();
138   }
139 
140  private:
141   int polls_ = 0;
142 };
143 
TEST(ForEachTest,NoMoveAfterPoll)144 TEST(ForEachTest, NoMoveAfterPoll) {
145   int num_received = 0;
146   StrictMock<MockFunction<void(absl::Status)>> on_done;
147   EXPECT_CALL(on_done, Call(absl::OkStatus()));
148   MakeActivity(
149       [&num_received] {
150         Pipe<int> pipe;
151         auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
152             std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
153         return Map(
154             Join(
155                 // Push one things into a pipe, then close.
156                 Seq((*sender)->Push(1),
157                     [sender] {
158                       sender->reset();
159                       return absl::OkStatus();
160                     }),
161                 // Use a ForEach loop to read them out and verify all
162                 // values are seen.
163                 // Inject a MoveableUntilPolled into the loop to ensure that
164                 // ForEach doesn't internally move a promise post-polling.
165                 ForEach(std::move(pipe.receiver),
166                         [&num_received](int i) {
167                           num_received++;
168                           EXPECT_EQ(num_received, i);
169                           return MoveableUntilPolled();
170                         })),
171             JustElem<1>());
172       },
173       NoWakeupScheduler(),
174       [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
175       SimpleArenaAllocator()->MakeArena());
176   Mock::VerifyAndClearExpectations(&on_done);
177   EXPECT_EQ(num_received, 1);
178 }
179 
TEST(ForEachTest,NextResultHeldThroughCallback)180 TEST(ForEachTest, NextResultHeldThroughCallback) {
181   int num_received = 0;
182   StrictMock<MockFunction<void(absl::Status)>> on_done;
183   EXPECT_CALL(on_done, Call(absl::OkStatus()));
184   MakeActivity(
185       [&num_received] {
186         Pipe<int> pipe;
187         auto sender = std::make_shared<std::unique_ptr<PipeSender<int>>>(
188             std::make_unique<PipeSender<int>>(std::move(pipe.sender)));
189         return Map(
190             Join(
191                 // Push one things into a pipe, then close.
192                 Seq((*sender)->Push(1),
193                     [sender] {
194                       sender->reset();
195                       return absl::OkStatus();
196                     }),
197                 // Use a ForEach loop to read them out and verify all
198                 // values are seen.
199                 ForEach(std::move(pipe.receiver),
200                         [&num_received, sender](int i) {
201                           // While we're processing a value NextResult
202                           // should be held disallowing new items to be
203                           // pushed.
204                           // We also should not have reached the
205                           // sender->reset() line above yet either, as
206                           // the Push() should block until this code
207                           // completes.
208                           EXPECT_TRUE((*sender)->Push(2)().pending());
209                           num_received++;
210                           EXPECT_EQ(num_received, i);
211                           return TrySeq(
212                               // has the side effect of stalling for some
213                               // iterations
214                               MoveableUntilPolled(), [sender] {
215                                 // Perform the same test verifying the same
216                                 // properties for NextResult holding: all should
217                                 // still be true.
218                                 EXPECT_TRUE((*sender)->Push(2)().pending());
219                                 return absl::OkStatus();
220                               });
221                         })),
222             JustElem<1>());
223       },
224       NoWakeupScheduler(),
225       [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
226       SimpleArenaAllocator()->MakeArena());
227   Mock::VerifyAndClearExpectations(&on_done);
228   EXPECT_EQ(num_received, 1);
229 }
230 
231 }  // namespace grpc_core
232 
main(int argc,char ** argv)233 int main(int argc, char** argv) {
234   ::testing::InitGoogleTest(&argc, argv);
235   return RUN_ALL_TESTS();
236 }
237