• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 #pragma once
4 
5 /*! \file rx-replay.hpp
6 
7     \brief 1) replay(optional Coordination, optional CompositeSubscription)
8               Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.
9 
10            2) replay(Count, optional Coordination, optional CompositeSubscription)
11               Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.
12 
13            3) replay(Duration, optional Coordination, optional CompositeSubscription)
14               Turn a cold observable hot, send values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.
15 
16            4) replay(Count, Duration, optional Coordination, optional CompositeSubscription)
17               Turn a cold observable hot, send at most count of values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.
18 
19     \tparam Duration  the type of the time interval (optional).
20     \tparam Count  the type of the maximum number of the most recent items sent to new observers (optional).
21     \tparam Coordination  the type of the scheduler (optional).
22 
23     \param count  the maximum number of the most recent items sent to new observers (optional).
24     \param d  the duration of the window in which the replayed items must be emitted
25     \param cn  a scheduler all values are queued and delivered on (optional).
26     \param cs  the subscription to control lifetime (optional).
27 
28     \return  rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer.
29 
30     \sample
31     \snippet replay.cpp replay sample
32     \snippet output.txt replay sample
33 
34     \sample
35     \snippet replay.cpp threaded replay sample
36     \snippet output.txt threaded replay sample
37 
38     \sample
39     \snippet replay.cpp replay count sample
40     \snippet output.txt replay count sample
41 
42     \sample
43     \snippet replay.cpp threaded replay count sample
44     \snippet output.txt threaded replay count sample
45 
46     \sample
47     \snippet replay.cpp replay period sample
48     \snippet output.txt replay period sample
49 
50     \sample
51     \snippet replay.cpp threaded replay period sample
52     \snippet output.txt threaded replay period sample
53 
54     \sample
55     \snippet replay.cpp replay count+period sample
56     \snippet output.txt replay count+period sample
57 
58     \sample
59     \snippet replay.cpp threaded replay count+period sample
60     \snippet output.txt threaded replay count+period sample
61 */
62 
63 #if !defined(RXCPP_OPERATORS_RX_REPLAY_HPP)
64 #define RXCPP_OPERATORS_RX_REPLAY_HPP
65 
66 #include "../rx-includes.hpp"
67 #include "./rx-multicast.hpp"
68 
69 namespace rxcpp {
70 
71 namespace operators {
72 
73 namespace detail {
74 
75 template<class... AN>
76 struct replay_invalid_arguments {};
77 
78 template<class... AN>
79 struct replay_invalid : public rxo::operator_base<replay_invalid_arguments<AN...>> {
80     using type = observable<replay_invalid_arguments<AN...>, replay_invalid<AN...>>;
81 };
82 template<class... AN>
83 using replay_invalid_t = typename replay_invalid<AN...>::type;
84 
85 }
86 
87 /*! @copydoc rx-replay.hpp
88 */
89 template<class... AN>
replay(AN &&...an)90 auto replay(AN&&... an)
91     ->      operator_factory<replay_tag, AN...> {
92      return operator_factory<replay_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
93 }
94 
95 }
96 
97  template<>
98 struct member_overload<replay_tag>
99 {
100     template<class Observable,
101         class Enabled = rxu::enable_if_all_true_type_t<
102             is_observable<Observable>>,
103         class SourceValue = rxu::value_type_t<Observable>,
104         class Subject = rxsub::replay<SourceValue, identity_one_worker>,
105         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
106         class Result = connectable_observable<SourceValue, Multicast>
107         >
memberrxcpp::member_overload108     static Result member(Observable&& o) {
109         return Result(Multicast(std::forward<Observable>(o), Subject(identity_current_thread(), composite_subscription())));
110     }
111 
112     template<class Observable,
113         class Enabled = rxu::enable_if_all_true_type_t<
114             is_observable<Observable>>,
115         class SourceValue = rxu::value_type_t<Observable>,
116         class Subject = rxsub::replay<SourceValue, identity_one_worker>,
117         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
118         class Result = connectable_observable<SourceValue, Multicast>
119         >
memberrxcpp::member_overload120     static Result member(Observable&& o, composite_subscription cs) {
121         return Result(Multicast(std::forward<Observable>(o), Subject(identity_current_thread(), cs)));
122     }
123 
124     template<class Observable, class Coordination,
125         class Enabled = rxu::enable_if_all_true_type_t<
126             is_observable<Observable>,
127             is_coordination<Coordination>>,
128         class SourceValue = rxu::value_type_t<Observable>,
129         class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>,
130         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
131         class Result = connectable_observable<SourceValue, Multicast>
132         >
memberrxcpp::member_overload133     static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) {
134         return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), cs)));
135     }
136 
137     template<class Observable, class Count,
138         class Enabled = rxu::enable_if_all_true_type_t<
139             is_observable<Observable>,
140             std::is_integral<Count>>,
141         class SourceValue = rxu::value_type_t<Observable>,
142         class Subject = rxsub::replay<SourceValue, identity_one_worker>,
143         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
144         class Result = connectable_observable<SourceValue, Multicast>
145     >
memberrxcpp::member_overload146     static Result member(Observable&& o, Count count, composite_subscription cs = composite_subscription()) {
147         return Result(Multicast(std::forward<Observable>(o), Subject(count, identity_current_thread(), cs)));
148     }
149 
150     template<class Observable, class Count, class Coordination,
151         class Enabled = rxu::enable_if_all_true_type_t<
152             is_observable<Observable>,
153             std::is_integral<Count>,
154             is_coordination<Coordination>>,
155         class SourceValue = rxu::value_type_t<Observable>,
156         class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>,
157         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
158         class Result = connectable_observable<SourceValue, Multicast>
159         >
memberrxcpp::member_overload160     static Result member(Observable&& o, Count count, Coordination&& cn, composite_subscription cs = composite_subscription()) {
161         return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Coordination>(cn), cs)));
162     }
163 
164     template<class Observable, class Duration,
165         class IsDuration = rxu::is_duration<Duration>,
166         class Enabled = rxu::enable_if_all_true_type_t<
167             is_observable<Observable>,
168             IsDuration>,
169         class SourceValue = rxu::value_type_t<Observable>,
170         class Subject = rxsub::replay<SourceValue, identity_one_worker>,
171         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
172         class Result = connectable_observable<SourceValue, Multicast>
173     >
memberrxcpp::member_overload174     static Result member(Observable&& o, Duration&& d, composite_subscription cs = composite_subscription()) {
175         return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Duration>(d), identity_current_thread(), cs)));
176     }
177 
178     template<class Observable, class Duration, class Coordination,
179         class IsDuration = rxu::is_duration<Duration>,
180         class Enabled = rxu::enable_if_all_true_type_t<
181             is_observable<Observable>,
182             IsDuration,
183             is_coordination<Coordination>>,
184         class SourceValue = rxu::value_type_t<Observable>,
185         class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>,
186         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
187         class Result = connectable_observable<SourceValue, Multicast>
188         >
memberrxcpp::member_overload189     static Result member(Observable&& o, Duration&& d, Coordination&& cn, composite_subscription cs = composite_subscription()) {
190         return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Duration>(d), std::forward<Coordination>(cn), cs)));
191     }
192 
193     template<class Observable, class Count, class Duration,
194         class IsDuration = rxu::is_duration<Duration>,
195         class Enabled = rxu::enable_if_all_true_type_t<
196             is_observable<Observable>,
197             std::is_integral<Count>,
198             IsDuration>,
199         class SourceValue = rxu::value_type_t<Observable>,
200         class Subject = rxsub::replay<SourceValue, identity_one_worker>,
201         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
202         class Result = connectable_observable<SourceValue, Multicast>
203     >
memberrxcpp::member_overload204     static Result member(Observable&& o, Count count, Duration&& d, composite_subscription cs = composite_subscription()) {
205         return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Duration>(d), identity_current_thread(), cs)));
206     }
207 
208     template<class Observable, class Count, class Duration, class Coordination,
209         class IsDuration = rxu::is_duration<Duration>,
210         class Enabled = rxu::enable_if_all_true_type_t<
211             is_observable<Observable>,
212             std::is_integral<Count>,
213             IsDuration,
214             is_coordination<Coordination>>,
215         class SourceValue = rxu::value_type_t<Observable>,
216         class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>,
217         class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
218         class Result = connectable_observable<SourceValue, Multicast>
219     >
memberrxcpp::member_overload220     static Result member(Observable&& o, Count count, Duration&& d, Coordination&& cn, composite_subscription cs = composite_subscription()) {
221         return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Duration>(d), std::forward<Coordination>(cn), cs)));
222     }
223 
224     template<class... AN>
memberrxcpp::member_overload225     static operators::detail::replay_invalid_t<AN...> member(AN...) {
226         std::terminate();
227         return {};
228         static_assert(sizeof...(AN) == 10000, "replay takes (optional Count, optional Duration, optional Coordination, optional CompositeSubscription)");
229     }
230 };
231 
232 }
233 
234 #endif
235