// Unit tests for the rxcpp `replay` operator (Catch BDD-style scenarios).
#include "../test.h"

#include <chrono>
#include <stdexcept>

#include <rxcpp/operators/rx-replay.hpp>
3 
4 SCENARIO("replay basic", "[replay][multicast][subject][operators]"){
5     GIVEN("a test hot observable of ints"){
6         auto sc = rxsc::make_test();
7         auto w = sc.create_worker();
8         const rxsc::test::messages<int> on;
9 
10         auto xs = sc.make_hot_observable({
11             on.next(110, 0),
12             on.next(220, 1),
13             on.next(280, 2),
14             on.next(290, 3),
15             on.next(340, 4),
16             on.next(360, 5),
17             on.next(370, 6),
18             on.next(390, 7),
19             on.next(410, 8),
20             on.next(430, 9),
21             on.next(450, 10),
22             on.next(520, 11),
23             on.next(560, 12),
24             on.completed(600)
25         });
26 
27         auto res = w.make_subscriber<int>();
28 
29         rx::connectable_observable<int> ys;
30 
31         WHEN("subscribed and then connected"){
32 
33             w.schedule_absolute(rxsc::test::created_time,
__anon724c1fe00102(const rxsc::schedulable&)34                 [&ys, &xs](const rxsc::schedulable&){
35                     ys = xs.replay().as_dynamic();
36                 });
37 
38             w.schedule_absolute(rxsc::test::subscribed_time,
__anon724c1fe00202(const rxsc::schedulable&)39                 [&ys, &res](const rxsc::schedulable&){
40                     ys.subscribe(res);
41                 });
42 
43             w.schedule_absolute(rxsc::test::unsubscribed_time,
__anon724c1fe00302(const rxsc::schedulable&)44                 [&res](const rxsc::schedulable&){
45                     res.unsubscribe();
46                 });
47 
48             {
49                 rx::composite_subscription connection;
50 
51                 w.schedule_absolute(300,
__anon724c1fe00402(const rxsc::schedulable&)52                     [connection, &ys](const rxsc::schedulable&){
53                         ys.connect(connection);
54                     });
55                 w.schedule_absolute(400,
__anon724c1fe00502(const rxsc::schedulable&)56                     [connection](const rxsc::schedulable&){
57                         connection.unsubscribe();
58                     });
59             }
60 
61             {
62                 rx::composite_subscription connection;
63 
64                 w.schedule_absolute(500,
__anon724c1fe00602(const rxsc::schedulable&)65                     [connection, &ys](const rxsc::schedulable&){
66                         ys.connect(connection);
67                     });
68                 w.schedule_absolute(550,
__anon724c1fe00702(const rxsc::schedulable&)69                     [connection](const rxsc::schedulable&){
70                         connection.unsubscribe();
71                     });
72             }
73 
74             {
75                 rx::composite_subscription connection;
76 
77                 w.schedule_absolute(650,
__anon724c1fe00802(const rxsc::schedulable&)78                     [connection, &ys](const rxsc::schedulable&){
79                         ys.connect(connection);
80                     });
81                 w.schedule_absolute(800,
__anon724c1fe00902(const rxsc::schedulable&)82                     [connection](const rxsc::schedulable&){
83                         connection.unsubscribe();
84                     });
85             }
86 
87             w.start();
88 
89             THEN("the output only contains items sent while subscribed"){
90                 auto required = rxu::to_vector({
91                     on.next(340, 4),
92                     on.next(360, 5),
93                     on.next(370, 6),
94                     on.next(390, 7),
95                     on.next(520, 11)
96                 });
97                 auto actual = res.get_observer().messages();
98                 REQUIRE(required == actual);
99             }
100 
101             THEN("there were 3 subscription/unsubscription"){
102                 auto required = rxu::to_vector({
103                     on.subscribe(300, 400),
104                     on.subscribe(500, 550),
105                     on.subscribe(650, 800)
106                 });
107                 auto actual = xs.subscriptions();
108                 REQUIRE(required == actual);
109             }
110 
111         }
112     }
113 }
114 
115 SCENARIO("replay error", "[replay][error][multicast][subject][operators]"){
116     GIVEN("a test hot observable of ints"){
117         auto sc = rxsc::make_test();
118         auto w = sc.create_worker();
119         const rxsc::test::messages<int> on;
120 
121         std::runtime_error ex("publish on_error");
122 
123         auto xs = sc.make_hot_observable({
124             on.next(110, 0),
125             on.next(220, 1),
126             on.next(280, 2),
127             on.next(290, 3),
128             on.next(340, 4),
129             on.next(360, 5),
130             on.next(370, 6),
131             on.next(390, 7),
132             on.next(410, 8),
133             on.next(430, 9),
134             on.next(450, 10),
135             on.next(520, 11),
136             on.next(560, 12),
137             on.error(600, ex)
138         });
139 
140         auto res = w.make_subscriber<int>();
141 
142         rx::connectable_observable<int> ys;
143 
144         WHEN("subscribed and then connected"){
145 
146             w.schedule_absolute(rxsc::test::created_time,
__anon724c1fe00a02(const rxsc::schedulable&)147                 [&ys, &xs](const rxsc::schedulable&){
148                     ys = xs.replay().as_dynamic();
149                 });
150 
151             w.schedule_absolute(rxsc::test::subscribed_time,
__anon724c1fe00b02(const rxsc::schedulable&)152                 [&ys, &res](const rxsc::schedulable&){
153                     ys.subscribe(res);
154                 });
155 
156             w.schedule_absolute(rxsc::test::unsubscribed_time,
__anon724c1fe00c02(const rxsc::schedulable&)157                 [&res](const rxsc::schedulable&){
158                     res.unsubscribe();
159                 });
160 
161             {
162                 rx::composite_subscription connection;
163 
164                 w.schedule_absolute(300,
__anon724c1fe00d02(const rxsc::schedulable&)165                     [connection, &ys](const rxsc::schedulable&){
166                         ys.connect(connection);
167                     });
168                 w.schedule_absolute(400,
__anon724c1fe00e02(const rxsc::schedulable&)169                     [connection](const rxsc::schedulable&){
170                         connection.unsubscribe();
171                     });
172             }
173 
174             {
175                 rx::composite_subscription connection;
176 
177                 w.schedule_absolute(500,
__anon724c1fe00f02(const rxsc::schedulable&)178                     [connection, &ys](const rxsc::schedulable&){
179                         ys.connect(connection);
180                     });
181                 w.schedule_absolute(800,
__anon724c1fe01002(const rxsc::schedulable&)182                     [connection](const rxsc::schedulable&){
183                         connection.unsubscribe();
184                     });
185             }
186 
187             w.start();
188 
189             THEN("the output only contains items sent while subscribed"){
190                 auto required = rxu::to_vector({
191                     on.next(340, 4),
192                     on.next(360, 5),
193                     on.next(370, 6),
194                     on.next(390, 7),
195                     on.next(520, 11),
196                     on.next(560, 12),
197                     on.error(600, ex)
198                 });
199                 auto actual = res.get_observer().messages();
200                 REQUIRE(required == actual);
201             }
202 
203             THEN("there were 2 subscription/unsubscription"){
204                 auto required = rxu::to_vector({
205                     on.subscribe(300, 400),
206                     on.subscribe(500, 600)
207                 });
208                 auto actual = xs.subscriptions();
209                 REQUIRE(required == actual);
210             }
211 
212         }
213     }
214 }
215 
216 SCENARIO("replay multiple subscriptions", "[replay][multicast][subject][operators]"){
217     GIVEN("a test hot observable of ints"){
218         auto sc = rxsc::make_test();
219         auto w = sc.create_worker();
220         const rxsc::test::messages<int> on;
221 
222         auto xs = sc.make_hot_observable({
223             on.next(110, 0),
224             on.next(220, 1),
225             on.next(280, 2),
226             on.next(290, 3),
227             on.next(340, 4),
228             on.next(360, 5),
229             on.next(370, 6),
230             on.next(390, 7),
231             on.next(410, 8),
232             on.next(430, 9),
233             on.next(450, 10),
234             on.next(520, 11),
235             on.next(560, 12),
236             on.completed(650)
237         });
238 
239         rx::connectable_observable<int> ys;
240 
241         WHEN("subscribed and then connected"){
242 
243             // Create connectable observable
244             w.schedule_absolute(rxsc::test::created_time,
__anon724c1fe01102(const rxsc::schedulable&)245                 [&ys, &xs](const rxsc::schedulable&){
246                     ys = xs.replay().as_dynamic();
247                 });
248 
249             // Manage connection
250             rx::composite_subscription connection;
251             w.schedule_absolute(rxsc::test::subscribed_time,
__anon724c1fe01202(const rxsc::schedulable&)252                 [connection, &ys](const rxsc::schedulable&){
253                     ys.connect(connection);
254                 });
255             w.schedule_absolute(rxsc::test::unsubscribed_time,
__anon724c1fe01302(const rxsc::schedulable&)256                 [connection](const rxsc::schedulable&){
257                     connection.unsubscribe();
258                 });
259 
260             // Subscribe before the first item emitted
261             auto res1 = w.make_subscriber<int>();
__anon724c1fe01402(const rxsc::schedulable&)262             w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);});
263 
264             // Subscribe in the middle of emitting
265             auto res2 = w.make_subscriber<int>();
__anon724c1fe01502(const rxsc::schedulable&)266             w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);});
267 
268             // Subscribe after the last item emitted
269             auto res3 = w.make_subscriber<int>();
__anon724c1fe01602(const rxsc::schedulable&)270             w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);});
271 
272             w.start();
273 
274             THEN("the output only contains items sent while subscribed"){
275                 auto required = rxu::to_vector({
276                     on.next(220, 1),
277                     on.next(280, 2),
278                     on.next(290, 3),
279                     on.next(340, 4),
280                     on.next(360, 5),
281                     on.next(370, 6),
282                     on.next(390, 7),
283                     on.next(410, 8),
284                     on.next(430, 9),
285                     on.next(450, 10),
286                     on.next(520, 11),
287                     on.next(560, 12),
288                     on.completed(650)
289                 });
290                 auto actual = res1.get_observer().messages();
291                 REQUIRE(required == actual);
292             }
293 
294             THEN("the output only contains items sent while subscribed"){
295                 auto required = rxu::to_vector({
296                     on.next(400, 1),
297                     on.next(400, 2),
298                     on.next(400, 3),
299                     on.next(400, 4),
300                     on.next(400, 5),
301                     on.next(400, 6),
302                     on.next(400, 7),
303                     on.next(410, 8),
304                     on.next(430, 9),
305                     on.next(450, 10),
306                     on.next(520, 11),
307                     on.next(560, 12),
308                     on.completed(650)
309                 });
310                 auto actual = res2.get_observer().messages();
311                 REQUIRE(required == actual);
312             }
313 
314             THEN("the output only contains items sent while subscribed"){
315                 auto required = rxu::to_vector({
316                     on.next(600, 1),
317                     on.next(600, 2),
318                     on.next(600, 3),
319                     on.next(600, 4),
320                     on.next(600, 5),
321                     on.next(600, 6),
322                     on.next(600, 7),
323                     on.next(600, 8),
324                     on.next(600, 9),
325                     on.next(600, 10),
326                     on.next(600, 11),
327                     on.next(600, 12),
328                     on.completed(650)
329                 });
330                 auto actual = res3.get_observer().messages();
331                 REQUIRE(required == actual);
332             }
333 
334             THEN("there was 1 subscription/unsubscription"){
335                 auto required = rxu::to_vector({
336                     on.subscribe(200, 650)
337                 });
338                 auto actual = xs.subscriptions();
339                 REQUIRE(required == actual);
340             }
341 
342         }
343     }
344 }
345 
346 SCENARIO("replay multiple subscriptions with count", "[replay][multicast][subject][operators]"){
347     GIVEN("a test hot observable of ints"){
348         auto sc = rxsc::make_test();
349         auto w = sc.create_worker();
350         const rxsc::test::messages<int> on;
351 
352         auto xs = sc.make_hot_observable({
353             on.next(110, 0),
354             on.next(220, 1),
355             on.next(280, 2),
356             on.next(290, 3),
357             on.next(340, 4),
358             on.next(360, 5),
359             on.next(370, 6),
360             on.next(390, 7),
361             on.next(410, 8),
362             on.next(430, 9),
363             on.next(450, 10),
364             on.next(520, 11),
365             on.next(560, 12),
366             on.completed(650)
367         });
368 
369         rx::connectable_observable<int> ys;
370 
371         WHEN("subscribed and then connected"){
372 
373             // Create connectable observable
374             w.schedule_absolute(rxsc::test::created_time,
__anon724c1fe01702(const rxsc::schedulable&)375                 [&ys, &xs](const rxsc::schedulable&){
376                     ys = xs.replay(3).as_dynamic();
377                 });
378 
379             // Manage connection
380             rx::composite_subscription connection;
381             w.schedule_absolute(rxsc::test::subscribed_time,
__anon724c1fe01802(const rxsc::schedulable&)382                 [connection, &ys](const rxsc::schedulable&){
383                     ys.connect(connection);
384                 });
385             w.schedule_absolute(rxsc::test::unsubscribed_time,
__anon724c1fe01902(const rxsc::schedulable&)386                 [connection](const rxsc::schedulable&){
387                     connection.unsubscribe();
388                 });
389 
390             // Subscribe before the first item emitted
391             auto res1 = w.make_subscriber<int>();
__anon724c1fe01a02(const rxsc::schedulable&)392             w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);});
393 
394             // Subscribe in the middle of emitting
395             auto res2 = w.make_subscriber<int>();
__anon724c1fe01b02(const rxsc::schedulable&)396             w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);});
397 
398             // Subscribe after the last item emitted
399             auto res3 = w.make_subscriber<int>();
__anon724c1fe01c02(const rxsc::schedulable&)400             w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);});
401 
402             w.start();
403 
404             THEN("the output only contains items sent while subscribed"){
405                 auto required = rxu::to_vector({
406                     on.next(220, 1),
407                     on.next(280, 2),
408                     on.next(290, 3),
409                     on.next(340, 4),
410                     on.next(360, 5),
411                     on.next(370, 6),
412                     on.next(390, 7),
413                     on.next(410, 8),
414                     on.next(430, 9),
415                     on.next(450, 10),
416                     on.next(520, 11),
417                     on.next(560, 12),
418                     on.completed(650)
419                 });
420                 auto actual = res1.get_observer().messages();
421                 REQUIRE(required == actual);
422             }
423 
424             THEN("the output only contains items sent while subscribed"){
425                 auto required = rxu::to_vector({
426                     on.next(400, 5),
427                     on.next(400, 6),
428                     on.next(400, 7),
429                     on.next(410, 8),
430                     on.next(430, 9),
431                     on.next(450, 10),
432                     on.next(520, 11),
433                     on.next(560, 12),
434                     on.completed(650)
435                 });
436                 auto actual = res2.get_observer().messages();
437                 REQUIRE(required == actual);
438             }
439 
440             THEN("the output only contains items sent while subscribed"){
441                 auto required = rxu::to_vector({
442                     on.next(600, 10),
443                     on.next(600, 11),
444                     on.next(600, 12),
445                     on.completed(650)
446                 });
447                 auto actual = res3.get_observer().messages();
448                 REQUIRE(required == actual);
449             }
450 
451             THEN("there was 1 subscription/unsubscription"){
452                 auto required = rxu::to_vector({
453                     on.subscribe(200, 650)
454                 });
455                 auto actual = xs.subscriptions();
456                 REQUIRE(required == actual);
457             }
458 
459         }
460     }
461 }
462 
463 SCENARIO("replay multiple subscriptions with time", "[replay][multicast][subject][operators]"){
464     GIVEN("a test hot observable of ints"){
465         auto sc = rxsc::make_test();
466         auto w = sc.create_worker();
467         auto so = rx::identity_one_worker(sc);
468         const rxsc::test::messages<int> on;
469 
470         auto xs = sc.make_hot_observable({
471             on.next(110, 0),
472             on.next(220, 1),
473             on.next(240, 2),
474             on.next(260, 3),
475             on.next(340, 4),
476             on.next(360, 5),
477             on.next(370, 6),
478             on.next(390, 7),
479             on.next(410, 8),
480             on.next(430, 9),
481             on.next(450, 10),
482             on.next(520, 11),
483             on.next(560, 12),
484             on.completed(650)
485         });
486 
487         rx::connectable_observable<int> ys;
488 
489         WHEN("subscribed and then connected"){
490             using namespace std::chrono;
491 
492             // Create connectable observable
493             w.schedule_absolute(rxsc::test::created_time,
__anon724c1fe01d02(const rxsc::schedulable&)494                 [&](const rxsc::schedulable&){
495                     ys = xs.replay(milliseconds(100), so).as_dynamic();
496                 });
497 
498             // Manage connection
499             rx::composite_subscription connection;
500             w.schedule_absolute(rxsc::test::subscribed_time,
__anon724c1fe01e02(const rxsc::schedulable&)501                 [connection, &ys](const rxsc::schedulable&){
502                     ys.connect(connection);
503                 });
504             w.schedule_absolute(rxsc::test::unsubscribed_time,
__anon724c1fe01f02(const rxsc::schedulable&)505                 [connection](const rxsc::schedulable&){
506                     connection.unsubscribe();
507                 });
508 
509             // Subscribe before the first item emitted
510             auto res1 = w.make_subscriber<int>();
__anon724c1fe02002(const rxsc::schedulable&)511             w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);});
512 
513             // Subscribe in the middle of emitting
514             auto res2 = w.make_subscriber<int>();
__anon724c1fe02102(const rxsc::schedulable&)515             w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);});
516 
517             // Subscribe after the last item emitted
518             auto res3 = w.make_subscriber<int>();
__anon724c1fe02202(const rxsc::schedulable&)519             w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);});
520 
521             w.start();
522 
523             THEN("the output only contains items sent while subscribed"){
524                 auto required = rxu::to_vector({
525                     on.next(220, 1),
526                     on.next(240, 2),
527                     on.next(260, 3),
528                     on.next(340, 4),
529                     on.next(360, 5),
530                     on.next(370, 6),
531                     on.next(390, 7),
532                     on.next(410, 8),
533                     on.next(430, 9),
534                     on.next(450, 10),
535                     on.next(520, 11),
536                     on.next(560, 12),
537                     on.completed(650)
538                 });
539                 auto actual = res1.get_observer().messages();
540                 REQUIRE(required == actual);
541             }
542 
543             THEN("the output only contains items sent while subscribed"){
544                 auto required = rxu::to_vector({
545                     on.next(400, 4),
546                     on.next(400, 5),
547                     on.next(400, 6),
548                     on.next(400, 7),
549                     on.next(410, 8),
550                     on.next(430, 9),
551                     on.next(450, 10),
552                     on.next(520, 11),
553                     on.next(560, 12),
554                     on.completed(650)
555                 });
556                 auto actual = res2.get_observer().messages();
557                 REQUIRE(required == actual);
558             }
559 
560             THEN("the output only contains items sent while subscribed"){
561                 auto required = rxu::to_vector({
562                     on.next(600, 11),
563                     on.next(600, 12),
564                     on.completed(650)
565                 });
566                 auto actual = res3.get_observer().messages();
567                 REQUIRE(required == actual);
568             }
569 
570             THEN("there was 1 subscription/unsubscription"){
571                 auto required = rxu::to_vector({
572                     on.subscribe(200, 650)
573                 });
574                 auto actual = xs.subscriptions();
575                 REQUIRE(required == actual);
576             }
577 
578         }
579     }
580 }
581 
582 SCENARIO("replay multiple subscriptions with count and time", "[replay][multicast][subject][operators]"){
583     GIVEN("a test hot observable of ints"){
584         auto sc = rxsc::make_test();
585         auto w = sc.create_worker();
586         auto so = rx::identity_one_worker(sc);
587         const rxsc::test::messages<int> on;
588 
589         auto xs = sc.make_hot_observable({
590             on.next(110, 0),
591             on.next(220, 1),
592             on.next(240, 2),
593             on.next(260, 3),
594             on.next(340, 4),
595             on.next(360, 5),
596             on.next(370, 6),
597             on.next(390, 7),
598             on.next(410, 8),
599             on.next(430, 9),
600             on.next(450, 10),
601             on.next(520, 11),
602             on.next(560, 12),
603             on.completed(650)
604         });
605 
606         rx::connectable_observable<int> ys;
607 
608         WHEN("subscribed and then connected"){
609             using namespace std::chrono;
610 
611             // Create connectable observable
612             w.schedule_absolute(rxsc::test::created_time,
__anon724c1fe02302(const rxsc::schedulable&)613                 [&](const rxsc::schedulable&){
614                     ys = xs.replay(3, milliseconds(100), so).as_dynamic();
615                 });
616 
617             // Manage connection
618             rx::composite_subscription connection;
619             w.schedule_absolute(rxsc::test::subscribed_time,
__anon724c1fe02402(const rxsc::schedulable&)620                 [connection, &ys](const rxsc::schedulable&){
621                     ys.connect(connection);
622                 });
623             w.schedule_absolute(rxsc::test::unsubscribed_time,
__anon724c1fe02502(const rxsc::schedulable&)624                 [connection](const rxsc::schedulable&){
625                     connection.unsubscribe();
626                 });
627 
628             // Subscribe before the first item emitted
629             auto res1 = w.make_subscriber<int>();
__anon724c1fe02602(const rxsc::schedulable&)630             w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);});
631 
632             // Subscribe in the middle of emitting
633             auto res2 = w.make_subscriber<int>();
__anon724c1fe02702(const rxsc::schedulable&)634             w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);});
635 
636             // Subscribe after the last item emitted
637             auto res3 = w.make_subscriber<int>();
__anon724c1fe02802(const rxsc::schedulable&)638             w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);});
639 
640             w.start();
641 
642             THEN("the output only contains items sent while subscribed"){
643                 auto required = rxu::to_vector({
644                     on.next(220, 1),
645                     on.next(240, 2),
646                     on.next(260, 3),
647                     on.next(340, 4),
648                     on.next(360, 5),
649                     on.next(370, 6),
650                     on.next(390, 7),
651                     on.next(410, 8),
652                     on.next(430, 9),
653                     on.next(450, 10),
654                     on.next(520, 11),
655                     on.next(560, 12),
656                     on.completed(650)
657                 });
658                 auto actual = res1.get_observer().messages();
659                 REQUIRE(required == actual);
660             }
661 
662             THEN("the output only contains items sent while subscribed"){
663                 auto required = rxu::to_vector({
664                     on.next(400, 5),
665                     on.next(400, 6),
666                     on.next(400, 7),
667                     on.next(410, 8),
668                     on.next(430, 9),
669                     on.next(450, 10),
670                     on.next(520, 11),
671                     on.next(560, 12),
672                     on.completed(650)
673                 });
674                 auto actual = res2.get_observer().messages();
675                 REQUIRE(required == actual);
676             }
677 
678             THEN("the output only contains items sent while subscribed"){
679                 auto required = rxu::to_vector({
680                     on.next(600, 11),
681                     on.next(600, 12),
682                     on.completed(650)
683                 });
684                 auto actual = res3.get_observer().messages();
685                 REQUIRE(required == actual);
686             }
687 
688             THEN("there was 1 subscription/unsubscription"){
689                 auto required = rxu::to_vector({
690                     on.subscribe(200, 650)
691                 });
692                 auto actual = xs.subscriptions();
693                 REQUIRE(required == actual);
694             }
695 
696         }
697     }
698 }
699