1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-skip.hpp
6
7 \brief Make new observable with skipped first count items from this observable.
8
9 \tparam Count the type of the items counter
10
11 \param t the number of items to skip
12
13 \return An observable that is identical to the source observable except that it does not emit the first t items that the source observable emits.
14
15 \sample
16 \snippet skip.cpp skip sample
17 \snippet output.txt skip sample
18 */
19
20 #if !defined(RXCPP_OPERATORS_RX_SKIP_HPP)
21 #define RXCPP_OPERATORS_RX_SKIP_HPP
22
23 #include "../rx-includes.hpp"
24
25 namespace rxcpp {
26
27 namespace operators {
28
29 namespace detail {
30
31 template<class... AN>
32 struct skip_invalid_arguments {};
33
34 template<class... AN>
35 struct skip_invalid : public rxo::operator_base<skip_invalid_arguments<AN...>> {
36 using type = observable<skip_invalid_arguments<AN...>, skip_invalid<AN...>>;
37 };
38
39 template<class... AN>
40 using skip_invalid_t = typename skip_invalid<AN...>::type;
41
42 template<class T, class Observable, class Count>
43 struct skip : public operator_base<T>
44 {
45 typedef rxu::decay_t<Observable> source_type;
46 typedef rxu::decay_t<Count> count_type;
47 struct values
48 {
valuesrxcpp::operators::detail::skip::values49 values(source_type s, count_type t)
50 : source(std::move(s))
51 , count(std::move(t))
52 {
53 }
54 source_type source;
55 count_type count;
56 };
57 values initial;
58
skiprxcpp::operators::detail::skip59 skip(source_type s, count_type t)
60 : initial(std::move(s), std::move(t))
61 {
62 }
63
64 struct mode
65 {
66 enum type {
67 skipping, // ignore messages
68 triggered, // capture messages
69 errored, // error occured
70 stopped // observable completed
71 };
72 };
73
74 template<class Subscriber>
on_subscriberxcpp::operators::detail::skip75 void on_subscribe(const Subscriber& s) const {
76
77 typedef Subscriber output_type;
78 struct state_type
79 : public std::enable_shared_from_this<state_type>
80 , public values
81 {
82 state_type(const values& i, const output_type& oarg)
83 : values(i)
84 , mode_value(i.count > 0 ? mode::skipping : mode::triggered)
85 , out(oarg)
86 {
87 }
88 typename mode::type mode_value;
89 output_type out;
90 };
91 // take a copy of the values for each subscription
92 auto state = std::make_shared<state_type>(initial, s);
93
94 composite_subscription source_lifetime;
95
96 s.add(source_lifetime);
97
98 state->source.subscribe(
99 // split subscription lifetime
100 source_lifetime,
101 // on_next
102 [state](T t) {
103 if (state->mode_value == mode::skipping) {
104 if (--state->count == 0) {
105 state->mode_value = mode::triggered;
106 }
107 } else {
108 state->out.on_next(t);
109 }
110 },
111 // on_error
112 [state](rxu::error_ptr e) {
113 state->mode_value = mode::errored;
114 state->out.on_error(e);
115 },
116 // on_completed
117 [state]() {
118 state->mode_value = mode::stopped;
119 state->out.on_completed();
120 }
121 );
122 }
123 };
124
125 }
126
127 /*! @copydoc rx-skip.hpp
128 */
129 template<class... AN>
skip(AN &&...an)130 auto skip(AN&&... an)
131 -> operator_factory<skip_tag, AN...> {
132 return operator_factory<skip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
133 }
134
135 }
136
137 template<>
138 struct member_overload<skip_tag>
139 {
140 template<class Observable,
141 class Count,
142 class Enabled = rxu::enable_if_all_true_type_t<
143 is_observable<Observable>>,
144 class SourceValue = rxu::value_type_t<Observable>,
145 class Skip = rxo::detail::skip<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
146 class Value = rxu::value_type_t<Skip>,
147 class Result = observable<Value, Skip>>
memberrxcpp::member_overload148 static Result member(Observable&& o, Count&& c) {
149 return Result(Skip(std::forward<Observable>(o), std::forward<Count>(c)));
150 }
151
152 template<class... AN>
memberrxcpp::member_overload153 static operators::detail::skip_invalid_t<AN...> member(AN...) {
154 std::terminate();
155 return {};
156 static_assert(sizeof...(AN) == 10000, "skip takes (optional Count)");
157 }
158 };
159
160 }
161
162 #endif
163