• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #pragma once
2 
3 /*! \file rx-retry-repeat-common.hpp
4 
5     \brief Implementation commonalities between retry and repeat operators abstracted away from rx-retry.hpp and rx-repeat.hpp files. Should be used only from rx-retry.hpp and rx-repeat.hpp
6 
7 */
8 
9 #include "../rx-includes.hpp"
10 
11 namespace rxcpp {
12   namespace operators {
13     namespace detail {
14 
15       namespace retry_repeat_common {
16         // Structure to perform general retry/repeat operations on state
17         template <class Values, class Subscriber, class EventHandlers, class T>
18         struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
19                             public Values {
20 
21           typedef Subscriber output_type;
state_typerxcpp::operators::detail::retry_repeat_common::state_type22           state_type(const Values& i, const output_type& oarg)
23             : Values(i),
24               source_lifetime(composite_subscription::empty()),
25               out(oarg) {
26           }
27 
do_subscriberxcpp::operators::detail::retry_repeat_common::state_type28           void do_subscribe() {
29             auto state = this->shared_from_this();
30 
31             state->out.remove(state->lifetime_token);
32             state->source_lifetime.unsubscribe();
33 
34             state->source_lifetime = composite_subscription();
35             state->lifetime_token = state->out.add(state->source_lifetime);
36 
37             state->source.subscribe(
38                                     state->out,
39                                     state->source_lifetime,
40                                     // on_next
41                                     [state](T t) {
42                                       state->out.on_next(t);
43                                     },
44                                     // on_error
45                                     [state](rxu::error_ptr e) {
46                                       EventHandlers::on_error(state, e);
47                                     },
48                                     // on_completed
49                                     [state]() {
50                                       EventHandlers::on_completed(state);
51                                     }
52                                     );
53           }
54 
55           composite_subscription source_lifetime;
56           output_type out;
57           composite_subscription::weak_subscription lifetime_token;
58         };
59 
60         // Finite case (explicitely limited with the number of times)
61         template <class EventHandlers, class T, class Observable, class Count>
62         struct finite : public operator_base<T> {
63           typedef rxu::decay_t<Observable> source_type;
64           typedef rxu::decay_t<Count> count_type;
65 
66           struct values {
valuesrxcpp::operators::detail::retry_repeat_common::finite::values67             values(source_type s, count_type t)
68               : source(std::move(s)),
69                 remaining_(std::move(t)) {
70             }
71 
completed_predicaterxcpp::operators::detail::retry_repeat_common::finite::values72             inline bool completed_predicate() const {
73               // Return true if we are completed
74               return remaining_ <= 0;
75             }
76 
updaterxcpp::operators::detail::retry_repeat_common::finite::values77             inline void update() {
78               // Decrement counter
79               --remaining_;
80             }
81 
82             source_type source;
83 
84           private:
85             // Counter to hold number of times remaining to complete
86             count_type remaining_;
87           };
88 
finiterxcpp::operators::detail::retry_repeat_common::finite89           finite(source_type s, count_type t)
90             : initial_(std::move(s), std::move(t)) {
91           }
92 
93           template<class Subscriber>
on_subscriberxcpp::operators::detail::retry_repeat_common::finite94           void on_subscribe(const Subscriber& s) const {
95             typedef state_type<values, Subscriber, EventHandlers, T> state_t;
96             // take a copy of the values for each subscription
97             auto state = std::make_shared<state_t>(initial_, s);
98             if (initial_.completed_predicate()) {
99               // return completed
100               state->out.on_completed();
101             } else {
102               // start the first iteration
103               state->do_subscribe();
104             }
105           }
106 
107         private:
108           values initial_;
109         };
110 
111         // Infinite case
112         template <class EventHandlers, class T, class Observable>
113         struct infinite : public operator_base<T> {
114           typedef rxu::decay_t<Observable> source_type;
115 
116           struct values {
valuesrxcpp::operators::detail::retry_repeat_common::infinite::values117             values(source_type s)
118               : source(std::move(s)) {
119             }
120 
completed_predicaterxcpp::operators::detail::retry_repeat_common::infinite::values121             static inline bool completed_predicate() {
122               // Infinite never completes
123               return false;
124             }
125 
updaterxcpp::operators::detail::retry_repeat_common::infinite::values126             static inline void update() {
127               // Infinite does not need to update state
128             }
129 
130             source_type source;
131           };
132 
infiniterxcpp::operators::detail::retry_repeat_common::infinite133           infinite(source_type s) : initial_(std::move(s)) {
134           }
135 
136           template<class Subscriber>
on_subscriberxcpp::operators::detail::retry_repeat_common::infinite137           void on_subscribe(const Subscriber& s) const {
138             typedef state_type<values, Subscriber, EventHandlers, T> state_t;
139             // take a copy of the values for each subscription
140             auto state = std::make_shared<state_t>(initial_, s);
141             // start the first iteration
142             state->do_subscribe();
143           }
144 
145         private:
146           values initial_;
147         };
148 
149 
150       }
151     }
152   }
153 }
154