1 #include "../test.h"
2
3 namespace detail {
4
5 template<class Predicate>
6 struct liftfilter
7 {
8 typedef typename std::decay<Predicate>::type test_type;
9 test_type test;
10
liftfilterdetail::liftfilter11 liftfilter(test_type t)
12 : test(t)
13 {
14 }
15
16 template<class Subscriber>
17 struct filter_observer : public rx::observer_base<typename std::decay<Subscriber>::type::value_type>
18 {
19 typedef filter_observer<Subscriber> this_type;
20 typedef rx::observer_base<typename std::decay<Subscriber>::type::value_type> base_type;
21 typedef typename base_type::value_type value_type;
22 typedef typename std::decay<Subscriber>::type dest_type;
23 typedef rx::observer<value_type, this_type> observer_type;
24 dest_type dest;
25 test_type test;
26
filter_observerdetail::liftfilter::filter_observer27 filter_observer(dest_type d, test_type t)
28 : dest(d)
29 , test(t)
30 {
31 }
on_nextdetail::liftfilter::filter_observer32 void on_next(typename dest_type::value_type v) const {
33 bool filtered = false;
34 RXCPP_TRY {
35 filtered = !test(v);
36 } RXCPP_CATCH(...) {
37 dest.on_error(rxu::current_exception());
38 return;
39 }
40 if (!filtered) {
41 dest.on_next(v);
42 }
43 }
on_errordetail::liftfilter::filter_observer44 void on_error(rxu::error_ptr e) const {
45 dest.on_error(e);
46 }
on_completeddetail::liftfilter::filter_observer47 void on_completed() const {
48 dest.on_completed();
49 }
50
makedetail::liftfilter::filter_observer51 static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) {
52 return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t)));
53 }
54 };
55
56 template<class Subscriber>
operator ()detail::liftfilter57 auto operator()(const Subscriber& dest) const
58 -> decltype(filter_observer<Subscriber>::make(dest, test)) {
59 return filter_observer<Subscriber>::make(dest, test);
60 }
61 };
62
63 }
64
65 namespace {
66
67 template<class Predicate>
liftfilter(Predicate && p)68 auto liftfilter(Predicate&& p)
69 -> detail::liftfilter<typename std::decay<Predicate>::type> {
70 return detail::liftfilter<typename std::decay<Predicate>::type>(std::forward<Predicate>(p));
71 }
72
IsPrime(int x)73 bool IsPrime(int x)
74 {
75 if (x < 2) return false;
76 for (int i = 2; i <= x/2; ++i)
77 {
78 if (x % i == 0)
79 return false;
80 }
81 return true;
82 }
83
84 }
85
86 SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"){
87 GIVEN("a test hot observable of ints"){
88 auto sc = rxsc::make_test();
89 auto w = sc.create_worker();
90 const rxsc::test::messages<int> on;
91
92 long invoked = 0;
93
94 auto xs = sc.make_hot_observable({
95 on.next(110, 1),
96 on.next(180, 2),
97 on.next(230, 3),
98 on.next(270, 4),
99 on.next(340, 5),
100 on.next(380, 6),
101 on.next(390, 7),
102 on.next(450, 8),
103 on.next(470, 9),
104 on.next(560, 10),
105 on.next(580, 11),
106 on.completed(600)
107 });
108
109 WHEN("filtered to ints that are primes"){
110
111 auto res = w.start(
__anon3192a8220202() 112 [&xs, &invoked]() {
113 return xs
114 .lift<int>(liftfilter([&invoked](int x) {
115 invoked++;
116 return IsPrime(x);
117 }))
118 // forget type to workaround lambda deduction bug on msvc 2013
119 .as_dynamic();
120 },
121 400
122 );
123
124 THEN("the output only contains primes that arrived before disposal"){
125 auto required = rxu::to_vector({
126 on.next(230, 3),
127 on.next(340, 5),
128 on.next(390, 7)
129 });
130 auto actual = res.get_observer().messages();
131 REQUIRE(required == actual);
132 }
133
134 THEN("there was one subscription and one unsubscription"){
135 auto required = rxu::to_vector({
136 on.subscribe(200, 400)
137 });
138 auto actual = xs.subscriptions();
139 REQUIRE(required == actual);
140 }
141
142 THEN("where was called until disposed"){
143 REQUIRE(5 == invoked);
144 }
145 }
146 }
147 }
148
149 SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stream][operators]"){
150 GIVEN("a test hot observable of ints"){
151 auto sc = rxsc::make_test();
152 auto w = sc.create_worker();
153 const rxsc::test::messages<int> on;
154
155 long invoked = 0;
156
157 auto xs = sc.make_hot_observable({
158 on.next(110, 1),
159 on.next(180, 2),
160 on.next(230, 3),
161 on.next(270, 4),
162 on.next(340, 5),
163 on.next(380, 6),
164 on.next(390, 7),
165 on.next(450, 8),
166 on.next(470, 9),
167 on.next(560, 10),
168 on.next(580, 11),
169 on.completed(600)
170 });
171
172 WHEN("filtered to ints that are primes"){
173
174 auto res = w.start(
__anon3192a8220402() 175 [&xs, &invoked]() {
176 return xs
177 >> rxo::lift<int>(liftfilter([&invoked](int x) {
178 invoked++;
179 return IsPrime(x);
180 }))
181 // forget type to workaround lambda deduction bug on msvc 2013
182 >> rxo::as_dynamic();
183 },
184 400
185 );
186
187 THEN("the output only contains primes that arrived before disposal"){
188 auto required = rxu::to_vector({
189 on.next(230, 3),
190 on.next(340, 5),
191 on.next(390, 7)
192 });
193 auto actual = res.get_observer().messages();
194 REQUIRE(required == actual);
195 }
196
197 THEN("there was one subscription and one unsubscription"){
198 auto required = rxu::to_vector({
199 on.subscribe(200, 400)
200 });
201 auto actual = xs.subscriptions();
202 REQUIRE(required == actual);
203 }
204
205 THEN("where was called until disposed"){
206 REQUIRE(5 == invoked);
207 }
208 }
209 }
210 }
211
212 SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][operators]"){
213 GIVEN("a test hot observable of ints"){
214 auto sc = rxsc::make_test();
215 auto w = sc.create_worker();
216 const rxsc::test::messages<int> on;
217
218 long invoked = 0;
219
220 auto xs = sc.make_hot_observable({
221 on.next(110, 1),
222 on.next(180, 2),
223 on.next(230, 3),
224 on.next(270, 4),
225 on.next(340, 5),
226 on.next(380, 6),
227 on.next(390, 7),
228 on.next(450, 8),
229 on.next(470, 9),
230 on.next(560, 10),
231 on.next(580, 11),
232 on.completed(600)
233 });
234
235 WHEN("filtered to ints that are primes"){
236
237 auto res = w.start(
__anon3192a8220602() 238 [&xs, &invoked]() {
239 auto predicate = [&](int x){
240 invoked++;
241 return IsPrime(x);
242 };
243 return xs
244 .lift<int>([=](rx::subscriber<int> dest){
245 // VS2013 deduction issue requires dynamic (type-forgetting)
246 return rx::make_subscriber<int>(
247 dest,
248 rx::make_observer_dynamic<int>(
249 [=](int n){
250 bool pass = false;
251 RXCPP_TRY {pass = predicate(n);} RXCPP_CATCH(...){dest.on_error(rxu::current_exception());};
252 if (pass) {dest.on_next(n);}
253 },
254 [=](rxu::error_ptr e){dest.on_error(e);},
255 [=](){dest.on_completed();}));
256 })
257 // forget type to workaround lambda deduction bug on msvc 2013
258 .as_dynamic();
259 },
260 400
261 );
262
263 THEN("the output only contains primes that arrived before disposal"){
264 auto required = rxu::to_vector({
265 on.next(230, 3),
266 on.next(340, 5),
267 on.next(390, 7)
268 });
269 auto actual = res.get_observer().messages();
270 REQUIRE(required == actual);
271 }
272
273 THEN("there was one subscription and one unsubscription"){
274 auto required = rxu::to_vector({
275 on.subscribe(200, 400)
276 });
277 auto actual = xs.subscriptions();
278 REQUIRE(required == actual);
279 }
280
281 THEN("where was called until disposed"){
282 REQUIRE(5 == invoked);
283 }
284 }
285 }
286 }
287
288