1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_SOURCES_RX_ITERATE_HPP)
6 #define RXCPP_SOURCES_RX_ITERATE_HPP
7
8 #include "../rx-includes.hpp"
9
10 /*! \file rx-iterate.hpp
11
12 \brief Returns an observable that sends each value in the collection, on the specified scheduler.
13
14 \tparam Collection the type of the collection of values that this observable emits
15 \tparam Coordination the type of the scheduler (optional)
16
17 \param c collection containing values to send
18 \param cn the scheduler to use for scheduling the items (optional)
19
20 \return Observable that sends each value in the collection.
21
22 \sample
23 \snippet iterate.cpp iterate sample
24 \snippet output.txt iterate sample
25
26 \sample
27 \snippet iterate.cpp threaded iterate sample
28 \snippet output.txt threaded iterate sample
29
30 */
31
32 namespace rxcpp {
33
34 namespace sources {
35
36 namespace detail {
37
38 template<class Collection>
39 struct is_iterable
40 {
41 typedef rxu::decay_t<Collection> collection_type;
42
43 struct not_void {};
44 template<class CC>
45 static auto check(int) -> decltype(std::begin(*(CC*)nullptr));
46 template<class CC>
47 static not_void check(...);
48
49 static const bool value = !std::is_same<decltype(check<collection_type>(0)), not_void>::value;
50 };
51
52 template<class Collection>
53 struct iterate_traits
54 {
55 typedef rxu::decay_t<Collection> collection_type;
56 typedef rxu::decay_t<decltype(std::begin(*(collection_type*)nullptr))> iterator_type;
57 typedef rxu::value_type_t<std::iterator_traits<iterator_type>> value_type;
58 };
59
60 template<class Collection, class Coordination>
61 struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection>>>
62 {
63 typedef iterate<Collection, Coordination> this_type;
64 typedef iterate_traits<Collection> traits;
65
66 typedef rxu::decay_t<Coordination> coordination_type;
67 typedef typename coordination_type::coordinator_type coordinator_type;
68
69 typedef typename traits::collection_type collection_type;
70 typedef typename traits::iterator_type iterator_type;
71
72 struct iterate_initial_type
73 {
iterate_initial_typerxcpp::sources::detail::iterate::iterate_initial_type74 iterate_initial_type(collection_type c, coordination_type cn)
75 : collection(std::move(c))
76 , coordination(std::move(cn))
77 {
78 }
79 collection_type collection;
80 coordination_type coordination;
81 };
82 iterate_initial_type initial;
83
iteraterxcpp::sources::detail::iterate84 iterate(collection_type c, coordination_type cn)
85 : initial(std::move(c), std::move(cn))
86 {
87 }
88 template<class Subscriber>
on_subscriberxcpp::sources::detail::iterate89 void on_subscribe(Subscriber o) const {
90 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
91
92 typedef typename coordinator_type::template get<Subscriber>::type output_type;
93
94 struct iterate_state_type
95 : public iterate_initial_type
96 {
97 iterate_state_type(const iterate_initial_type& i, output_type o)
98 : iterate_initial_type(i)
99 , cursor(std::begin(iterate_initial_type::collection))
100 , end(std::end(iterate_initial_type::collection))
101 , out(std::move(o))
102 {
103 }
104 iterate_state_type(const iterate_state_type& o)
105 : iterate_initial_type(o)
106 , cursor(std::begin(iterate_initial_type::collection))
107 , end(std::end(iterate_initial_type::collection))
108 , out(std::move(o.out)) // since lambda capture does not yet support move
109 {
110 }
111 mutable iterator_type cursor;
112 iterator_type end;
113 mutable output_type out;
114 };
115
116 // creates a worker whose lifetime is the same as this subscription
117 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
118
119 iterate_state_type state(initial, o);
120
121 auto controller = coordinator.get_worker();
122
123 auto producer = [state](const rxsc::schedulable& self){
124 if (!state.out.is_subscribed()) {
125 // terminate loop
126 return;
127 }
128
129 if (state.cursor != state.end) {
130 // send next value
131 state.out.on_next(*state.cursor);
132 ++state.cursor;
133 }
134
135 if (state.cursor == state.end) {
136 state.out.on_completed();
137 // o is unsubscribed
138 return;
139 }
140
141 // tail recurse this same action to continue loop
142 self();
143 };
144 auto selectedProducer = on_exception(
145 [&](){return coordinator.act(producer);},
146 o);
147 if (selectedProducer.empty()) {
148 return;
149 }
150 controller.schedule(selectedProducer.get());
151
152 }
153 };
154
155 }
156
157 /*! @copydoc rx-iterate.hpp
158 */
159 template<class Collection>
iterate(Collection c)160 auto iterate(Collection c)
161 -> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>> {
162 return observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>>(
163 detail::iterate<Collection, identity_one_worker>(std::move(c), identity_immediate()));
164 }
165 /*! @copydoc rx-iterate.hpp
166 */
167 template<class Collection, class Coordination>
iterate(Collection c,Coordination cn)168 auto iterate(Collection c, Coordination cn)
169 -> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>> {
170 return observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>>(
171 detail::iterate<Collection, Coordination>(std::move(c), std::move(cn)));
172 }
173
174 /*! Returns an observable that sends an empty set of values and then completes.
175
176 \tparam T the type of elements (not) to be sent
177
178 \return Observable that sends an empty set of values and then completes.
179
180 This is a degenerate case of rxcpp::observable<void,void>#from(Value0,ValueN...) operator.
181
182 \note This is a degenerate case of ```from(Value0 v0, ValueN... vn)``` operator.
183 */
184 template<class T>
from()185 auto from()
186 -> decltype(iterate(std::initializer_list<T>(), identity_immediate())) {
187 return iterate(std::initializer_list<T>(), identity_immediate());
188 }
189 /*! Returns an observable that sends an empty set of values and then completes, on the specified scheduler.
190
191 \tparam T the type of elements (not) to be sent
192 \tparam Coordination the type of the scheduler
193
194 \return Observable that sends an empty set of values and then completes.
195
196 \note This is a degenerate case of ```from(Coordination cn, Value0 v0, ValueN... vn)``` operator.
197 */
198 template<class T, class Coordination>
from(Coordination cn)199 auto from(Coordination cn)
200 -> typename std::enable_if<is_coordination<Coordination>::value,
201 decltype( iterate(std::initializer_list<T>(), std::move(cn)))>::type {
202 return iterate(std::initializer_list<T>(), std::move(cn));
203 }
204 /*! Returns an observable that sends each value from its arguments list.
205
206 \tparam Value0 ...
207 \tparam ValueN the type of sending values
208
209 \param v0 ...
210 \param vn values to send
211
212 \return Observable that sends each value from its arguments list.
213
214 \sample
215 \snippet from.cpp from sample
216 \snippet output.txt from sample
217
218 \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
219 */
220 template<class Value0, class... ValueN>
from(Value0 v0,ValueN...vn)221 auto from(Value0 v0, ValueN... vn)
222 -> typename std::enable_if<!is_coordination<Value0>::value,
223 decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, identity_immediate()))>::type {
224 std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
225 return iterate(std::move(c), identity_immediate());
226 }
227 /*! Returns an observable that sends each value from its arguments list, on the specified scheduler.
228
229 \tparam Coordination the type of the scheduler
230 \tparam Value0 ...
231 \tparam ValueN the type of sending values
232
233 \param cn the scheduler to use for scheduling the items
234 \param v0 ...
235 \param vn values to send
236
237 \return Observable that sends each value from its arguments list.
238
239 \sample
240 \snippet from.cpp threaded from sample
241 \snippet output.txt threaded from sample
242
243 \note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
244 */
245 template<class Coordination, class Value0, class... ValueN>
from(Coordination cn,Value0 v0,ValueN...vn)246 auto from(Coordination cn, Value0 v0, ValueN... vn)
247 -> typename std::enable_if<is_coordination<Coordination>::value,
248 decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, std::move(cn)))>::type {
249 std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
250 return iterate(std::move(c), std::move(cn));
251 }
252
253
254 /*! Returns an observable that sends the specified item to observer and then completes.
255
256 \tparam T the type of the emitted item
257
258 \param v the value to send
259
260 \return Observable that sends the specified item to observer and then completes.
261
262 \sample
263 \snippet just.cpp just sample
264 \snippet output.txt just sample
265 */
266 template<class Value0>
just(Value0 v0)267 auto just(Value0 v0)
268 -> typename std::enable_if<!is_coordination<Value0>::value,
269 decltype(iterate(*(std::array<Value0, 1>*)nullptr, identity_immediate()))>::type {
270 std::array<Value0, 1> c{{v0}};
271 return iterate(std::move(c), identity_immediate());
272 }
273 /*! Returns an observable that sends the specified item to observer and then completes, on the specified scheduler.
274
275 \tparam T the type of the emitted item
276 \tparam Coordination the type of the scheduler
277
278 \param v the value to send
279 \param cn the scheduler to use for scheduling the items
280
281 \return Observable that sends the specified item to observer and then completes.
282
283 \sample
284 \snippet just.cpp threaded just sample
285 \snippet output.txt threaded just sample
286 */
287 template<class Value0, class Coordination>
just(Value0 v0,Coordination cn)288 auto just(Value0 v0, Coordination cn)
289 -> typename std::enable_if<is_coordination<Coordination>::value,
290 decltype(iterate(*(std::array<Value0, 1>*)nullptr, std::move(cn)))>::type {
291 std::array<Value0, 1> c{{v0}};
292 return iterate(std::move(c), std::move(cn));
293 }
294
295 /*! Returns an observable that sends the specified values before it begins to send items emitted by the given observable.
296
297 \tparam Observable the type of the observable that emits values for resending
298 \tparam Value0 ...
299 \tparam ValueN the type of sending values
300
301 \param o the observable that emits values for resending
302 \param v0 ...
303 \param vn values to send
304
305 \return Observable that sends the specified values before it begins to send items emitted by the given observable.
306
307 \sample
308 \snippet start_with.cpp full start_with sample
309 \snippet output.txt full start_with sample
310
311 Instead of passing the observable as a parameter, you can use rxcpp::observable<T, SourceOperator>::start_with method of the existing observable:
312 \snippet start_with.cpp short start_with sample
313 \snippet output.txt short start_with sample
314 */
315 template<class Observable, class Value0, class... ValueN>
start_with(Observable o,Value0 v0,ValueN...vn)316 auto start_with(Observable o, Value0 v0, ValueN... vn)
317 -> decltype(from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) {
318 return from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o);
319 }
320
321 }
322
323 }
324
325 #endif
326