1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_SOURCES_RX_RANGE_HPP)
6 #define RXCPP_SOURCES_RX_RANGE_HPP
7
8 #include "../rx-includes.hpp"
9
10 /*! \file rx-range.hpp
11
12 \brief Returns an observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value. The values are sent on the specified scheduler.
13
14 \tparam T the type of the values that this observable emits
15 \tparam Coordination the type of the scheduler (optional)
16
17 \param first first value to send (optional)
18 \param last last value to send (optional)
19 \param step value to add to the previous value to get the next value (optional)
20 \param cn the scheduler to run the generator loop on (optional)
21
22 \return Observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value using the specified scheduler.
23
24 \sample
25 \snippet range.cpp threaded range sample
26 \snippet output.txt threaded range sample
27
28 An alternative way to specify the scheduler for emitted values is to use observable::subscribe_on operator
29 \snippet range.cpp subscribe_on range sample
30 \snippet output.txt subscribe_on range sample
31 */
32
33 namespace rxcpp {
34
35 namespace sources {
36
37 namespace detail {
38
39 template<class T, class Coordination>
40 struct range : public source_base<T>
41 {
42 typedef rxu::decay_t<Coordination> coordination_type;
43 typedef typename coordination_type::coordinator_type coordinator_type;
44
45 struct range_state_type
46 {
range_state_typerxcpp::sources::detail::range::range_state_type47 range_state_type(T f, T l, std::ptrdiff_t s, coordination_type cn)
48 : next(f)
49 , last(l)
50 , step(s)
51 , coordination(std::move(cn))
52 {
53 }
54 mutable T next;
55 T last;
56 std::ptrdiff_t step;
57 coordination_type coordination;
58 };
59 range_state_type initial;
rangerxcpp::sources::detail::range60 range(T f, T l, std::ptrdiff_t s, coordination_type cn)
61 : initial(f, l, s, std::move(cn))
62 {
63 }
64 template<class Subscriber>
on_subscriberxcpp::sources::detail::range65 void on_subscribe(Subscriber o) const {
66 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
67
68 // creates a worker whose lifetime is the same as this subscription
69 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
70
71 auto controller = coordinator.get_worker();
72
73 auto state = initial;
74
75 auto producer = [=](const rxsc::schedulable& self){
76 auto& dest = o;
77 if (!dest.is_subscribed()) {
78 // terminate loop
79 return;
80 }
81
82 // send next value
83 dest.on_next(state.next);
84 if (!dest.is_subscribed()) {
85 // terminate loop
86 return;
87 }
88
89 if (std::max(state.last, state.next) - std::min(state.last, state.next) < std::abs(state.step)) {
90 if (state.last != state.next) {
91 dest.on_next(state.last);
92 }
93 dest.on_completed();
94 // o is unsubscribed
95 return;
96 }
97 state.next = static_cast<T>(state.step + state.next);
98
99 // tail recurse this same action to continue loop
100 self();
101 };
102
103 auto selectedProducer = on_exception(
104 [&](){return coordinator.act(producer);},
105 o);
106 if (selectedProducer.empty()) {
107 return;
108 }
109
110 controller.schedule(selectedProducer.get());
111 }
112 };
113
114 }
115
116 /*! @copydoc rx-create.hpp
117 */
118 template<class T>
range(T first=0,T last=std::numeric_limits<T>::max (),std::ptrdiff_t step=1)119 auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
120 -> observable<T, detail::range<T, identity_one_worker>> {
121 return observable<T, detail::range<T, identity_one_worker>>(
122 detail::range<T, identity_one_worker>(first, last, step, identity_current_thread()));
123 }
124 /*! @copydoc rx-create.hpp
125 */
126 template<class T, class Coordination>
range(T first,T last,std::ptrdiff_t step,Coordination cn)127 auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
128 -> observable<T, detail::range<T, Coordination>> {
129 return observable<T, detail::range<T, Coordination>>(
130 detail::range<T, Coordination>(first, last, step, std::move(cn)));
131 }
132 /*! @copydoc rx-create.hpp
133 */
134 template<class T, class Coordination>
range(T first,T last,Coordination cn)135 auto range(T first, T last, Coordination cn)
136 -> typename std::enable_if<is_coordination<Coordination>::value,
137 observable<T, detail::range<T, Coordination>>>::type {
138 return observable<T, detail::range<T, Coordination>>(
139 detail::range<T, Coordination>(first, last, 1, std::move(cn)));
140 }
141 /*! @copydoc rx-create.hpp
142 */
143 template<class T, class Coordination>
range(T first,Coordination cn)144 auto range(T first, Coordination cn)
145 -> typename std::enable_if<is_coordination<Coordination>::value,
146 observable<T, detail::range<T, Coordination>>>::type {
147 return observable<T, detail::range<T, Coordination>>(
148 detail::range<T, Coordination>(first, std::numeric_limits<T>::max(), 1, std::move(cn)));
149 }
150
151 }
152
153 }
154
155 #endif
156