• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-reduce.hpp>
3 #include <rxcpp/operators/rx-map.hpp>
4 #include <rxcpp/operators/rx-subscribe_on.hpp>
5 #include <rxcpp/operators/rx-observe_on.hpp>
6 
7 #include <sstream>
8 
// Number of subscriptions each perf scenario in this file performs per timed
// run; the scenarios also REQUIRE their accumulated count to equal this value.
static const int static_subscriptions{50000};
10 
11 SCENARIO("for loop subscribes to map with subscribe_on and observe_on", "[!hide][for][just][subscribe][subscribe_on][observe_on][long][perf]"){
12     const int& subscriptions = static_subscriptions;
13     GIVEN("a for loop"){
14         WHEN("subscribe 50K times"){
15             using namespace std::chrono;
16             typedef steady_clock clock;
17 
18             int runs = 10;
19 
20             for (;runs > 0; --runs) {
21 
22                 int c = 0;
23                 int n = 1;
24                 auto start = clock::now();
25                 for (int i = 0; i < subscriptions; ++i) {
26                     c += rx::observable<>::just(1)
__anonef2514930102(int i) 27                         .map([](int i) {
28                             std::stringstream serializer;
29                             serializer << i;
30                             return serializer.str();
31                         })
__anonef2514930202(const std::string& s) 32                         .map([](const std::string& s) {
33                             int i;
34                             std::stringstream(s) >> i;
35                             return i;
36                         })
37                         .subscribe_on(rx::observe_on_event_loop())
38                         .observe_on(rx::observe_on_event_loop())
39                         .as_blocking()
40                         .count();
41                 }
42                 auto finish = clock::now();
43                 auto msElapsed = duration_cast<milliseconds>(finish-start);
44                 REQUIRE(subscriptions == c);
45                 std::cout << "loop subscribe map subscribe_on observe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
46             }
47         }
48     }
49 }
50 
51 SCENARIO("for loop subscribes to map with subscribe_on", "[!hide][subscribe_on_only][for][just][subscribe][subscribe_on][long][perf]"){
52     const int& subscriptions = static_subscriptions;
53     GIVEN("a for loop"){
54         WHEN("subscribe 50K times"){
55             using namespace std::chrono;
56             typedef steady_clock clock;
57 
58             int runs = 10;
59 
60             for (;runs > 0; --runs) {
61 
62                 int c = 0;
63                 int n = 1;
64                 auto start = clock::now();
65 
66                 for (int i = 0; i < subscriptions; ++i) {
67                     c += rx::observable<>::
68                         just(1).
__anonef2514930302(int i) 69                         map([](int i) {
70                             std::stringstream serializer;
71                             serializer << i;
72                             return serializer.str();
73                         }).
__anonef2514930402(const std::string& s) 74                         map([](const std::string& s) {
75                             int i;
76                             std::stringstream(s) >> i;
77                             return i;
78                         }).
79                         subscribe_on(rx::observe_on_event_loop()).
80                         as_blocking().
81                         count();
82                 }
83                 auto finish = clock::now();
84                 auto msElapsed = duration_cast<milliseconds>(finish-start);
85                 REQUIRE(subscriptions == c);
86                 std::cout << "loop subscribe map subscribe_on : " << n << " subscribed, " << c << " on_next calls, " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
87             }
88         }
89     }
90 }
91 
92 SCENARIO("subscribe_on", "[subscribe][subscribe_on]"){
93     GIVEN("a source"){
94         auto sc = rxsc::make_test();
95         auto so = rx::synchronize_in_one_worker(sc);
96         auto w = sc.create_worker();
97         const rxsc::test::messages<int> on;
98 
99         auto xs = sc.make_hot_observable({
100             on.next(150, 1),
101             on.next(210, 2),
102             on.next(240, 3),
103             on.completed(300)
104         });
105 
106         WHEN("subscribe_on is specified"){
107 
108             auto res = w.start(
__anonef2514930502() 109                 [so, xs]() {
110                     return xs
111                          .subscribe_on(so);
112                 }
113             );
114 
115             THEN("the output contains items sent while subscribed"){
116                 auto required = rxu::to_vector({
117                     on.next(210, 2),
118                     on.next(240, 3),
119                     on.completed(300)
120                 });
121                 auto actual = res.get_observer().messages();
122                 REQUIRE(required == actual);
123             }
124 
125             THEN("there was 1 subscription/unsubscription to the source"){
126                 auto required = rxu::to_vector({
127                     on.subscribe(201, 300)
128                 });
129                 auto actual = xs.subscriptions();
130                 REQUIRE(required == actual);
131             }
132 
133         }
134     }
135 }
136 
137 SCENARIO("stream subscribe_on", "[subscribe][subscribe_on]"){
138     GIVEN("a source"){
139         auto sc = rxsc::make_test();
140         auto so = rx::synchronize_in_one_worker(sc);
141         auto w = sc.create_worker();
142         const rxsc::test::messages<int> on;
143 
144         auto xs = sc.make_hot_observable({
145             on.next(150, 1),
146             on.next(210, 2),
147             on.next(240, 3),
148             on.completed(300)
149         });
150 
151         WHEN("subscribe_on is specified"){
152 
153             auto res = w.start(
__anonef2514930602() 154                 [so, xs]() {
155                     return xs
156                          | rxo::subscribe_on(so);
157                 }
158             );
159 
160             THEN("the output contains items sent while subscribed"){
161                 auto required = rxu::to_vector({
162                     on.next(210, 2),
163                     on.next(240, 3),
164                     on.completed(300)
165                 });
166                 auto actual = res.get_observer().messages();
167                 REQUIRE(required == actual);
168             }
169 
170             THEN("there was 1 subscription/unsubscription to the source"){
171                 auto required = rxu::to_vector({
172                     on.subscribe(201, 300)
173                 });
174                 auto actual = xs.subscriptions();
175                 REQUIRE(required == actual);
176             }
177 
178         }
179     }
180 }
181