• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-tap.hpp
6 
7     \brief inspect calls to on_next, on_error and on_completed.
8 
9     \tparam MakeObserverArgN...  these args are passed to make_observer.
10 
11     \param an  these args are passed to make_observer.
12 
13     \return  Observable that emits the same items as the source observable to both the subscriber and the observer.
14 
15     \note If an on_error method is not supplied the observer will ignore errors rather than call std::terminate()
16 
17     \sample
18     \snippet tap.cpp tap sample
19     \snippet output.txt tap sample
20 
21     If the source observable generates an error, the observer passed to tap is called:
22     \snippet tap.cpp error tap sample
23     \snippet output.txt error tap sample
24 */
25 
26 #if !defined(RXCPP_OPERATORS_RX_TAP_HPP)
27 #define RXCPP_OPERATORS_RX_TAP_HPP
28 
29 #include "../rx-includes.hpp"
30 
31 namespace rxcpp {
32 
33 namespace operators {
34 
35 namespace detail {
36 
/*! Empty tag type parameterized on an argument pack.

    It has no members; in this file it is used as a template argument by tap_invalid.

    \tparam AN  the argument types being recorded.
*/
template<class... AN>
struct tap_invalid_arguments
{
};
39 
40 template<class... AN>
41 struct tap_invalid : public rxo::operator_base<tap_invalid_arguments<AN...>> {
42     using type = observable<tap_invalid_arguments<AN...>, tap_invalid<AN...>>;
43 };
44 template<class... AN>
45 using tap_invalid_t = typename tap_invalid<AN...>::type;
46 
47 template<class T, class MakeObserverArgN>
48 struct tap_observer_factory;
49 
50 template<class T, class... ArgN>
51 struct tap_observer_factory<T, std::tuple<ArgN...>>
52 {
53     using source_value_type = rxu::decay_t<T>;
54     using out_type = decltype(make_observer<source_value_type, rxcpp::detail::OnErrorIgnore>(*((ArgN*)nullptr)...));
operator ()rxcpp::operators::detail::tap_observer_factory55     auto operator()(ArgN&&... an) -> out_type const {
56         return make_observer<source_value_type, rxcpp::detail::OnErrorIgnore>(std::forward<ArgN>(an)...);
57     }
58 };
59 
60 template<class T, class MakeObserverArgN, class Factory = tap_observer_factory<T, MakeObserverArgN>>
61 struct tap
62 {
63     using source_value_type = rxu::decay_t<T>;
64     using args_type = rxu::decay_t<MakeObserverArgN>;
65     using factory_type = Factory;
66     using out_type = typename factory_type::out_type;
67     out_type out;
68 
taprxcpp::operators::detail::tap69     tap(args_type a)
70         : out(rxu::apply(std::move(a), factory_type()))
71     {
72     }
73 
74     template<class Subscriber>
75     struct tap_observer
76     {
77         using this_type = tap_observer<Subscriber>;
78         using value_type = source_value_type;
79         using dest_type = rxu::decay_t<Subscriber>;
80         using factory_type = Factory;
81         using out_type = typename factory_type::out_type;
82         using observer_type = observer<value_type, this_type>;
83         dest_type dest;
84         out_type out;
85 
tap_observerrxcpp::operators::detail::tap::tap_observer86         tap_observer(dest_type d, out_type o)
87             : dest(std::move(d))
88             , out(std::move(o))
89         {
90         }
on_nextrxcpp::operators::detail::tap::tap_observer91         void on_next(source_value_type v) const {
92             out.on_next(v);
93             dest.on_next(v);
94         }
on_errorrxcpp::operators::detail::tap::tap_observer95         void on_error(rxu::error_ptr e) const {
96             out.on_error(e);
97             dest.on_error(e);
98         }
on_completedrxcpp::operators::detail::tap::tap_observer99         void on_completed() const {
100             out.on_completed();
101             dest.on_completed();
102         }
103 
makerxcpp::operators::detail::tap::tap_observer104         static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, out_type o) {
105             return make_subscriber<value_type>(d, this_type(d, std::move(o)));
106         }
107     };
108 
109     template<class Subscriber>
operator ()rxcpp::operators::detail::tap110     auto operator()(Subscriber dest) const
111         -> decltype(tap_observer<Subscriber>::make(std::move(dest), out)) {
112         return      tap_observer<Subscriber>::make(std::move(dest), out);
113     }
114 };
115 
116 }
117 
118 /*! @copydoc rx-tap.hpp
119 */
120 template<class... AN>
tap(AN &&...an)121 auto tap(AN&&... an)
122     ->      operator_factory<tap_tag, AN...> {
123      return operator_factory<tap_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
124 }
125 
126 }
127 
128 template<>
129 struct member_overload<tap_tag>
130 {
131     template<class Observable, class... MakeObserverArgN,
132         class Enabled = rxu::enable_if_all_true_type_t<
133             is_observable<Observable>>,
134         class SourceValue = rxu::value_type_t<Observable>,
135         class Tap = rxo::detail::tap<SourceValue, std::tuple<rxu::decay_t<MakeObserverArgN>...>>>
memberrxcpp::member_overload136     static auto member(Observable&& o, MakeObserverArgN&&... an)
137         -> decltype(o.template lift<SourceValue>(Tap(std::make_tuple(std::forward<MakeObserverArgN>(an)...)))) {
138         return      o.template lift<SourceValue>(Tap(std::make_tuple(std::forward<MakeObserverArgN>(an)...)));
139     }
140 
141     template<class... AN>
memberrxcpp::member_overload142     static operators::detail::tap_invalid_t<AN...> member(const AN&...) {
143         std::terminate();
144         return {};
145         static_assert(sizeof...(AN) == 10000, "tap takes (MakeObserverArgN...)");
146     }
147 };
148 
149 }
150 
151 #endif
152