1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
/*! \file rx-distinct_until_changed.hpp

    \brief For each item from this observable, filter out consecutively repeated values and emit only the changes from the new observable that is returned.

    \tparam BinaryPredicate (optional) the type of the value comparing function. The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);

    \param pred (optional) the function that implements comparison of two values.

    \return Observable that emits those items from the source observable that are distinct from their immediate predecessors.

    \sample
    \snippet distinct_until_changed.cpp distinct_until_changed sample
    \snippet output.txt distinct_until_changed sample
*/
19
20 #if !defined(RXCPP_OPERATORS_RX_DISTINCT_UNTIL_CHANGED_HPP)
21 #define RXCPP_OPERATORS_RX_DISTINCT_UNTIL_CHANGED_HPP
22
23 #include "../rx-includes.hpp"
24
25 namespace rxcpp {
26
27 namespace operators {
28
29 namespace detail {
30
// Empty tag type parameterized on the argument types of a
// distinct_until_changed call; it carries no data members.
template<class... Args>
struct distinct_until_changed_invalid_arguments
{
};
33
34 template<class... AN>
35 struct distinct_until_changed_invalid : public rxo::operator_base<distinct_until_changed_invalid_arguments<AN...>> {
36 using type = observable<distinct_until_changed_invalid_arguments<AN...>, distinct_until_changed_invalid<AN...>>;
37 };
38 template<class... AN>
39 using distinct_until_changed_invalid_t = typename distinct_until_changed_invalid<AN...>::type;
40
41 template<class T, class BinaryPredicate>
42 struct distinct_until_changed
43 {
44 typedef rxu::decay_t<T> source_value_type;
45 typedef rxu::decay_t<BinaryPredicate> predicate_type;
46
47 predicate_type pred;
48
distinct_until_changedrxcpp::operators::detail::distinct_until_changed49 distinct_until_changed(predicate_type p)
50 : pred(std::move(p))
51 {
52 }
53
54 template<class Subscriber>
55 struct distinct_until_changed_observer
56 {
57 typedef distinct_until_changed_observer<Subscriber> this_type;
58 typedef source_value_type value_type;
59 typedef rxu::decay_t<Subscriber> dest_type;
60 typedef observer<value_type, this_type> observer_type;
61
62 dest_type dest;
63 predicate_type pred;
64 mutable rxu::detail::maybe<source_value_type> remembered;
65
distinct_until_changed_observerrxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer66 distinct_until_changed_observer(dest_type d, predicate_type pred)
67 : dest(std::move(d))
68 , pred(std::move(pred))
69 {
70 }
on_nextrxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer71 void on_next(source_value_type v) const {
72 if (remembered.empty() || !pred(v, remembered.get())) {
73 remembered.reset(v);
74 dest.on_next(v);
75 }
76 }
on_errorrxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer77 void on_error(rxu::error_ptr e) const {
78 dest.on_error(e);
79 }
on_completedrxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer80 void on_completed() const {
81 dest.on_completed();
82 }
83
makerxcpp::operators::detail::distinct_until_changed::distinct_until_changed_observer84 static subscriber<value_type, observer_type> make(dest_type d, predicate_type p) {
85 return make_subscriber<value_type>(d, this_type(d, std::move(p)));
86 }
87 };
88
89 template<class Subscriber>
operator ()rxcpp::operators::detail::distinct_until_changed90 auto operator()(Subscriber dest) const
91 -> decltype(distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred)) {
92 return distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred);
93 }
94 };
95
96 }
97
98 /*! @copydoc rx-distinct_until_changed.hpp
99 */
100 template<class... AN>
distinct_until_changed(AN &&...an)101 auto distinct_until_changed(AN&&... an)
102 -> operator_factory<distinct_until_changed_tag, AN...> {
103 return operator_factory<distinct_until_changed_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
104 }
105
106 }
107
108 template<>
109 struct member_overload<distinct_until_changed_tag>
110 {
111 template<class Observable,
112 class SourceValue = rxu::value_type_t<Observable>,
113 class Enabled = rxu::enable_if_all_true_type_t<
114 is_observable<Observable>>,
115 class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, rxu::equal_to<>>>
memberrxcpp::member_overload116 static auto member(Observable&& o)
117 -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()))) {
118 return o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()));
119 }
120
121 template<class Observable,
122 class BinaryPredicate,
123 class SourceValue = rxu::value_type_t<Observable>,
124 class Enabled = rxu::enable_if_all_true_type_t<
125 is_observable<Observable>>,
126 class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, BinaryPredicate>>
memberrxcpp::member_overload127 static auto member(Observable&& o, BinaryPredicate&& pred)
128 -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)))) {
129 return o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)));
130 }
131
132 template<class... AN>
memberrxcpp::member_overload133 static operators::detail::distinct_until_changed_invalid_t<AN...> member(AN...) {
134 std::terminate();
135 return {};
136 static_assert(sizeof...(AN) == 10000, "distinct_until_changed takes (optional BinaryPredicate)");
137 }
138 };
139
140 }
141
142 #endif
143