1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-take_while.hpp
6
7 \brief For the first items fulfilling the predicate from this observable emit them from the new observable that is returned.
8
9 \tparam Predicate the type of the predicate
10
11 \param t the predicate
12
13 \return An observable that emits only the first items emitted by the source Observable fulfilling the predicate, or all of the items from the source observable if the predicate never returns false
14
15 \sample
16 \snippet take_while.cpp take_while sample
17 \snippet output.txt take_while sample
18 */
19
20 #if !defined(RXCPP_OPERATORS_RX_TAKE_WHILE_HPP)
21 #define RXCPP_OPERATORS_RX_TAKE_WHILE_HPP
22
23 #include "../rx-includes.hpp"
24
25 namespace rxcpp {
26
27 namespace operators {
28
29 namespace detail {
30
31 template<class... AN>
32 struct take_while_invalid_arguments {};
33
34 template<class... AN>
35 struct take_while_invalid : public rxo::operator_base<take_while_invalid_arguments<AN...>> {
36 using type = observable<take_while_invalid_arguments<AN...>, take_while_invalid<AN...>>;
37 };
38 template<class... AN>
39 using take_while_invalid_t = typename take_while_invalid<AN...>::type;
40
41 template<class T, class Predicate>
42 struct take_while
43 {
44 typedef rxu::decay_t<T> source_value_type;
45 typedef rxu::decay_t<Predicate> test_type;
46 test_type test;
47
48
take_whilerxcpp::operators::detail::take_while49 take_while(test_type t)
50 : test(std::move(t))
51 {
52 }
53
54 template<class Subscriber>
55 struct take_while_observer
56 {
57 typedef take_while_observer<Subscriber> this_type;
58 typedef source_value_type value_type;
59 typedef rxu::decay_t<Subscriber> dest_type;
60 typedef observer<value_type, this_type> observer_type;
61 dest_type dest;
62 test_type test;
63
take_while_observerrxcpp::operators::detail::take_while::take_while_observer64 take_while_observer(dest_type d, test_type t)
65 : dest(std::move(d))
66 , test(std::move(t))
67 {
68 }
on_nextrxcpp::operators::detail::take_while::take_while_observer69 void on_next(source_value_type v) const {
70 if (test(v)) {
71 dest.on_next(v);
72 } else {
73 dest.on_completed();
74 }
75 }
on_errorrxcpp::operators::detail::take_while::take_while_observer76 void on_error(rxu::error_ptr e) const {
77 dest.on_error(e);
78 }
on_completedrxcpp::operators::detail::take_while::take_while_observer79 void on_completed() const {
80 dest.on_completed();
81 }
82
makerxcpp::operators::detail::take_while::take_while_observer83 static subscriber<value_type, observer_type> make(dest_type d, test_type t) {
84 return make_subscriber<value_type>(d, this_type(d, std::move(t)));
85 }
86 };
87
88 template<class Subscriber>
operator ()rxcpp::operators::detail::take_while89 auto operator()(Subscriber dest) const
90 -> decltype(take_while_observer<Subscriber>::make(std::move(dest), test)) {
91 return take_while_observer<Subscriber>::make(std::move(dest), test);
92 }
93 };
94
95 }
96
97 /*! @copydoc rx-take_while.hpp
98 */
99 template<class... AN>
take_while(AN &&...an)100 auto take_while(AN&&... an)
101 -> operator_factory<take_while_tag, AN...> {
102 return operator_factory<take_while_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
103 }
104
105 }
106
107 template<>
108 struct member_overload<take_while_tag>
109 {
110 template<class Observable, class Predicate,
111 class SourceValue = rxu::value_type_t<Observable>,
112 class TakeWhile = rxo::detail::take_while<SourceValue, rxu::decay_t<Predicate>>>
memberrxcpp::member_overload113 static auto member(Observable&& o, Predicate&& p)
114 -> decltype(o.template lift<SourceValue>(TakeWhile(std::forward<Predicate>(p)))) {
115 return o.template lift<SourceValue>(TakeWhile(std::forward<Predicate>(p)));
116 }
117
118 template<class... AN>
memberrxcpp::member_overload119 static operators::detail::take_while_invalid_t<AN...> member(const AN&...) {
120 std::terminate();
121 return {};
122 static_assert(sizeof...(AN) == 10000, "take_while takes (Predicate)");
123 }
124 };
125
126
127 }
128
129 #endif
130