1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-filter.hpp
6
7 \brief For each item from this observable use Predicate to select which items to emit from the new observable that is returned.
8
9 \tparam Predicate the type of the filter function
10
11 \param p the filter function
12
13 \return Observable that emits only those items emitted by the source observable that the filter evaluates as true.
14
15 \sample
16 \snippet filter.cpp filter sample
17 \snippet output.txt filter sample
18 */
19
20 #if !defined(RXCPP_OPERATORS_RX_FILTER_HPP)
21 #define RXCPP_OPERATORS_RX_FILTER_HPP
22
23 #include "../rx-includes.hpp"
24
25 namespace rxcpp {
26
27 namespace operators {
28
29 namespace detail {
30
// Empty tag type that records the argument pack AN of a filter(...) call;
// it carries no data and exists only to name that pack in other types.
template <typename... AN>
struct filter_invalid_arguments
{
};
33
34 template<class... AN>
35 struct filter_invalid : public rxo::operator_base<filter_invalid_arguments<AN...>> {
36 using type = observable<filter_invalid_arguments<AN...>, filter_invalid<AN...>>;
37 };
38 template<class... AN>
39 using filter_invalid_t = typename filter_invalid<AN...>::type;
40
41 template<class T, class Predicate>
42 struct filter
43 {
44 typedef rxu::decay_t<T> source_value_type;
45 typedef rxu::decay_t<Predicate> test_type;
46 test_type test;
47
filterrxcpp::operators::detail::filter48 filter(test_type t)
49 : test(std::move(t))
50 {
51 }
52
53 template<class Subscriber>
54 struct filter_observer
55 {
56 typedef filter_observer<Subscriber> this_type;
57 typedef source_value_type value_type;
58 typedef rxu::decay_t<Subscriber> dest_type;
59 typedef observer<value_type, this_type> observer_type;
60 dest_type dest;
61 mutable test_type test;
62
filter_observerrxcpp::operators::detail::filter::filter_observer63 filter_observer(dest_type d, test_type t)
64 : dest(std::move(d))
65 , test(std::move(t))
66 {
67 }
68
69 template <class Value>
on_nextrxcpp::operators::detail::filter::filter_observer70 void on_next(Value&& v) const {
71 auto filtered = on_exception([&](){
72 return !this->test(rxu::as_const(v));
73 },
74 dest);
75 if (filtered.empty()) {
76 return;
77 }
78 if (!filtered.get()) {
79 dest.on_next(std::forward<Value>(v));
80 }
81 }
on_errorrxcpp::operators::detail::filter::filter_observer82 void on_error(rxu::error_ptr e) const {
83 dest.on_error(e);
84 }
on_completedrxcpp::operators::detail::filter::filter_observer85 void on_completed() const {
86 dest.on_completed();
87 }
88
makerxcpp::operators::detail::filter::filter_observer89 static subscriber<value_type, observer_type> make(dest_type d, test_type t) {
90 return make_subscriber<value_type>(d, this_type(d, std::move(t)));
91 }
92 };
93
94 template<class Subscriber>
operator ()rxcpp::operators::detail::filter95 auto operator()(Subscriber dest) const
96 -> decltype(filter_observer<Subscriber>::make(std::move(dest), test)) {
97 return filter_observer<Subscriber>::make(std::move(dest), test);
98 }
99 };
100
101 }
102
103 /*! @copydoc rx-filter.hpp
104 */
105 template<class... AN>
filter(AN &&...an)106 auto filter(AN&&... an)
107 -> operator_factory<filter_tag, AN...> {
108 return operator_factory<filter_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
109 }
110
111 }
112
113 template<>
114 struct member_overload<filter_tag>
115 {
116 template<class Observable, class Predicate,
117 class SourceValue = rxu::value_type_t<Observable>,
118 class Filter = rxo::detail::filter<SourceValue, rxu::decay_t<Predicate>>>
memberrxcpp::member_overload119 static auto member(Observable&& o, Predicate&& p)
120 -> decltype(o.template lift<SourceValue>(Filter(std::forward<Predicate>(p)))) {
121 return o.template lift<SourceValue>(Filter(std::forward<Predicate>(p)));
122 }
123
124 template<class... AN>
memberrxcpp::member_overload125 static operators::detail::filter_invalid_t<AN...> member(const AN&...) {
126 std::terminate();
127 return {};
128 static_assert(sizeof...(AN) == 10000, "filter takes (Predicate)");
129 }
130 };
131
132 }
133
134 #endif
135