• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_MAP_PIPE_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_MAP_PIPE_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include "absl/log/log.h"
21 #include "absl/status/status.h"
22 #include "src/core/lib/promise/detail/promise_factory.h"
23 #include "src/core/lib/promise/for_each.h"
24 #include "src/core/lib/promise/map.h"
25 #include "src/core/lib/promise/pipe.h"
26 #include "src/core/lib/promise/poll.h"
27 #include "src/core/lib/promise/try_seq.h"
28 
29 namespace grpc_core {
30 
31 // Apply a (possibly async) mapping function to src, and output into dst.
32 //
33 // In pseudo-code:
34 // for each element in wait_for src.Next:
35 //   x = wait_for filter_factory(element)
36 //   wait_for dst.Push(x)
37 template <typename T, typename Filter>
MapPipe(PipeReceiver<T> src,PipeSender<T> dst,Filter filter_factory)38 auto MapPipe(PipeReceiver<T> src, PipeSender<T> dst, Filter filter_factory) {
39   return ForEach(
40       std::move(src),
41       [filter_factory = promise_detail::RepeatedPromiseFactory<T, Filter>(
42            std::move(filter_factory)),
43        dst = std::move(dst)](T t) mutable {
44         return TrySeq(
45             [] {
46               GRPC_TRACE_VLOG(promise_primitives, 2) << "MapPipe: start map";
47               return Empty{};
48             },
49             filter_factory.Make(std::move(t)),
50             [&dst](T t) {
51               GRPC_TRACE_VLOG(promise_primitives, 2) << "MapPipe: start push";
52               return Map(dst.Push(std::move(t)), [](bool successful_push) {
53                 if (successful_push) {
54                   return absl::OkStatus();
55                 }
56                 return absl::CancelledError();
57               });
58             });
59       });
60 }
61 
62 // Helper to intecept a pipe and apply a mapping function.
63 // Each of the `Intercept` constructors will take a PipeSender or PipeReceiver,
64 // construct a new pipe, and then replace the passed in pipe with its new end.
65 // In this way it can interject logic per-element.
66 // Next, the TakeAndRun function will return a promise that can be run to apply
67 // a mapping promise to each element of the pipe.
68 template <typename T>
69 class PipeMapper {
70  public:
Intercept(PipeSender<T> & intercept_sender)71   static PipeMapper Intercept(PipeSender<T>& intercept_sender) {
72     PipeMapper<T> r;
73     r.interceptor_.sender.Swap(&intercept_sender);
74     return r;
75   }
76 
Intercept(PipeReceiver<T> & intercept_receiver)77   static PipeMapper Intercept(PipeReceiver<T>& intercept_receiver) {
78     PipeMapper<T> r;
79     r.interceptor_.receiver.Swap(&intercept_receiver);
80     return r;
81   }
82 
83   template <typename Filter>
TakeAndRun(Filter filter)84   auto TakeAndRun(Filter filter) {
85     return MapPipe(std::move(interceptor_.receiver),
86                    std::move(interceptor_.sender), std::move(filter));
87   }
88 
89  private:
90   PipeMapper() = default;
91   Pipe<T> interceptor_;
92 };
93 
94 }  // namespace grpc_core
95 
96 #endif  // GRPC_SRC_CORE_LIB_PROMISE_MAP_PIPE_H
97