1 #include "../test.h" 2 3 SCENARIO("schedule_periodically", "[!hide][periodically][scheduler][long][perf][sources]"){ 4 GIVEN("schedule_periodically"){ 5 WHEN("the period is 1sec and the initial is 2sec"){ 6 using namespace std::chrono; 7 8 int c = 0; 9 auto sc = rxsc::make_current_thread(); 10 auto w = sc.create_worker(); 11 auto start = w.now() + seconds(2); 12 auto period = seconds(1); 13 w.schedule_periodically(start, period, __anon4b8fbe9d0102(rxsc::schedulable scbl)14 [=, &c](rxsc::schedulable scbl){ 15 auto nsDelta = duration_cast<milliseconds>(scbl.now() - (start + (period * c))); 16 ++c; 17 std::cout << "schedule_periodically : period " << c << ", " << nsDelta.count() << "ms delta from target time" << std::endl; 18 if (c == 5) {scbl.unsubscribe();} 19 }); 20 } 21 } 22 } 23 24 SCENARIO("schedule_periodically by duration", "[!hide][periodically][scheduler][long][perf][sources]"){ 25 GIVEN("schedule_periodically_duration"){ 26 WHEN("the period is 1sec and the initial is 2sec"){ 27 using namespace std::chrono; 28 typedef steady_clock clock; 29 30 int c = 0; 31 auto sc = rxsc::make_current_thread(); 32 auto w = sc.create_worker(); 33 34 auto schedule_periodically_duration = [w]( 35 rxsc::current_thread::clock_type::duration initial, 36 rxsc::current_thread::clock_type::duration period, __anon4b8fbe9d0202( rxsc::current_thread::clock_type::duration initial, rxsc::current_thread::clock_type::duration period, rxsc::schedulable activity)37 rxsc::schedulable activity){ 38 auto periodic = rxsc::make_schedulable( 39 activity, 40 [period, activity](rxsc::schedulable self) { 41 auto start = clock::now(); 42 // any recursion requests will be pushed to the scheduler queue 43 rxsc::recursion r(false); 44 // call action 45 activity(r.get_recurse()); 46 auto finish = clock::now(); 47 48 // schedule next occurance (if the action took longer than 'period' target will be in the past) 49 self.schedule(period - (finish - start)); 50 }); 51 w.schedule(initial, periodic); 52 }; 53 54 auto start = w.now() + seconds(2); 55 auto period = seconds(1); 56 schedule_periodically_duration(seconds(2), period, __anon4b8fbe9d0402(rxsc::schedulable scbl)57 rxsc::make_schedulable(w, [=, &c](rxsc::schedulable scbl){ 58 auto nsDelta = duration_cast<milliseconds>(scbl.now() - (start + (period * c))); 59 ++c; 60 std::cout << "schedule_periodically_duration : period " << c << ", " << nsDelta.count() << "ms delta from target time" << std::endl; 61 if (c == 5) {scbl.unsubscribe();} 62 })); 63 } 64 } 65 } 66 67 SCENARIO("intervals", "[!hide][periodically][interval][scheduler][long][perf][sources]"){ 68 GIVEN("10 intervals of 1 seconds"){ 69 WHEN("the period is 1sec and the initial is 2sec"){ 70 using namespace std::chrono; 71 72 int c = 0; 73 auto sc = rxsc::make_current_thread(); 74 auto so = rx::synchronize_in_one_worker(sc); 75 auto start = sc.now() + seconds(2); 76 auto period = seconds(1); 77 rx::composite_subscription cs; 78 rx::observable<>::interval(start, period, so) 79 .subscribe( 80 cs, __anon4b8fbe9d0502(long counter)81 [=, &c](long counter){ 82 auto nsDelta = duration_cast<milliseconds>(sc.now() - (start + (period * (counter - 1)))); 83 c = counter - 1; 84 std::cout << "interval : period " << counter << ", " << nsDelta.count() << "ms delta from target time" << std::endl; 85 if (counter == 5) {cs.unsubscribe();} 86 }, __anon4b8fbe9d0602(rxu::error_ptr)87 [](rxu::error_ptr){abort();}); 88 } 89 } 90 } 91