• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/promise/map_pipe.h"
16 
17 #include <grpc/event_engine/memory_allocator.h>
18 
19 #include <memory>
20 #include <utility>
21 
22 #include "gmock/gmock.h"
23 #include "gtest/gtest.h"
24 #include "src/core/lib/promise/activity.h"
25 #include "src/core/lib/promise/for_each.h"
26 #include "src/core/lib/promise/join.h"
27 #include "src/core/lib/promise/map.h"
28 #include "src/core/lib/promise/pipe.h"
29 #include "src/core/lib/promise/poll.h"
30 #include "src/core/lib/promise/seq.h"
31 #include "src/core/lib/resource_quota/arena.h"
32 #include "src/core/lib/resource_quota/memory_quota.h"
33 #include "src/core/lib/resource_quota/resource_quota.h"
34 #include "src/core/util/ref_counted_ptr.h"
35 #include "test/core/promise/test_wakeup_schedulers.h"
36 
37 using testing::Mock;
38 using testing::MockFunction;
39 using testing::StrictMock;
40 
41 namespace grpc_core {
42 
43 template <typename T>
44 class Delayed {
45  public:
Delayed(T x)46   explicit Delayed(T x) : x_(x) {}
47 
operator ()()48   Poll<T> operator()() {
49     GetContext<Activity>()->ForceImmediateRepoll();
50     ++polls_;
51     if (polls_ == 10) return std::move(x_);
52     return Pending();
53   }
54 
55  private:
56   int polls_ = 0;
57   T x_;
58 };
59 
// Sends 0, 1 and 2 through a pipe whose *receiver* has a mapping stage
// installed that yields `x + 1` via Delayed. The reader must observe 1, 2, 3 in
// order, and the activity must finish with an OK status.
TEST(MapPipeTest, SendThriceWithPipeInterceptingReceive) {
  int received_count = 0;
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [&received_count] {
        Pipe<int> channel;
        // Attach the mapper to the receiving end before that end is moved
        // into the reader below.
        auto mapper = PipeMapper<int>::Intercept(channel.receiver)
                          .TakeAndRun([](int value) {
                            return Delayed<int>(value + 1);
                          });
        // The sender lives behind a shared_ptr so each Seq step can hold a
        // copy of the handle, and the final step can destroy the sender.
        auto owned_sender =
            std::make_unique<PipeSender<int>>(std::move(channel.sender));
        auto shared_sender =
            std::make_shared<std::unique_ptr<PipeSender<int>>>(
                std::move(owned_sender));
        // Writer: push 0, then 1, then 2, then drop the sender to close.
        auto writer = Seq(
            (*shared_sender)->Push(0),
            [shared_sender] { return (*shared_sender)->Push(1); },
            [shared_sender] { return (*shared_sender)->Push(2); },
            [shared_sender] {
              shared_sender->reset();
              return absl::OkStatus();
            });
        // Reader: the k-th element seen must equal k, i.e. every pushed value
        // arrives with one added.
        auto reader =
            ForEach(std::move(channel.receiver), [&received_count](int item) {
              ++received_count;
              EXPECT_EQ(received_count, item);
              return absl::OkStatus();
            });
        // Run all three concurrently; the activity's result is the reader's.
        return Map(
            Join(std::move(mapper), std::move(writer), std::move(reader)),
            JustElem<2>());
      },
      NoWakeupScheduler(),
      [&completion](absl::Status result) {
        completion.Call(std::move(result));
      },
      SimpleArenaAllocator()->MakeArena());
  Mock::VerifyAndClearExpectations(&completion);
  EXPECT_EQ(received_count, 3);
}
99 
// Same scenario as the receive-side test, except the mapping stage is
// installed on the pipe's *sender*. Values 0, 1 and 2 are pushed, the reader
// must observe 1, 2, 3 in order, and the activity must finish with OK.
TEST(MapPipeTest, SendThriceWithPipeInterceptingSend) {
  int received_count = 0;
  StrictMock<MockFunction<void(absl::Status)>> completion;
  EXPECT_CALL(completion, Call(absl::OkStatus()));
  MakeActivity(
      [&received_count] {
        Pipe<int> channel;
        // Attach the mapper to the sending end before that end is moved into
        // its heap-allocated holder below.
        auto mapper = PipeMapper<int>::Intercept(channel.sender)
                          .TakeAndRun([](int value) {
                            return Delayed<int>(value + 1);
                          });
        // The sender lives behind a shared_ptr so each Seq step can hold a
        // copy of the handle, and the final step can destroy the sender.
        auto owned_sender =
            std::make_unique<PipeSender<int>>(std::move(channel.sender));
        auto shared_sender =
            std::make_shared<std::unique_ptr<PipeSender<int>>>(
                std::move(owned_sender));
        // Writer: push 0, then 1, then 2, then drop the sender to close.
        auto writer = Seq(
            (*shared_sender)->Push(0),
            [shared_sender] { return (*shared_sender)->Push(1); },
            [shared_sender] { return (*shared_sender)->Push(2); },
            [shared_sender] {
              shared_sender->reset();
              return absl::OkStatus();
            });
        // Reader: the k-th element seen must equal k, i.e. every pushed value
        // arrives with one added.
        auto reader =
            ForEach(std::move(channel.receiver), [&received_count](int item) {
              ++received_count;
              EXPECT_EQ(received_count, item);
              return absl::OkStatus();
            });
        // Run all three concurrently; the activity's result is the reader's.
        return Map(
            Join(std::move(mapper), std::move(writer), std::move(reader)),
            JustElem<2>());
      },
      NoWakeupScheduler(),
      [&completion](absl::Status result) {
        completion.Call(std::move(result));
      },
      SimpleArenaAllocator()->MakeArena());
  Mock::VerifyAndClearExpectations(&completion);
  EXPECT_EQ(received_count, 3);
}
139 
140 }  // namespace grpc_core
141 
main(int argc,char ** argv)142 int main(int argc, char** argv) {
143   ::testing::InitGoogleTest(&argc, argv);
144   return RUN_ALL_TESTS();
145 }
146