1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-replay.hpp
6
7 \brief 1) replay(optional Coordination, optional CompositeSubscription)
8 Turn a cold observable hot, send all earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.
9
10 2) replay(Count, optional Coordination, optional CompositeSubscription)
11 Turn a cold observable hot, send at most count of earlier emitted values to any new subscriber, and allow connections to the source to be independent of subscriptions.
12
13 3) replay(Duration, optional Coordination, optional CompositeSubscription)
14 Turn a cold observable hot, send values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.
15
16 4) replay(Count, Duration, optional Coordination, optional CompositeSubscription)
17 Turn a cold observable hot, send at most count of values emitted within a specified time window to any new subscriber, and allow connections to the source to be independent of subscriptions.
18
19 \tparam Duration the type of the time interval (optional).
20 \tparam Count the type of the maximum number of the most recent items sent to new observers (optional).
21 \tparam Coordination the type of the scheduler (optional).
22
23 \param count the maximum number of the most recent items sent to new observers (optional).
\param d the duration of the window in which the replayed items must be emitted (optional).
25 \param cn a scheduler all values are queued and delivered on (optional).
26 \param cs the subscription to control lifetime (optional).
27
28 \return rxcpp::connectable_observable that shares a single subscription to the underlying observable that will replay all of its items and notifications to any future observer.
29
30 \sample
31 \snippet replay.cpp replay sample
32 \snippet output.txt replay sample
33
34 \sample
35 \snippet replay.cpp threaded replay sample
36 \snippet output.txt threaded replay sample
37
38 \sample
39 \snippet replay.cpp replay count sample
40 \snippet output.txt replay count sample
41
42 \sample
43 \snippet replay.cpp threaded replay count sample
44 \snippet output.txt threaded replay count sample
45
46 \sample
47 \snippet replay.cpp replay period sample
48 \snippet output.txt replay period sample
49
50 \sample
51 \snippet replay.cpp threaded replay period sample
52 \snippet output.txt threaded replay period sample
53
54 \sample
55 \snippet replay.cpp replay count+period sample
56 \snippet output.txt replay count+period sample
57
58 \sample
59 \snippet replay.cpp threaded replay count+period sample
60 \snippet output.txt threaded replay count+period sample
61 */
62
63 #if !defined(RXCPP_OPERATORS_RX_REPLAY_HPP)
64 #define RXCPP_OPERATORS_RX_REPLAY_HPP
65
66 #include "../rx-includes.hpp"
67 #include "./rx-multicast.hpp"
68
69 namespace rxcpp {
70
71 namespace operators {
72
73 namespace detail {
74
75 template<class... AN>
76 struct replay_invalid_arguments {};
77
78 template<class... AN>
79 struct replay_invalid : public rxo::operator_base<replay_invalid_arguments<AN...>> {
80 using type = observable<replay_invalid_arguments<AN...>, replay_invalid<AN...>>;
81 };
82 template<class... AN>
83 using replay_invalid_t = typename replay_invalid<AN...>::type;
84
85 }
86
87 /*! @copydoc rx-replay.hpp
88 */
89 template<class... AN>
replay(AN &&...an)90 auto replay(AN&&... an)
91 -> operator_factory<replay_tag, AN...> {
92 return operator_factory<replay_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
93 }
94
95 }
96
97 template<>
98 struct member_overload<replay_tag>
99 {
100 template<class Observable,
101 class Enabled = rxu::enable_if_all_true_type_t<
102 is_observable<Observable>>,
103 class SourceValue = rxu::value_type_t<Observable>,
104 class Subject = rxsub::replay<SourceValue, identity_one_worker>,
105 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
106 class Result = connectable_observable<SourceValue, Multicast>
107 >
memberrxcpp::member_overload108 static Result member(Observable&& o) {
109 return Result(Multicast(std::forward<Observable>(o), Subject(identity_current_thread(), composite_subscription())));
110 }
111
112 template<class Observable,
113 class Enabled = rxu::enable_if_all_true_type_t<
114 is_observable<Observable>>,
115 class SourceValue = rxu::value_type_t<Observable>,
116 class Subject = rxsub::replay<SourceValue, identity_one_worker>,
117 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
118 class Result = connectable_observable<SourceValue, Multicast>
119 >
memberrxcpp::member_overload120 static Result member(Observable&& o, composite_subscription cs) {
121 return Result(Multicast(std::forward<Observable>(o), Subject(identity_current_thread(), cs)));
122 }
123
124 template<class Observable, class Coordination,
125 class Enabled = rxu::enable_if_all_true_type_t<
126 is_observable<Observable>,
127 is_coordination<Coordination>>,
128 class SourceValue = rxu::value_type_t<Observable>,
129 class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>,
130 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
131 class Result = connectable_observable<SourceValue, Multicast>
132 >
memberrxcpp::member_overload133 static Result member(Observable&& o, Coordination&& cn, composite_subscription cs = composite_subscription()) {
134 return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Coordination>(cn), cs)));
135 }
136
137 template<class Observable, class Count,
138 class Enabled = rxu::enable_if_all_true_type_t<
139 is_observable<Observable>,
140 std::is_integral<Count>>,
141 class SourceValue = rxu::value_type_t<Observable>,
142 class Subject = rxsub::replay<SourceValue, identity_one_worker>,
143 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
144 class Result = connectable_observable<SourceValue, Multicast>
145 >
memberrxcpp::member_overload146 static Result member(Observable&& o, Count count, composite_subscription cs = composite_subscription()) {
147 return Result(Multicast(std::forward<Observable>(o), Subject(count, identity_current_thread(), cs)));
148 }
149
150 template<class Observable, class Count, class Coordination,
151 class Enabled = rxu::enable_if_all_true_type_t<
152 is_observable<Observable>,
153 std::is_integral<Count>,
154 is_coordination<Coordination>>,
155 class SourceValue = rxu::value_type_t<Observable>,
156 class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>,
157 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
158 class Result = connectable_observable<SourceValue, Multicast>
159 >
memberrxcpp::member_overload160 static Result member(Observable&& o, Count count, Coordination&& cn, composite_subscription cs = composite_subscription()) {
161 return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Coordination>(cn), cs)));
162 }
163
164 template<class Observable, class Duration,
165 class IsDuration = rxu::is_duration<Duration>,
166 class Enabled = rxu::enable_if_all_true_type_t<
167 is_observable<Observable>,
168 IsDuration>,
169 class SourceValue = rxu::value_type_t<Observable>,
170 class Subject = rxsub::replay<SourceValue, identity_one_worker>,
171 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
172 class Result = connectable_observable<SourceValue, Multicast>
173 >
memberrxcpp::member_overload174 static Result member(Observable&& o, Duration&& d, composite_subscription cs = composite_subscription()) {
175 return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Duration>(d), identity_current_thread(), cs)));
176 }
177
178 template<class Observable, class Duration, class Coordination,
179 class IsDuration = rxu::is_duration<Duration>,
180 class Enabled = rxu::enable_if_all_true_type_t<
181 is_observable<Observable>,
182 IsDuration,
183 is_coordination<Coordination>>,
184 class SourceValue = rxu::value_type_t<Observable>,
185 class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>,
186 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
187 class Result = connectable_observable<SourceValue, Multicast>
188 >
memberrxcpp::member_overload189 static Result member(Observable&& o, Duration&& d, Coordination&& cn, composite_subscription cs = composite_subscription()) {
190 return Result(Multicast(std::forward<Observable>(o), Subject(std::forward<Duration>(d), std::forward<Coordination>(cn), cs)));
191 }
192
193 template<class Observable, class Count, class Duration,
194 class IsDuration = rxu::is_duration<Duration>,
195 class Enabled = rxu::enable_if_all_true_type_t<
196 is_observable<Observable>,
197 std::is_integral<Count>,
198 IsDuration>,
199 class SourceValue = rxu::value_type_t<Observable>,
200 class Subject = rxsub::replay<SourceValue, identity_one_worker>,
201 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
202 class Result = connectable_observable<SourceValue, Multicast>
203 >
memberrxcpp::member_overload204 static Result member(Observable&& o, Count count, Duration&& d, composite_subscription cs = composite_subscription()) {
205 return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Duration>(d), identity_current_thread(), cs)));
206 }
207
208 template<class Observable, class Count, class Duration, class Coordination,
209 class IsDuration = rxu::is_duration<Duration>,
210 class Enabled = rxu::enable_if_all_true_type_t<
211 is_observable<Observable>,
212 std::is_integral<Count>,
213 IsDuration,
214 is_coordination<Coordination>>,
215 class SourceValue = rxu::value_type_t<Observable>,
216 class Subject = rxsub::replay<SourceValue, rxu::decay_t<Coordination>>,
217 class Multicast = rxo::detail::multicast<SourceValue, rxu::decay_t<Observable>, Subject>,
218 class Result = connectable_observable<SourceValue, Multicast>
219 >
memberrxcpp::member_overload220 static Result member(Observable&& o, Count count, Duration&& d, Coordination&& cn, composite_subscription cs = composite_subscription()) {
221 return Result(Multicast(std::forward<Observable>(o), Subject(count, std::forward<Duration>(d), std::forward<Coordination>(cn), cs)));
222 }
223
224 template<class... AN>
memberrxcpp::member_overload225 static operators::detail::replay_invalid_t<AN...> member(AN...) {
226 std::terminate();
227 return {};
228 static_assert(sizeof...(AN) == 10000, "replay takes (optional Count, optional Duration, optional Coordination, optional CompositeSubscription)");
229 }
230 };
231
232 }
233
234 #endif
235