• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 
3 SCENARIO("schedule_periodically", "[!hide][periodically][scheduler][long][perf][sources]"){
4     GIVEN("schedule_periodically"){
5         WHEN("the period is 1sec and the initial is 2sec"){
6             using namespace std::chrono;
7 
8             int c = 0;
9             auto sc = rxsc::make_current_thread();
10             auto w = sc.create_worker();
11             auto start = w.now() + seconds(2);
12             auto period = seconds(1);
13             w.schedule_periodically(start, period,
__anon4b8fbe9d0102(rxsc::schedulable scbl)14                 [=, &c](rxsc::schedulable scbl){
15                     auto nsDelta = duration_cast<milliseconds>(scbl.now() - (start + (period * c)));
16                     ++c;
17                     std::cout << "schedule_periodically          : period " << c << ", " << nsDelta.count() << "ms delta from target time" << std::endl;
18                     if (c == 5) {scbl.unsubscribe();}
19                 });
20         }
21     }
22 }
23 
24 SCENARIO("schedule_periodically by duration", "[!hide][periodically][scheduler][long][perf][sources]"){
25     GIVEN("schedule_periodically_duration"){
26         WHEN("the period is 1sec and the initial is 2sec"){
27             using namespace std::chrono;
28             typedef steady_clock clock;
29 
30             int c = 0;
31             auto sc = rxsc::make_current_thread();
32             auto w = sc.create_worker();
33 
34             auto schedule_periodically_duration = [w](
35                     rxsc::current_thread::clock_type::duration initial,
36                     rxsc::current_thread::clock_type::duration period,
__anon4b8fbe9d0202( rxsc::current_thread::clock_type::duration initial, rxsc::current_thread::clock_type::duration period, rxsc::schedulable activity)37                     rxsc::schedulable activity){
38                 auto periodic = rxsc::make_schedulable(
39                     activity,
40                     [period, activity](rxsc::schedulable self) {
41                         auto start = clock::now();
42                         // any recursion requests will be pushed to the scheduler queue
43                         rxsc::recursion r(false);
44                         // call action
45                         activity(r.get_recurse());
46                         auto finish = clock::now();
47 
48                         // schedule next occurance (if the action took longer than 'period' target will be in the past)
49                         self.schedule(period - (finish - start));
50                     });
51                 w.schedule(initial, periodic);
52             };
53 
54             auto start = w.now() + seconds(2);
55             auto period = seconds(1);
56             schedule_periodically_duration(seconds(2), period,
__anon4b8fbe9d0402(rxsc::schedulable scbl)57                 rxsc::make_schedulable(w, [=, &c](rxsc::schedulable scbl){
58                     auto nsDelta = duration_cast<milliseconds>(scbl.now() - (start + (period * c)));
59                     ++c;
60                     std::cout << "schedule_periodically_duration : period " << c << ", " << nsDelta.count() << "ms delta from target time" << std::endl;
61                     if (c == 5) {scbl.unsubscribe();}
62                 }));
63         }
64     }
65 }
66 
67 SCENARIO("intervals", "[!hide][periodically][interval][scheduler][long][perf][sources]"){
68     GIVEN("10 intervals of 1 seconds"){
69         WHEN("the period is 1sec and the initial is 2sec"){
70             using namespace std::chrono;
71 
72             int c = 0;
73             auto sc = rxsc::make_current_thread();
74             auto so = rx::synchronize_in_one_worker(sc);
75             auto start = sc.now() + seconds(2);
76             auto period = seconds(1);
77             rx::composite_subscription cs;
78             rx::observable<>::interval(start, period, so)
79                 .subscribe(
80                     cs,
__anon4b8fbe9d0502(long counter)81                     [=, &c](long counter){
82                         auto nsDelta = duration_cast<milliseconds>(sc.now() - (start + (period * (counter - 1))));
83                         c = counter - 1;
84                         std::cout << "interval          : period " << counter << ", " << nsDelta.count() << "ms delta from target time" << std::endl;
85                         if (counter == 5) {cs.unsubscribe();}
86                     },
__anon4b8fbe9d0602(rxu::error_ptr)87                     [](rxu::error_ptr){abort();});
88         }
89     }
90 }
91