1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-all.hpp
6
7 \brief Returns an Observable that emits true if every item emitted by the source Observable satisfies a specified condition, otherwise false.
8 Emits true if the source Observable terminates without emitting any item.
9
10 \tparam Predicate the type of the test function.
11
12 \param p the test function to test items emitted by the source Observable.
13
14 \return Observable that emits true if every item emitted by the source observable satisfies a specified condition, otherwise false.
15
16 \sample
17 \snippet all.cpp all sample
18 \snippet output.txt all sample
19 */
20
21 #if !defined(RXCPP_OPERATORS_RX_ALL_HPP)
22 #define RXCPP_OPERATORS_RX_ALL_HPP
23
24 #include "../rx-includes.hpp"
25
26 namespace rxcpp {
27
28 namespace operators {
29
30 namespace detail {
31
32 template<class... AN>
33 struct all_invalid_arguments {};
34
35 template<class... AN>
36 struct all_invalid : public rxo::operator_base<all_invalid_arguments<AN...>> {
37 using type = observable<all_invalid_arguments<AN...>, all_invalid<AN...>>;
38 };
39 template<class... AN>
40 using all_invalid_t = typename all_invalid<AN...>::type;
41
42 template<class T, class Predicate>
43 struct all
44 {
45 typedef rxu::decay_t<T> source_value_type;
46 typedef rxu::decay_t<Predicate> test_type;
47 test_type test;
48
49 typedef bool value_type;
50
allrxcpp::operators::detail::all51 all(test_type t)
52 : test(std::move(t))
53 {
54 }
55
56 template<class Subscriber>
57 struct all_observer
58 {
59 typedef all_observer<Subscriber> this_type;
60 typedef source_value_type value_type;
61 typedef rxu::decay_t<Subscriber> dest_type;
62 typedef observer<value_type, this_type> observer_type;
63 dest_type dest;
64 test_type test;
65 mutable bool done;
66
all_observerrxcpp::operators::detail::all::all_observer67 all_observer(dest_type d, test_type t)
68 : dest(std::move(d))
69 , test(std::move(t)),
70 done(false)
71 {
72 }
on_nextrxcpp::operators::detail::all::all_observer73 void on_next(source_value_type v) const {
74 auto filtered = on_exception([&]() {
75 return !this->test(v); },
76 dest);
77 if (filtered.empty()) {
78 return;
79 }
80 if (filtered.get() && !done) {
81 done = true;
82 dest.on_next(false);
83 dest.on_completed();
84 }
85 }
on_errorrxcpp::operators::detail::all::all_observer86 void on_error(rxu::error_ptr e) const {
87 dest.on_error(e);
88 }
on_completedrxcpp::operators::detail::all::all_observer89 void on_completed() const {
90 if(!done) {
91 done = true;
92 dest.on_next(true);
93 dest.on_completed();
94 }
95 }
96
makerxcpp::operators::detail::all::all_observer97 static subscriber<value_type, observer_type> make(dest_type d, test_type t) {
98 return make_subscriber<value_type>(d, this_type(d, std::move(t)));
99 }
100 };
101
102 template<class Subscriber>
operator ()rxcpp::operators::detail::all103 auto operator()(Subscriber dest) const
104 -> decltype(all_observer<Subscriber>::make(std::move(dest), test)) {
105 return all_observer<Subscriber>::make(std::move(dest), test);
106 }
107 };
108
109 }
110
111 /*! @copydoc rx-all.hpp
112 */
113 template<class... AN>
all(AN &&...an)114 auto all(AN&&... an)
115 -> operator_factory<all_tag, AN...> {
116 return operator_factory<all_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
117 }
118
119 /*! \brief Returns an Observable that emits true if the source Observable is empty, otherwise false.
120
121 \return An observable that emits a boolean value.
122
123 \sample
124 \snippet is_empty.cpp is_empty sample
125 \snippet output.txt is_empty sample
126 */
127 template<class... AN>
is_empty(AN &&...an)128 auto is_empty(AN&&... an)
129 -> operator_factory<is_empty_tag, AN...> {
130 return operator_factory<is_empty_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
131 }
132
133 }
134
135 template<>
136 struct member_overload<all_tag>
137 {
138 template<class Observable, class Predicate,
139 class SourceValue = rxu::value_type_t<Observable>,
140 class Enabled = rxu::enable_if_all_true_type_t<
141 is_observable<Observable>>,
142 class All = rxo::detail::all<SourceValue, rxu::decay_t<Predicate>>,
143 class Value = rxu::value_type_t<All>>
memberrxcpp::member_overload144 static auto member(Observable&& o, Predicate&& p)
145 -> decltype(o.template lift<Value>(All(std::forward<Predicate>(p)))) {
146 return o.template lift<Value>(All(std::forward<Predicate>(p)));
147 }
148
149 template<class... AN>
memberrxcpp::member_overload150 static operators::detail::all_invalid_t<AN...> member(const AN&...) {
151 std::terminate();
152 return {};
153 static_assert(sizeof...(AN) == 10000, "all takes (Predicate)");
154 }
155 };
156
157 template<>
158 struct member_overload<is_empty_tag>
159 {
160 template<class Observable,
161 class SourceValue = rxu::value_type_t<Observable>,
162 class Enabled = rxu::enable_if_all_true_type_t<
163 is_observable<Observable>>,
164 class Predicate = std::function<bool(SourceValue)>,
165 class IsEmpty = rxo::detail::all<SourceValue, rxu::decay_t<Predicate>>,
166 class Value = rxu::value_type_t<IsEmpty>>
memberrxcpp::member_overload167 static auto member(Observable&& o)
168 -> decltype(o.template lift<Value>(IsEmpty(nullptr))) {
169 return o.template lift<Value>(IsEmpty([](SourceValue) { return false; }));
170 }
171
172 template<class... AN>
memberrxcpp::member_overload173 static operators::detail::all_invalid_t<AN...> member(AN...) {
174 std::terminate();
175 return {};
176 static_assert(sizeof...(AN) == 10000, "is_empty takes no arguments");
177 }
178 };
179
180 }
181
182 #endif
183