1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-distinct.hpp
6
7 \brief For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
8
9 \return Observable that emits those items from the source observable that are distinct.
10
11 \note distinct keeps an unordered_set<T> of past values. Due to an issue in multiple implementations of std::hash<T>, rxcpp maintains a whitelist of hashable types. New types can be added by specializing rxcpp::filtered_hash<T>.
12
13 \sample
14 \snippet distinct.cpp distinct sample
15 \snippet output.txt distinct sample
16 */
17
18 #if !defined(RXCPP_OPERATORS_RX_DISTINCT_HPP)
19 #define RXCPP_OPERATORS_RX_DISTINCT_HPP
20
21 #include "../rx-includes.hpp"
22
23 namespace rxcpp {
24
25 namespace operators {
26
27 namespace detail {
28
/// Empty tag type parameterized on an arbitrary argument pack.
/// It carries no data; only its template arguments distinguish instances.
template<typename... AN>
struct distinct_invalid_arguments
{
};
31
32 template<class... AN>
33 struct distinct_invalid : public rxo::operator_base<distinct_invalid_arguments<AN...>> {
34 using type = observable<distinct_invalid_arguments<AN...>, distinct_invalid<AN...>>;
35 };
36 template<class... AN>
37 using distinct_invalid_t = typename distinct_invalid<AN...>::type;
38
39 template<class T>
40 struct distinct
41 {
42 typedef rxu::decay_t<T> source_value_type;
43
44 template<class Subscriber>
45 struct distinct_observer
46 {
47 typedef distinct_observer<Subscriber> this_type;
48 typedef source_value_type value_type;
49 typedef rxu::decay_t<Subscriber> dest_type;
50 typedef observer<value_type, this_type> observer_type;
51 dest_type dest;
52 mutable std::unordered_set<source_value_type, rxcpp::filtered_hash<source_value_type>> remembered;
53
distinct_observerrxcpp::operators::detail::distinct::distinct_observer54 distinct_observer(dest_type d)
55 : dest(d)
56 {
57 }
on_nextrxcpp::operators::detail::distinct::distinct_observer58 void on_next(source_value_type v) const {
59 if (remembered.empty() || remembered.count(v) == 0) {
60 remembered.insert(v);
61 dest.on_next(v);
62 }
63 }
on_errorrxcpp::operators::detail::distinct::distinct_observer64 void on_error(rxu::error_ptr e) const {
65 dest.on_error(e);
66 }
on_completedrxcpp::operators::detail::distinct::distinct_observer67 void on_completed() const {
68 dest.on_completed();
69 }
70
makerxcpp::operators::detail::distinct::distinct_observer71 static subscriber<value_type, observer<value_type, this_type>> make(dest_type d) {
72 return make_subscriber<value_type>(d, this_type(d));
73 }
74 };
75
76 template<class Subscriber>
operator ()rxcpp::operators::detail::distinct77 auto operator()(Subscriber dest) const
78 -> decltype(distinct_observer<Subscriber>::make(std::move(dest))) {
79 return distinct_observer<Subscriber>::make(std::move(dest));
80 }
81 };
82
83 }
84
85 /*! @copydoc rx-distinct.hpp
86 */
87 template<class... AN>
distinct(AN &&...an)88 auto distinct(AN&&... an)
89 -> operator_factory<distinct_tag, AN...> {
90 return operator_factory<distinct_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
91 }
92
93 }
94
95 template<>
96 struct member_overload<distinct_tag>
97 {
98 template<class Observable,
99 class SourceValue = rxu::value_type_t<Observable>,
100 class Enabled = rxu::enable_if_all_true_type_t<
101 is_observable<Observable>,
102 is_hashable<SourceValue>>,
103 class Distinct = rxo::detail::distinct<SourceValue>>
memberrxcpp::member_overload104 static auto member(Observable&& o)
105 -> decltype(o.template lift<SourceValue>(Distinct())) {
106 return o.template lift<SourceValue>(Distinct());
107 }
108
109 template<class... AN>
memberrxcpp::member_overload110 static operators::detail::distinct_invalid_t<AN...> member(AN...) {
111 std::terminate();
112 return {};
113 static_assert(sizeof...(AN) == 10000, "distinct takes no arguments");
114 }
115 };
116
117 }
118
119 #endif
120