• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-skip.hpp
6 
    \brief Make a new observable that skips the first count items emitted by this observable.
8 
9     \tparam Count the type of the items counter
10 
11     \param t the number of items to skip
12 
13     \return An observable that is identical to the source observable except that it does not emit the first t items that the source observable emits.
14 
15     \sample
16     \snippet skip.cpp skip sample
17     \snippet output.txt skip sample
18 */
19 
20 #if !defined(RXCPP_OPERATORS_RX_SKIP_HPP)
21 #define RXCPP_OPERATORS_RX_SKIP_HPP
22 
23 #include "../rx-includes.hpp"
24 
25 namespace rxcpp {
26 
27 namespace operators {
28 
29 namespace detail {
30 
// Empty tag type that carries the argument pack of a rejected skip() call.
// It has no members; it only serves as a distinct type per argument list.
template<typename... AN>
struct skip_invalid_arguments
{
};
33 
34 template<class... AN>
35 struct skip_invalid : public rxo::operator_base<skip_invalid_arguments<AN...>> {
36     using type = observable<skip_invalid_arguments<AN...>, skip_invalid<AN...>>;
37 };
38 
39 template<class... AN>
40 using skip_invalid_t = typename skip_invalid<AN...>::type;
41 
42 template<class T, class Observable, class Count>
43 struct skip : public operator_base<T>
44 {
45     typedef rxu::decay_t<Observable> source_type;
46     typedef rxu::decay_t<Count> count_type;
47     struct values
48     {
valuesrxcpp::operators::detail::skip::values49         values(source_type s, count_type t)
50             : source(std::move(s))
51             , count(std::move(t))
52         {
53         }
54         source_type source;
55         count_type count;
56     };
57     values initial;
58 
skiprxcpp::operators::detail::skip59     skip(source_type s, count_type t)
60         : initial(std::move(s), std::move(t))
61     {
62     }
63 
64     struct mode
65     {
66         enum type {
67             skipping,  // ignore messages
68             triggered, // capture messages
69             errored,   // error occured
70             stopped    // observable completed
71         };
72     };
73 
74     template<class Subscriber>
on_subscriberxcpp::operators::detail::skip75     void on_subscribe(const Subscriber& s) const {
76 
77         typedef Subscriber output_type;
78         struct state_type
79             : public std::enable_shared_from_this<state_type>
80             , public values
81         {
82             state_type(const values& i, const output_type& oarg)
83                 : values(i)
84                 , mode_value(i.count > 0 ? mode::skipping : mode::triggered)
85                 , out(oarg)
86             {
87             }
88             typename mode::type mode_value;
89             output_type out;
90         };
91         // take a copy of the values for each subscription
92         auto state = std::make_shared<state_type>(initial, s);
93 
94         composite_subscription source_lifetime;
95 
96         s.add(source_lifetime);
97 
98         state->source.subscribe(
99         // split subscription lifetime
100             source_lifetime,
101         // on_next
102             [state](T t) {
103                 if (state->mode_value == mode::skipping) {
104                     if (--state->count == 0) {
105                         state->mode_value = mode::triggered;
106                     }
107                 } else {
108                     state->out.on_next(t);
109                 }
110             },
111         // on_error
112             [state](rxu::error_ptr e) {
113                 state->mode_value = mode::errored;
114                 state->out.on_error(e);
115             },
116         // on_completed
117             [state]() {
118                 state->mode_value = mode::stopped;
119                 state->out.on_completed();
120             }
121         );
122     }
123 };
124 
125 }
126 
127 /*! @copydoc rx-skip.hpp
128 */
129 template<class... AN>
skip(AN &&...an)130 auto skip(AN&&... an)
131 ->     operator_factory<skip_tag, AN...> {
132     return operator_factory<skip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
133 }
134 
135 }
136 
137 template<>
138 struct member_overload<skip_tag>
139 {
140     template<class Observable,
141             class Count,
142             class Enabled = rxu::enable_if_all_true_type_t<
143             is_observable<Observable>>,
144             class SourceValue = rxu::value_type_t<Observable>,
145             class Skip = rxo::detail::skip<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
146             class Value = rxu::value_type_t<Skip>,
147             class Result = observable<Value, Skip>>
memberrxcpp::member_overload148     static Result member(Observable&& o, Count&& c) {
149         return Result(Skip(std::forward<Observable>(o), std::forward<Count>(c)));
150     }
151 
152     template<class... AN>
memberrxcpp::member_overload153     static operators::detail::skip_invalid_t<AN...> member(AN...) {
154         std::terminate();
155         return {};
156         static_assert(sizeof...(AN) == 10000, "skip takes (optional Count)");
157     }
158 };
159 
160 }
161 
162 #endif
163