1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
/*! \file rx-pairwise.hpp

    \brief Take values pairwise from this observable.

    \return Observable that emits tuples of the two most recent items emitted by the source observable.

    \sample
    \snippet pairwise.cpp pairwise sample
    \snippet output.txt pairwise sample

    If the source observable emits fewer than two items, no pairs are emitted by the resulting observable:
    \snippet pairwise.cpp pairwise short sample
    \snippet output.txt pairwise short sample
*/
19
20 #if !defined(RXCPP_OPERATORS_RX_PAIRWISE_HPP)
21 #define RXCPP_OPERATORS_RX_PAIRWISE_HPP
22
23 #include "../rx-includes.hpp"
24
25 namespace rxcpp {
26
27 namespace operators {
28
29 namespace detail {
30
/*! \brief Empty tag type that records the argument types of a rejected pairwise() call.

    Carries no data; it exists only so the rejected argument pack \p AN can be
    named as a template argument.

    \tparam AN  Argument types that were supplied.
*/
template<class... AN>
struct pairwise_invalid_arguments {};
33
34 template<class... AN>
35 struct pairwise_invalid : public rxo::operator_base<pairwise_invalid_arguments<AN...>> {
36 using type = observable<pairwise_invalid_arguments<AN...>, pairwise_invalid<AN...>>;
37 };
38 template<class... AN>
39 using pairwise_invalid_t = typename pairwise_invalid<AN...>::type;
40
41 template<class T>
42 struct pairwise
43 {
44 typedef rxu::decay_t<T> source_value_type;
45 typedef std::tuple<source_value_type, source_value_type> value_type;
46
47 template<class Subscriber>
48 struct pairwise_observer
49 {
50 typedef pairwise_observer<Subscriber> this_type;
51 typedef std::tuple<source_value_type, source_value_type> value_type;
52 typedef rxu::decay_t<Subscriber> dest_type;
53 typedef observer<T, this_type> observer_type;
54 dest_type dest;
55 mutable rxu::detail::maybe<source_value_type> remembered;
56
pairwise_observerrxcpp::operators::detail::pairwise::pairwise_observer57 pairwise_observer(dest_type d)
58 : dest(std::move(d))
59 {
60 }
on_nextrxcpp::operators::detail::pairwise::pairwise_observer61 void on_next(source_value_type v) const {
62 if (remembered.empty()) {
63 remembered.reset(v);
64 return;
65 }
66
67 dest.on_next(std::make_tuple(remembered.get(), v));
68 remembered.reset(v);
69 }
on_errorrxcpp::operators::detail::pairwise::pairwise_observer70 void on_error(rxu::error_ptr e) const {
71 dest.on_error(e);
72 }
on_completedrxcpp::operators::detail::pairwise::pairwise_observer73 void on_completed() const {
74 dest.on_completed();
75 }
76
makerxcpp::operators::detail::pairwise::pairwise_observer77 static subscriber<T, observer_type> make(dest_type d) {
78 auto cs = d.get_subscription();
79 return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d))));
80 }
81 };
82
83 template<class Subscriber>
operator ()rxcpp::operators::detail::pairwise84 auto operator()(Subscriber dest) const
85 -> decltype(pairwise_observer<Subscriber>::make(std::move(dest))) {
86 return pairwise_observer<Subscriber>::make(std::move(dest));
87 }
88 };
89
90 }
91
92 /*! @copydoc rx-pairwise.hpp
93 */
94 template<class... AN>
pairwise(AN &&...an)95 auto pairwise(AN&&... an)
96 -> operator_factory<pairwise_tag, AN...> {
97 return operator_factory<pairwise_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
98 }
99
100 }
101
102 template<>
103 struct member_overload<pairwise_tag>
104 {
105 template<class Observable,
106 class Enabled = rxu::enable_if_all_true_type_t<
107 is_observable<Observable>>,
108 class SourceValue = rxu::value_type_t<Observable>,
109 class Pairwise = rxo::detail::pairwise<SourceValue>,
110 class Value = rxu::value_type_t<Pairwise>>
memberrxcpp::member_overload111 static auto member(Observable&& o)
112 -> decltype(o.template lift<Value>(Pairwise())) {
113 return o.template lift<Value>(Pairwise());
114 }
115
116 template<class... AN>
memberrxcpp::member_overload117 static operators::detail::pairwise_invalid_t<AN...> member(AN...) {
118 std::terminate();
119 return {};
120 static_assert(sizeof...(AN) == 10000, "pairwise takes no arguments");
121 }
122 };
123
124 }
125
126 #endif
127