1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 /*! \file rx-sequence_equal.hpp
6
7 \brief Determine whether two Observables emit the same sequence of items.
8
9 \tparam OtherSource the type of the other observable.
10 \tparam BinaryPredicate the type of the value comparing function (optional). The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);
11 \tparam Coordination the type of the scheduler (optional).
12
13 \param t the other Observable that emits items to compare.
14 \param pred the function that implements comparison of two values (optional).
15 \param cn the scheduler (optional).
16
17 \return Observable that emits true only if both sequences terminate normally after emitting the same sequence of items in the same order; otherwise it will emit false.
18
19 \sample
20 \snippet sequence_equal.cpp sequence_equal sample
21 \snippet output.txt sequence_equal sample
22 */
23
24 #if !defined(RXCPP_OPERATORS_RX_SEQUENCE_EQUAL_HPP)
25 #define RXCPP_OPERATORS_RX_SEQUENCE_EQUAL_HPP
26
27 #include "../rx-includes.hpp"
28
29 namespace rxcpp {
30
31 namespace operators {
32
33 namespace detail {
34
// Empty tag type parameterised on the argument types of a rejected
// sequence_equal call. It carries no data; it is used below as the value
// type of the "invalid" operator/observable.
template<typename... Args>
struct sequence_equal_invalid_arguments
{
};
37
38 template<class... AN>
39 struct sequence_equal_invalid : public rxo::operator_base<sequence_equal_invalid_arguments<AN...>> {
40 using type = observable<sequence_equal_invalid_arguments<AN...>, sequence_equal_invalid<AN...>>;
41 };
42 template<class... AN>
43 using sequence_equal_invalid_t = typename sequence_equal_invalid<AN...>::type;
44
45 template<class T, class Observable, class OtherObservable, class BinaryPredicate, class Coordination>
46 struct sequence_equal : public operator_base<bool>
47 {
48 typedef rxu::decay_t<Observable> source_type;
49 typedef rxu::decay_t<T> source_value_type;
50 typedef rxu::decay_t<OtherObservable> other_source_type;
51 typedef typename other_source_type::value_type other_source_value_type;
52 typedef rxu::decay_t<BinaryPredicate> predicate_type;
53 typedef rxu::decay_t<Coordination> coordination_type;
54 typedef typename coordination_type::coordinator_type coordinator_type;
55
56 struct values {
valuesrxcpp::operators::detail::sequence_equal::values57 values(source_type s, other_source_type t, predicate_type pred, coordination_type sf)
58 : source(std::move(s))
59 , other(std::move(t))
60 , pred(std::move(pred))
61 , coordination(std::move(sf))
62 {
63 }
64
65 source_type source;
66 other_source_type other;
67 predicate_type pred;
68 coordination_type coordination;
69 };
70
71 values initial;
72
sequence_equalrxcpp::operators::detail::sequence_equal73 sequence_equal(source_type s, other_source_type t, predicate_type pred, coordination_type sf)
74 : initial(std::move(s), std::move(t), std::move(pred), std::move(sf))
75 {
76 }
77
78 template<class Subscriber>
on_subscriberxcpp::operators::detail::sequence_equal79 void on_subscribe(Subscriber s) const {
80
81 typedef Subscriber output_type;
82
83 struct state_type
84 : public std::enable_shared_from_this<state_type>
85 , public values
86 {
87 state_type(const values& vals, coordinator_type coor, const output_type& o)
88 : values(vals)
89 , coordinator(std::move(coor))
90 , out(o)
91 , source_completed(false)
92 , other_completed(false)
93 {
94 out.add(other_lifetime);
95 out.add(source_lifetime);
96 }
97
98 composite_subscription other_lifetime;
99 composite_subscription source_lifetime;
100 coordinator_type coordinator;
101 output_type out;
102
103 mutable std::list<source_value_type> source_values;
104 mutable std::list<other_source_value_type> other_values;
105 mutable bool source_completed;
106 mutable bool other_completed;
107 };
108
109 auto coordinator = initial.coordination.create_coordinator();
110 auto state = std::make_shared<state_type>(initial, std::move(coordinator), std::move(s));
111
112 auto other = on_exception(
113 [&](){ return state->coordinator.in(state->other); },
114 state->out);
115 if (other.empty()) {
116 return;
117 }
118
119 auto source = on_exception(
120 [&](){ return state->coordinator.in(state->source); },
121 state->out);
122 if (source.empty()) {
123 return;
124 }
125
126 auto check_equal = [state]() {
127 if(!state->source_values.empty() && !state->other_values.empty()) {
128 auto x = std::move(state->source_values.front());
129 state->source_values.pop_front();
130
131 auto y = std::move(state->other_values.front());
132 state->other_values.pop_front();
133
134 if (!state->pred(x, y)) {
135 state->out.on_next(false);
136 state->out.on_completed();
137 }
138 } else {
139 if((!state->source_values.empty() && state->other_completed) ||
140 (!state->other_values.empty() && state->source_completed)) {
141 state->out.on_next(false);
142 state->out.on_completed();
143 }
144 }
145 };
146
147 auto check_complete = [state]() {
148 if(state->source_completed && state->other_completed) {
149 state->out.on_next(state->source_values.empty() && state->other_values.empty());
150 state->out.on_completed();
151 }
152 };
153
154 auto sinkOther = make_subscriber<other_source_value_type>(
155 state->out,
156 state->other_lifetime,
157 // on_next
158 [state, check_equal](other_source_value_type t) {
159 auto& values = state->other_values;
160 values.push_back(t);
161 check_equal();
162 },
163 // on_error
164 [state](rxu::error_ptr e) {
165 state->out.on_error(e);
166 },
167 // on_completed
168 [state, check_complete]() {
169 auto& completed = state->other_completed;
170 completed = true;
171 check_complete();
172 }
173 );
174
175 auto selectedSinkOther = on_exception(
176 [&](){ return state->coordinator.out(sinkOther); },
177 state->out);
178 if (selectedSinkOther.empty()) {
179 return;
180 }
181 other->subscribe(std::move(selectedSinkOther.get()));
182
183 source.get().subscribe(
184 state->source_lifetime,
185 // on_next
186 [state, check_equal](source_value_type t) {
187 auto& values = state->source_values;
188 values.push_back(t);
189 check_equal();
190 },
191 // on_error
192 [state](rxu::error_ptr e) {
193 state->out.on_error(e);
194 },
195 // on_completed
196 [state, check_complete]() {
197 auto& completed = state->source_completed;
198 completed = true;
199 check_complete();
200 }
201 );
202 }
203 };
204
205 }
206
207 /*! @copydoc rx-sequence_equal.hpp
208 */
209 template<class... AN>
sequence_equal(AN &&...an)210 auto sequence_equal(AN&&... an)
211 -> operator_factory<sequence_equal_tag, AN...> {
212 return operator_factory<sequence_equal_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
213 }
214
215 }
216
217 template<>
218 struct member_overload<sequence_equal_tag>
219 {
220 template<class Observable, class OtherObservable,
221 class Enabled = rxu::enable_if_all_true_type_t<
222 is_observable<Observable>,
223 is_observable<OtherObservable>>,
224 class SourceValue = rxu::value_type_t<Observable>,
225 class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::equal_to<>, identity_one_worker>,
226 class Value = rxu::value_type_t<SequenceEqual>,
227 class Result = observable<Value, SequenceEqual>>
memberrxcpp::member_overload228 static Result member(Observable&& o, OtherObservable&& t) {
229 return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), rxu::equal_to<>(), identity_current_thread()));
230 }
231
232 template<class Observable, class OtherObservable, class BinaryPredicate,
233 class IsCoordination = is_coordination<BinaryPredicate>,
234 class Enabled = rxu::enable_if_all_true_type_t<
235 is_observable<Observable>,
236 is_observable<OtherObservable>,
237 rxu::negation<IsCoordination>>,
238 class SourceValue = rxu::value_type_t<Observable>,
239 class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::decay_t<BinaryPredicate>, identity_one_worker>,
240 class Value = rxu::value_type_t<SequenceEqual>,
241 class Result = observable<Value, SequenceEqual>>
memberrxcpp::member_overload242 static Result member(Observable&& o, OtherObservable&& t, BinaryPredicate&& pred) {
243 return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), std::forward<BinaryPredicate>(pred), identity_current_thread()));
244 }
245
246 template<class Observable, class OtherObservable, class Coordination,
247 class Enabled = rxu::enable_if_all_true_type_t<
248 is_observable<Observable>,
249 is_observable<OtherObservable>,
250 is_coordination<Coordination>>,
251 class SourceValue = rxu::value_type_t<Observable>,
252 class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::equal_to<>, rxu::decay_t<Coordination>>,
253 class Value = rxu::value_type_t<SequenceEqual>,
254 class Result = observable<Value, SequenceEqual>>
memberrxcpp::member_overload255 static Result member(Observable&& o, OtherObservable&& t, Coordination&& cn) {
256 return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), rxu::equal_to<>(), std::forward<Coordination>(cn)));
257 }
258
259 template<class Observable, class OtherObservable, class BinaryPredicate, class Coordination,
260 class Enabled = rxu::enable_if_all_true_type_t<
261 is_observable<Observable>,
262 is_observable<OtherObservable>,
263 is_coordination<Coordination>>,
264 class SourceValue = rxu::value_type_t<Observable>,
265 class SequenceEqual = rxo::detail::sequence_equal<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<OtherObservable>, rxu::decay_t<BinaryPredicate>, rxu::decay_t<Coordination>>,
266 class Value = rxu::value_type_t<SequenceEqual>,
267 class Result = observable<Value, SequenceEqual>>
memberrxcpp::member_overload268 static Result member(Observable&& o, OtherObservable&& t, BinaryPredicate&& pred, Coordination&& cn) {
269 return Result(SequenceEqual(std::forward<Observable>(o), std::forward<OtherObservable>(t), std::forward<BinaryPredicate>(pred), std::forward<Coordination>(cn)));
270 }
271
272 template<class... AN>
memberrxcpp::member_overload273 static operators::detail::sequence_equal_invalid_t<AN...> member(const AN&...) {
274 std::terminate();
275 return {};
276 static_assert(sizeof...(AN) == 10000, "sequence_equal takes (OtherObservable, optional BinaryPredicate, optional Coordination)");
277 }
278 };
279
280 }
281
282 #endif
283