• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_ITERATE_HPP)
6 #define RXCPP_SOURCES_RX_ITERATE_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 /*! \file rx-iterate.hpp
11 
12     \brief Returns an observable that sends each value in the collection, on the specified scheduler.
13 
14     \tparam Collection    the type of the collection of values that this observable emits
15     \tparam Coordination  the type of the scheduler (optional)
16 
17     \param  c   collection containing values to send
18     \param  cn  the scheduler to use for scheduling the items (optional)
19 
20     \return  Observable that sends each value in the collection.
21 
22     \sample
23     \snippet iterate.cpp iterate sample
24     \snippet output.txt iterate sample
25 
26     \sample
27     \snippet iterate.cpp threaded iterate sample
28     \snippet output.txt threaded iterate sample
29 
30 */
31 
32 namespace rxcpp {
33 
34 namespace sources {
35 
36 namespace detail {
37 
38 template<class Collection>
39 struct is_iterable
40 {
41     typedef rxu::decay_t<Collection> collection_type;
42 
43     struct not_void {};
44     template<class CC>
45     static auto check(int) -> decltype(std::begin(*(CC*)nullptr));
46     template<class CC>
47     static not_void check(...);
48 
49     static const bool value = !std::is_same<decltype(check<collection_type>(0)), not_void>::value;
50 };
51 
52 template<class Collection>
53 struct iterate_traits
54 {
55     typedef rxu::decay_t<Collection> collection_type;
56     typedef rxu::decay_t<decltype(std::begin(*(collection_type*)nullptr))> iterator_type;
57     typedef rxu::value_type_t<std::iterator_traits<iterator_type>> value_type;
58 };
59 
60 template<class Collection, class Coordination>
61 struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection>>>
62 {
63     typedef iterate<Collection, Coordination> this_type;
64     typedef iterate_traits<Collection> traits;
65 
66     typedef rxu::decay_t<Coordination> coordination_type;
67     typedef typename coordination_type::coordinator_type coordinator_type;
68 
69     typedef typename traits::collection_type collection_type;
70     typedef typename traits::iterator_type iterator_type;
71 
72     struct iterate_initial_type
73     {
iterate_initial_typerxcpp::sources::detail::iterate::iterate_initial_type74         iterate_initial_type(collection_type c, coordination_type cn)
75             : collection(std::move(c))
76             , coordination(std::move(cn))
77         {
78         }
79         collection_type collection;
80         coordination_type coordination;
81     };
82     iterate_initial_type initial;
83 
iteraterxcpp::sources::detail::iterate84     iterate(collection_type c, coordination_type cn)
85         : initial(std::move(c), std::move(cn))
86     {
87     }
88     template<class Subscriber>
on_subscriberxcpp::sources::detail::iterate89     void on_subscribe(Subscriber o) const {
90         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
91 
92         typedef typename coordinator_type::template get<Subscriber>::type output_type;
93 
94         struct iterate_state_type
95             : public iterate_initial_type
96         {
97             iterate_state_type(const iterate_initial_type& i, output_type o)
98                 : iterate_initial_type(i)
99                 , cursor(std::begin(iterate_initial_type::collection))
100                 , end(std::end(iterate_initial_type::collection))
101                 , out(std::move(o))
102             {
103             }
104             iterate_state_type(const iterate_state_type& o)
105                 : iterate_initial_type(o)
106                 , cursor(std::begin(iterate_initial_type::collection))
107                 , end(std::end(iterate_initial_type::collection))
108                 , out(std::move(o.out)) // since lambda capture does not yet support move
109             {
110             }
111             mutable iterator_type cursor;
112             iterator_type end;
113             mutable output_type out;
114         };
115 
116         // creates a worker whose lifetime is the same as this subscription
117         auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
118 
119         iterate_state_type state(initial, o);
120 
121         auto controller = coordinator.get_worker();
122 
123         auto producer = [state](const rxsc::schedulable& self){
124             if (!state.out.is_subscribed()) {
125                 // terminate loop
126                 return;
127             }
128 
129             if (state.cursor != state.end) {
130                 // send next value
131                 state.out.on_next(*state.cursor);
132                 ++state.cursor;
133             }
134 
135             if (state.cursor == state.end) {
136                 state.out.on_completed();
137                 // o is unsubscribed
138                 return;
139             }
140 
141             // tail recurse this same action to continue loop
142             self();
143         };
144         auto selectedProducer = on_exception(
145             [&](){return coordinator.act(producer);},
146             o);
147         if (selectedProducer.empty()) {
148             return;
149         }
150         controller.schedule(selectedProducer.get());
151 
152     }
153 };
154 
155 }
156 
157 /*! @copydoc rx-iterate.hpp
158  */
159 template<class Collection>
iterate(Collection c)160 auto iterate(Collection c)
161     ->      observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>> {
162     return  observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>>(
163                                                                               detail::iterate<Collection, identity_one_worker>(std::move(c), identity_immediate()));
164 }
165 /*! @copydoc rx-iterate.hpp
166  */
167 template<class Collection, class Coordination>
iterate(Collection c,Coordination cn)168 auto iterate(Collection c, Coordination cn)
169     ->      observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>> {
170     return  observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>>(
171                                                                               detail::iterate<Collection, Coordination>(std::move(c), std::move(cn)));
172 }
173 
174 /*! Returns an observable that sends an empty set of values and then completes.
175 
176     \tparam T  the type of elements (not) to be sent
177 
178     \return  Observable that sends an empty set of values and then completes.
179 
180     This is a degenerate case of rxcpp::observable<void,void>#from(Value0,ValueN...) operator.
181 
182     \note This is a degenerate case of ```from(Value0 v0, ValueN... vn)``` operator.
183 */
184 template<class T>
from()185 auto from()
186     -> decltype(iterate(std::initializer_list<T>(), identity_immediate())) {
187     return      iterate(std::initializer_list<T>(), identity_immediate());
188 }
189 /*! Returns an observable that sends an empty set of values and then completes, on the specified scheduler.
190 
191     \tparam T  the type of elements (not) to be sent
192     \tparam Coordination  the type of the scheduler
193 
194     \return  Observable that sends an empty set of values and then completes.
195 
196     \note This is a degenerate case of ```from(Coordination cn, Value0 v0, ValueN... vn)``` operator.
197 */
198 template<class T, class Coordination>
from(Coordination cn)199 auto from(Coordination cn)
200     -> typename std::enable_if<is_coordination<Coordination>::value,
201         decltype(   iterate(std::initializer_list<T>(), std::move(cn)))>::type {
202     return          iterate(std::initializer_list<T>(), std::move(cn));
203 }
204 /*! Returns an observable that sends each value from its arguments list.
205 
206     \tparam Value0  ...
207     \tparam ValueN  the type of sending values
208 
209     \param  v0  ...
210     \param  vn  values to send
211 
212     \return  Observable that sends each value from its arguments list.
213 
214     \sample
215     \snippet from.cpp from sample
216     \snippet output.txt from sample
217 
218     \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
219 */
220 template<class Value0, class... ValueN>
from(Value0 v0,ValueN...vn)221 auto from(Value0 v0, ValueN... vn)
222     -> typename std::enable_if<!is_coordination<Value0>::value,
223         decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, identity_immediate()))>::type {
224     std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
225     return iterate(std::move(c), identity_immediate());
226 }
227 /*! Returns an observable that sends each value from its arguments list, on the specified scheduler.
228 
229     \tparam Coordination  the type of the scheduler
230     \tparam Value0  ...
231     \tparam ValueN  the type of sending values
232 
233     \param  cn  the scheduler to use for scheduling the items
234     \param  v0  ...
235     \param  vn  values to send
236 
237     \return  Observable that sends each value from its arguments list.
238 
239     \sample
240     \snippet from.cpp threaded from sample
241     \snippet output.txt threaded from sample
242 
243     \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
244 */
245 template<class Coordination, class Value0, class... ValueN>
from(Coordination cn,Value0 v0,ValueN...vn)246 auto from(Coordination cn, Value0 v0, ValueN... vn)
247     -> typename std::enable_if<is_coordination<Coordination>::value,
248         decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, std::move(cn)))>::type {
249     std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
250     return iterate(std::move(c), std::move(cn));
251 }
252 
253 
254 /*! Returns an observable that sends the specified item to observer and then completes.
255 
256     \tparam T  the type of the emitted item
257 
258     \param v  the value to send
259 
260     \return  Observable that sends the specified item to observer and then completes.
261 
262     \sample
263     \snippet just.cpp just sample
264     \snippet output.txt just sample
265 */
266 template<class Value0>
just(Value0 v0)267 auto just(Value0 v0)
268     -> typename std::enable_if<!is_coordination<Value0>::value,
269         decltype(iterate(*(std::array<Value0, 1>*)nullptr, identity_immediate()))>::type {
270     std::array<Value0, 1> c{{v0}};
271     return iterate(std::move(c), identity_immediate());
272 }
273 /*! Returns an observable that sends the specified item to observer and then completes, on the specified scheduler.
274 
275     \tparam T             the type of the emitted item
276     \tparam Coordination  the type of the scheduler
277 
278     \param v   the value to send
279     \param cn  the scheduler to use for scheduling the items
280 
281     \return  Observable that sends the specified item to observer and then completes.
282 
283     \sample
284     \snippet just.cpp threaded just sample
285     \snippet output.txt threaded just sample
286 */
287 template<class Value0, class Coordination>
just(Value0 v0,Coordination cn)288 auto just(Value0 v0, Coordination cn)
289     -> typename std::enable_if<is_coordination<Coordination>::value,
290         decltype(iterate(*(std::array<Value0, 1>*)nullptr, std::move(cn)))>::type {
291     std::array<Value0, 1> c{{v0}};
292     return iterate(std::move(c), std::move(cn));
293 }
294 
295 /*! Returns an observable that sends the specified values before it begins to send items emitted by the given observable.
296 
297     \tparam Observable  the type of the observable that emits values for resending
298     \tparam Value0      ...
299     \tparam ValueN      the type of sending values
300 
301     \param  o   the observable that emits values for resending
302     \param  v0  ...
303     \param  vn  values to send
304 
305     \return  Observable that sends the specified values before it begins to send items emitted by the given observable.
306 
307     \sample
308     \snippet start_with.cpp full start_with sample
309     \snippet output.txt full start_with sample
310 
311     Instead of passing the observable as a parameter, you can use rxcpp::observable<T, SourceOperator>::start_with method of the existing observable:
312     \snippet start_with.cpp short start_with sample
313     \snippet output.txt short start_with sample
314 */
315 template<class Observable, class Value0, class... ValueN>
start_with(Observable o,Value0 v0,ValueN...vn)316 auto start_with(Observable o, Value0 v0, ValueN... vn)
317     -> decltype(from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) {
318     return      from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o);
319 }
320 
321 }
322 
323 }
324 
325 #endif
326