1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP)
6 #define RXCPP_RX_SCHEDULER_TEST_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace schedulers {
13
14 namespace detail {
15
16 class test_type : public scheduler_interface
17 {
18 public:
19
20 typedef scheduler_interface::clock_type clock_type;
21
22 struct test_type_state : public virtual_time<long, long>
23 {
24 typedef virtual_time<long, long> base;
25
26 using base::schedule_absolute;
27 using base::schedule_relative;
28
nowrxcpp::schedulers::detail::test_type::test_type_state29 clock_type::time_point now() const {
30 return to_time_point(clock_now);
31 }
32
schedule_absoluterxcpp::schedulers::detail::test_type::test_type_state33 virtual void schedule_absolute(long when, const schedulable& a) const
34 {
35 if (when <= base::clock_now)
36 when = base::clock_now + 1;
37
38 return base::schedule_absolute(when, a);
39 }
40
addrxcpp::schedulers::detail::test_type::test_type_state41 virtual long add(long absolute, long relative) const
42 {
43 return absolute + relative;
44 }
45
to_time_pointrxcpp::schedulers::detail::test_type::test_type_state46 virtual clock_type::time_point to_time_point(long absolute) const
47 {
48 return clock_type::time_point(std::chrono::milliseconds(absolute));
49 }
50
to_relativerxcpp::schedulers::detail::test_type::test_type_state51 virtual long to_relative(clock_type::duration d) const
52 {
53 return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
54 }
55 };
56
57 private:
58 mutable std::shared_ptr<test_type_state> state;
59
60 public:
61 struct test_type_worker : public worker_interface
62 {
63 mutable std::shared_ptr<test_type_state> state;
64
65 typedef test_type_state::absolute absolute;
66 typedef test_type_state::relative relative;
67
test_type_workerrxcpp::schedulers::detail::test_type::test_type_worker68 test_type_worker(std::shared_ptr<test_type_state> st)
69 : state(std::move(st))
70 {
71 }
72
nowrxcpp::schedulers::detail::test_type::test_type_worker73 virtual clock_type::time_point now() const {
74 return state->now();
75 }
76
schedulerxcpp::schedulers::detail::test_type::test_type_worker77 virtual void schedule(const schedulable& scbl) const {
78 state->schedule_absolute(state->clock(), scbl);
79 }
80
schedulerxcpp::schedulers::detail::test_type::test_type_worker81 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
82 state->schedule_relative(state->to_relative(when - now()), scbl);
83 }
84
schedule_absoluterxcpp::schedulers::detail::test_type::test_type_worker85 void schedule_absolute(absolute when, const schedulable& scbl) const {
86 state->schedule_absolute(when, scbl);
87 }
88
schedule_relativerxcpp::schedulers::detail::test_type::test_type_worker89 void schedule_relative(relative when, const schedulable& scbl) const {
90 state->schedule_relative(when, scbl);
91 }
92
is_enabledrxcpp::schedulers::detail::test_type::test_type_worker93 bool is_enabled() const {return state->is_enabled();}
clockrxcpp::schedulers::detail::test_type::test_type_worker94 absolute clock() const {return state->clock();}
95
startrxcpp::schedulers::detail::test_type::test_type_worker96 void start() const
97 {
98 state->start();
99 }
100
stoprxcpp::schedulers::detail::test_type::test_type_worker101 void stop() const
102 {
103 state->stop();
104 }
105
advance_torxcpp::schedulers::detail::test_type::test_type_worker106 void advance_to(absolute time) const
107 {
108 state->advance_to(time);
109 }
110
advance_byrxcpp::schedulers::detail::test_type::test_type_worker111 void advance_by(relative time) const
112 {
113 state->advance_by(time);
114 }
115
sleeprxcpp::schedulers::detail::test_type::test_type_worker116 void sleep(relative time) const
117 {
118 state->sleep(time);
119 }
120
121 template<class T>
122 subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
123 };
124
125 public:
test_type()126 test_type()
127 : state(std::make_shared<test_type_state>())
128 {
129 }
130
now() const131 virtual clock_type::time_point now() const {
132 return state->now();
133 }
134
create_worker(composite_subscription cs) const135 virtual worker create_worker(composite_subscription cs) const {
136 return worker(cs, std::make_shared<test_type_worker>(state));
137 }
138
is_enabled() const139 bool is_enabled() const {return state->is_enabled();}
clock()140 long clock() {
141 return state->clock();
142 }
143
to_time_point(long absolute) const144 clock_type::time_point to_time_point(long absolute) const {
145 return state->to_time_point(absolute);
146 }
147
create_test_type_worker_interface() const148 std::shared_ptr<test_type_worker> create_test_type_worker_interface() const {
149 return std::make_shared<test_type_worker>(state);
150 }
151
152 template<class T>
153 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
154
155 template<class T>
156 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
157 };
158
159 template<class T>
160 class mock_observer
161 : public rxt::detail::test_subject_base<T>
162 {
163 typedef typename rxn::notification<T> notification_type;
164 typedef rxn::recorded<typename notification_type::type> recorded_type;
165
166 public:
mock_observer(std::shared_ptr<test_type::test_type_state> sc)167 explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
168 : sc(sc)
169 {
170 }
171
172 std::shared_ptr<test_type::test_type_state> sc;
173 std::vector<recorded_type> m;
174
on_subscribe(subscriber<T>) const175 virtual void on_subscribe(subscriber<T>) const {
176 std::terminate();
177 }
subscriptions() const178 virtual std::vector<rxn::subscription> subscriptions() const {
179 std::terminate();
180 }
181
messages() const182 virtual std::vector<recorded_type> messages() const {
183 return m;
184 }
185 };
186
187 template<class T>
make_subscriber() const188 subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const
189 {
190 typedef typename rxn::notification<T> notification_type;
191 typedef rxn::recorded<typename notification_type::type> recorded_type;
192
193 auto ts = std::make_shared<mock_observer<T>>(state);
194
195 return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
196 // on_next
197 [ts](T value)
198 {
199 ts->m.push_back(
200 recorded_type(ts->sc->clock(), notification_type::on_next(value)));
201 },
202 // on_error
203 [ts](rxu::error_ptr e)
204 {
205 ts->m.push_back(
206 recorded_type(ts->sc->clock(), notification_type::on_error(e)));
207 },
208 // on_completed
209 [ts]()
210 {
211 ts->m.push_back(
212 recorded_type(ts->sc->clock(), notification_type::on_completed()));
213 })));
214 }
215
216 template<class T>
217 class cold_observable
218 : public rxt::detail::test_subject_base<T>
219 {
220 typedef cold_observable<T> this_type;
221 std::shared_ptr<test_type::test_type_state> sc;
222 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
223 mutable std::vector<recorded_type> mv;
224 mutable std::vector<rxn::subscription> sv;
225 mutable worker controller;
226
227 public:
228
cold_observable(std::shared_ptr<test_type::test_type_state> sc,worker w,std::vector<recorded_type> mv)229 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
230 : sc(sc)
231 , mv(std::move(mv))
232 , controller(w)
233 {
234 }
235
236 template<class Iterator>
cold_observable(std::shared_ptr<test_type::test_type_state> sc,worker w,Iterator begin,Iterator end)237 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
238 : sc(sc)
239 , mv(begin, end)
240 , controller(w)
241 {
242 }
243
on_subscribe(subscriber<T> o) const244 virtual void on_subscribe(subscriber<T> o) const {
245 sv.push_back(rxn::subscription(sc->clock()));
246 auto index = sv.size() - 1;
247
248 for (auto& message : mv) {
249 auto n = message.value();
250 sc->schedule_relative(message.time(), make_schedulable(
251 controller,
252 [n, o](const schedulable&) {
253 if (o.is_subscribed()) {
254 n->accept(o);
255 }
256 }));
257 }
258
259 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
260 o.add([sharedThis, index]() {
261 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
262 });
263 }
264
subscriptions() const265 virtual std::vector<rxn::subscription> subscriptions() const {
266 return sv;
267 }
268
messages() const269 virtual std::vector<recorded_type> messages() const {
270 return mv;
271 }
272 };
273
274 template<class T>
make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const275 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
276 {
277 auto co = std::make_shared<cold_observable<T>>(state, create_worker(composite_subscription()), std::move(messages));
278 return rxt::testable_observable<T>(co);
279 }
280
281 template<class T>
282 class hot_observable
283 : public rxt::detail::test_subject_base<T>
284 {
285 typedef hot_observable<T> this_type;
286 std::shared_ptr<test_type::test_type_state> sc;
287 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
288 typedef subscriber<T> observer_type;
289 mutable std::vector<recorded_type> mv;
290 mutable std::vector<rxn::subscription> sv;
291 mutable std::list<observer_type> observers;
292 mutable worker controller;
293
294 public:
295
hot_observable(std::shared_ptr<test_type::test_type_state> sc,worker w,std::vector<recorded_type> mv)296 hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
297 : sc(sc)
298 , mv(mv)
299 , controller(w)
300 {
301 for (auto& message : mv) {
302 auto n = message.value();
303 sc->schedule_absolute(message.time(), make_schedulable(
304 controller,
305 [this, n](const schedulable&) {
306 auto local = this->observers;
307 for (auto& o : local) {
308 if (o.is_subscribed()) {
309 n->accept(o);
310 }
311 }
312 }));
313 }
314 }
315
~hot_observable()316 virtual ~hot_observable() {}
317
on_subscribe(observer_type o) const318 virtual void on_subscribe(observer_type o) const {
319 auto olocation = observers.insert(observers.end(), o);
320
321 sv.push_back(rxn::subscription(sc->clock()));
322 auto index = sv.size() - 1;
323
324 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
325 o.add([sharedThis, index, olocation]() {
326 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
327 sharedThis->observers.erase(olocation);
328 });
329 }
330
subscriptions() const331 virtual std::vector<rxn::subscription> subscriptions() const {
332 return sv;
333 }
334
messages() const335 virtual std::vector<recorded_type> messages() const {
336 return mv;
337 }
338 };
339
340 template<class T>
make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const341 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
342 {
343 auto worker = create_worker(composite_subscription());
344 auto shared = std::make_shared<hot_observable<T>>(state, worker, std::move(messages));
345 return rxt::testable_observable<T>(shared);
346 }
347
348 template<class F>
349 struct is_create_source_function
350 {
351 struct not_void {};
352 template<class CF>
353 static auto check(int) -> decltype((*(CF*)nullptr)());
354 template<class CF>
355 static not_void check(...);
356
357 static const bool value = is_observable<decltype(check<rxu::decay_t<F>>(0))>::value;
358 };
359
360 }
361
362 class test : public scheduler
363 {
364 std::shared_ptr<detail::test_type> tester;
365 public:
366
test(std::shared_ptr<detail::test_type> t)367 explicit test(std::shared_ptr<detail::test_type> t)
368 : scheduler(std::static_pointer_cast<scheduler_interface>(t))
369 , tester(t)
370 {
371 }
372
373 typedef detail::test_type::clock_type clock_type;
374
375 static const long created_time = 100;
376 static const long subscribed_time = 200;
377 static const long unsubscribed_time = 1000;
378
379 template<class T>
380 struct messages
381 {
382 typedef typename rxn::notification<T> notification_type;
383 typedef rxn::recorded<typename notification_type::type> recorded_type;
384 typedef rxn::subscription subscription_type;
385
messagesrxcpp::schedulers::test::messages386 messages() {}
387
388 template<typename U>
nextrxcpp::schedulers::test::messages389 static recorded_type next(long ticks, U value) {
390 return recorded_type(ticks, notification_type::on_next(std::move(value)));
391 }
392
completedrxcpp::schedulers::test::messages393 static recorded_type completed(long ticks) {
394 return recorded_type(ticks, notification_type::on_completed());
395 }
396
397 template<typename Exception>
errorrxcpp::schedulers::test::messages398 static recorded_type error(long ticks, Exception&& e) {
399 return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
400 }
401
subscriberxcpp::schedulers::test::messages402 static rxn::subscription subscribe(long subscribe, long unsubscribe) {
403 return rxn::subscription(subscribe, unsubscribe);
404 }
405 };
406
407 class test_worker : public worker
408 {
409 std::shared_ptr<detail::test_type::test_type_worker> tester;
410 public:
411
~test_worker()412 ~test_worker() {
413 }
414
test_worker(composite_subscription cs,std::shared_ptr<detail::test_type::test_type_worker> t)415 explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
416 : worker(cs, std::static_pointer_cast<worker_interface>(t))
417 , tester(t)
418 {
419 }
420
is_enabled() const421 bool is_enabled() const {return tester->is_enabled();}
clock() const422 long clock() const {return tester->clock();}
423
schedule_absolute(long when,const schedulable & a) const424 void schedule_absolute(long when, const schedulable& a) const {
425 tester->schedule_absolute(when, a);
426 }
427
schedule_relative(long when,const schedulable & a) const428 void schedule_relative(long when, const schedulable& a) const {
429 tester->schedule_relative(when, a);
430 }
431
432 template<class Arg0, class... ArgN>
schedule_absolute(long when,Arg0 && a0,ArgN &&...an) const433 auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
434 -> typename std::enable_if<
435 (detail::is_action_function<Arg0>::value ||
436 is_subscription<Arg0>::value) &&
437 !is_schedulable<Arg0>::value>::type {
438 tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
439 }
440
441 template<class Arg0, class... ArgN>
schedule_relative(long when,Arg0 && a0,ArgN &&...an) const442 auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
443 -> typename std::enable_if<
444 (detail::is_action_function<Arg0>::value ||
445 is_subscription<Arg0>::value) &&
446 !is_schedulable<Arg0>::value>::type {
447 tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
448 }
449
advance_to(long time) const450 void advance_to(long time) const
451 {
452 tester->advance_to(time);
453 }
454
advance_by(long time) const455 void advance_by(long time) const
456 {
457 tester->advance_by(time);
458 }
459
sleep(long time) const460 void sleep(long time) const
461 {
462 tester->sleep(time);
463 }
464
465 template<class T, class F>
start(F createSource,long created,long subscribed,long unsubscribed) const466 auto start(F createSource, long created, long subscribed, long unsubscribed) const
467 -> subscriber<T, rxt::testable_observer<T>>
468 {
469 struct state_type
470 : public std::enable_shared_from_this<state_type>
471 {
472 typedef decltype(createSource()) source_type;
473
474 std::unique_ptr<source_type> source;
475 subscriber<T, rxt::testable_observer<T>> o;
476
477 explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
478 : source()
479 , o(o)
480 {
481 }
482 };
483 auto state = std::make_shared<state_type>(this->make_subscriber<T>());
484
485 schedule_absolute(created, [createSource, state](const schedulable&) {
486 state->source.reset(new typename state_type::source_type(createSource()));
487 });
488 schedule_absolute(subscribed, [state](const schedulable&) {
489 state->source->subscribe(state->o);
490 });
491 schedule_absolute(unsubscribed, [state](const schedulable&) {
492 state->o.unsubscribe();
493 });
494
495 tester->start();
496
497 return state->o;
498 }
499
500 template<class T, class F>
start(F && createSource,long unsubscribed) const501 auto start(F&& createSource, long unsubscribed) const
502 -> subscriber<T, rxt::testable_observer<T>>
503 {
504 return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
505 }
506
507 template<class T, class F>
start(F && createSource) const508 auto start(F&& createSource) const
509 -> subscriber<T, rxt::testable_observer<T>>
510 {
511 return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
512 }
513
514 template<class F>
515 struct start_traits
516 {
517 typedef decltype((*(F*)nullptr)()) source_type;
518 typedef typename source_type::value_type value_type;
519 typedef subscriber<value_type, rxt::testable_observer<value_type>> subscriber_type;
520 };
521
522 template<class F>
start(F createSource,long created,long subscribed,long unsubscribed) const523 auto start(F createSource, long created, long subscribed, long unsubscribed) const
524 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
525 {
526 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created, subscribed, unsubscribed);
527 }
528
529 template<class F>
start(F createSource,long unsubscribed) const530 auto start(F createSource, long unsubscribed) const
531 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
532 {
533 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed);
534 }
535
536 template<class F>
start(F createSource) const537 auto start(F createSource) const
538 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
539 {
540 return start<rxu::value_type_t<start_traits<F>>>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
541 }
542
start() const543 void start() const {
544 tester->start();
545 }
546
547 template<class T>
make_subscriber() const548 subscriber<T, rxt::testable_observer<T>> make_subscriber() const {
549 return tester->make_subscriber<T>();
550 }
551 };
552
now() const553 clock_type::time_point now() const {
554 return tester->now();
555 }
556
create_worker(composite_subscription cs=composite_subscription ()) const557 test_worker create_worker(composite_subscription cs = composite_subscription()) const {
558 return test_worker(cs, tester->create_test_type_worker_interface());
559 }
560
is_enabled() const561 bool is_enabled() const {return tester->is_enabled();}
clock() const562 long clock() const {return tester->clock();}
563
to_time_point(long absolute) const564 clock_type::time_point to_time_point(long absolute) const {
565 return tester->to_time_point(absolute);
566 }
567
568 template<class T>
make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const569 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const{
570 return tester->make_hot_observable(std::move(messages));
571 }
572
573 template<class T, std::size_t size>
make_hot_observable(const T (& arr)[size]) const574 auto make_hot_observable(const T (&arr) [size]) const
575 -> decltype(tester->make_hot_observable(std::vector<T>())) {
576 return tester->make_hot_observable(rxu::to_vector(arr));
577 }
578
579 template<class T>
make_hot_observable(std::initializer_list<T> il) const580 auto make_hot_observable(std::initializer_list<T> il) const
581 -> decltype(tester->make_hot_observable(std::vector<T>())) {
582 return tester->make_hot_observable(std::vector<T>(il));
583 }
584
585 template<class T>
make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const586 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const {
587 return tester->make_cold_observable(std::move(messages));
588 }
589
590 template<class T, std::size_t size>
make_cold_observable(const T (& arr)[size]) const591 auto make_cold_observable(const T (&arr) [size]) const
592 -> decltype(tester->make_cold_observable(std::vector<T>())) {
593 return tester->make_cold_observable(rxu::to_vector(arr));
594 }
595
596 template<class T>
make_cold_observable(std::initializer_list<T> il) const597 auto make_cold_observable(std::initializer_list<T> il) const
598 -> decltype(tester->make_cold_observable(std::vector<T>())) {
599 return tester->make_cold_observable(std::vector<T>(il));
600 }
601 };
602
603
make_test()604 inline test make_test() {
605 return test(std::make_shared<detail::test_type>());
606 }
607
608 }
609
identity_test()610 inline identity_one_worker identity_test() {
611 static identity_one_worker r(rxsc::make_test());
612 return r;
613 }
614
615 }
616
617 #endif
618