• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 #if !defined(RXCPP_SOURCES_RX_RANGE_HPP)
6 #define RXCPP_SOURCES_RX_RANGE_HPP
7 
8 #include "../rx-includes.hpp"
9 
10 /*! \file rx-range.hpp
11 
12     \brief Returns an observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value. The values are sent on the specified scheduler.
13 
14     \tparam T             the type of the values that this observable emits
15     \tparam Coordination  the type of the scheduler (optional)
16 
17     \param  first  first value to send (optional)
18     \param  last   last value to send (optional)
19     \param  step   value to add to the previous value to get the next value (optional)
20     \param  cn     the scheduler to run the generator loop on (optional)
21 
22     \return  Observable that sends values in the range ```first```-```last``` by adding ```step``` to the previous value using the specified scheduler.
23 
24     \sample
25     \snippet range.cpp threaded range sample
26     \snippet output.txt threaded range sample
27 
28     An alternative way to specify the scheduler for emitted values is to use observable::subscribe_on operator
29     \snippet range.cpp subscribe_on range sample
30     \snippet output.txt subscribe_on range sample
31 */
32 
33 namespace rxcpp {
34 
35 namespace sources {
36 
37 namespace detail {
38 
39 template<class T, class Coordination>
40 struct range : public source_base<T>
41 {
42     typedef rxu::decay_t<Coordination> coordination_type;
43     typedef typename coordination_type::coordinator_type coordinator_type;
44 
45     struct range_state_type
46     {
range_state_typerxcpp::sources::detail::range::range_state_type47         range_state_type(T f, T l, std::ptrdiff_t s, coordination_type cn)
48             : next(f)
49             , last(l)
50             , step(s)
51             , coordination(std::move(cn))
52         {
53         }
54         mutable T next;
55         T last;
56         std::ptrdiff_t step;
57         coordination_type coordination;
58     };
59     range_state_type initial;
rangerxcpp::sources::detail::range60     range(T f, T l, std::ptrdiff_t s, coordination_type cn)
61         : initial(f, l, s, std::move(cn))
62     {
63     }
64     template<class Subscriber>
on_subscriberxcpp::sources::detail::range65     void on_subscribe(Subscriber o) const {
66         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
67 
68         // creates a worker whose lifetime is the same as this subscription
69         auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
70 
71         auto controller = coordinator.get_worker();
72 
73         auto state = initial;
74 
75         auto producer = [=](const rxsc::schedulable& self){
76                 auto& dest = o;
77                 if (!dest.is_subscribed()) {
78                     // terminate loop
79                     return;
80                 }
81 
82                 // send next value
83                 dest.on_next(state.next);
84                 if (!dest.is_subscribed()) {
85                     // terminate loop
86                     return;
87                 }
88 
89                 if (std::max(state.last, state.next) - std::min(state.last, state.next) < std::abs(state.step)) {
90                     if (state.last != state.next) {
91                         dest.on_next(state.last);
92                     }
93                     dest.on_completed();
94                     // o is unsubscribed
95                     return;
96                 }
97                 state.next = static_cast<T>(state.step + state.next);
98 
99                 // tail recurse this same action to continue loop
100                 self();
101             };
102 
103         auto selectedProducer = on_exception(
104             [&](){return coordinator.act(producer);},
105             o);
106         if (selectedProducer.empty()) {
107             return;
108         }
109 
110         controller.schedule(selectedProducer.get());
111     }
112 };
113 
114 }
115 
116 /*! @copydoc rx-create.hpp
117  */
118 template<class T>
range(T first=0,T last=std::numeric_limits<T>::max (),std::ptrdiff_t step=1)119 auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
120     ->      observable<T,   detail::range<T, identity_one_worker>> {
121     return  observable<T,   detail::range<T, identity_one_worker>>(
122                             detail::range<T, identity_one_worker>(first, last, step, identity_current_thread()));
123 }
124 /*! @copydoc rx-create.hpp
125  */
126 template<class T, class Coordination>
range(T first,T last,std::ptrdiff_t step,Coordination cn)127 auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
128     ->      observable<T,   detail::range<T, Coordination>> {
129     return  observable<T,   detail::range<T, Coordination>>(
130                             detail::range<T, Coordination>(first, last, step, std::move(cn)));
131 }
132 /*! @copydoc rx-create.hpp
133  */
134 template<class T, class Coordination>
range(T first,T last,Coordination cn)135 auto range(T first, T last, Coordination cn)
136     -> typename std::enable_if<is_coordination<Coordination>::value,
137             observable<T,   detail::range<T, Coordination>>>::type {
138     return  observable<T,   detail::range<T, Coordination>>(
139                             detail::range<T, Coordination>(first, last, 1, std::move(cn)));
140 }
141 /*! @copydoc rx-create.hpp
142  */
143 template<class T, class Coordination>
range(T first,Coordination cn)144 auto range(T first, Coordination cn)
145     -> typename std::enable_if<is_coordination<Coordination>::value,
146             observable<T,   detail::range<T, Coordination>>>::type {
147     return  observable<T,   detail::range<T, Coordination>>(
148                             detail::range<T, Coordination>(first, std::numeric_limits<T>::max(), 1, std::move(cn)));
149 }
150 
151 }
152 
153 }
154 
155 #endif
156