• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-concat.hpp>
3 #include <rxcpp/operators/rx-buffer_count.hpp>
4 #include <rxcpp/operators/rx-buffer_time.hpp>
5 #include <rxcpp/operators/rx-buffer_time_count.hpp>
6 #include <rxcpp/operators/rx-take.hpp>
7 
8 SCENARIO("buffer count partial window", "[buffer][operators]"){
9     GIVEN("1 hot observable of ints."){
10         auto sc = rxsc::make_test();
11         auto w = sc.create_worker();
12         const rxsc::test::messages<int> on;
13         const rxsc::test::messages<std::vector<int>> v_on;
14 
15         auto xs = sc.make_hot_observable({
16             on.next(150, 1),
17             on.next(210, 2),
18             on.next(220, 3),
19             on.next(230, 4),
20             on.next(240, 5),
21             on.completed(250)
22         });
23 
24         WHEN("group each int with the next 4 ints"){
25 
26             auto res = w.start(
__anon79d0578d0102() 27                 [&]() {
28                     return xs
29                         | rxo::buffer(5)
30                         // forget type to workaround lambda deduction bug on msvc 2013
31                         | rxo::as_dynamic();
32                 }
33             );
34 
35             THEN("the output contains groups of ints"){
36                 auto required = rxu::to_vector({
37                     v_on.next(250, rxu::to_vector({ 2, 3, 4, 5 })),
38                     v_on.completed(250)
39                 });
40                 auto actual = res.get_observer().messages();
41                 REQUIRE(required == actual);
42             }
43 
44             THEN("there was one subscription and one unsubscription to the xs"){
45                 auto required = rxu::to_vector({
46                     on.subscribe(200, 250)
47                 });
48                 auto actual = xs.subscriptions();
49                 REQUIRE(required == actual);
50             }
51         }
52     }
53 }
54 
55 SCENARIO("buffer count full windows", "[buffer][operators]"){
56     GIVEN("1 hot observable of ints."){
57         auto sc = rxsc::make_test();
58         auto w = sc.create_worker();
59         const rxsc::test::messages<int> on;
60         const rxsc::test::messages<std::vector<int>> v_on;
61 
62         auto xs = sc.make_hot_observable({
63             on.next(150, 1),
64             on.next(210, 2),
65             on.next(220, 3),
66             on.next(230, 4),
67             on.next(240, 5),
68             on.completed(250)
69         });
70 
71         WHEN("group each int with the next int"){
72 
73             auto res = w.start(
__anon79d0578d0202() 74                 [&]() {
75                 return xs
76                     .buffer(2)
77                     // forget type to workaround lambda deduction bug on msvc 2013
78                     .as_dynamic();
79             }
80             );
81 
82             THEN("the output contains groups of ints"){
83                 auto required = rxu::to_vector({
84                     v_on.next(220, rxu::to_vector({ 2, 3 })),
85                     v_on.next(240, rxu::to_vector({ 4, 5 })),
86                     v_on.completed(250)
87                 });
88                 auto actual = res.get_observer().messages();
89                 REQUIRE(required == actual);
90             }
91 
92             THEN("there was one subscription and one unsubscription to the xs"){
93                 auto required = rxu::to_vector({
94                     on.subscribe(200, 250)
95                 });
96                 auto actual = xs.subscriptions();
97                 REQUIRE(required == actual);
98             }
99         }
100     }
101 }
102 
103 SCENARIO("buffer count full and partial windows", "[buffer][operators]"){
104     GIVEN("1 hot observable of ints."){
105         auto sc = rxsc::make_test();
106         auto w = sc.create_worker();
107         const rxsc::test::messages<int> on;
108         const rxsc::test::messages<std::vector<int>> v_on;
109 
110         auto xs = sc.make_hot_observable({
111             on.next(150, 1),
112             on.next(210, 2),
113             on.next(220, 3),
114             on.next(230, 4),
115             on.next(240, 5),
116             on.completed(250)
117         });
118 
119         WHEN("group each int with the next 2 ints"){
120 
121             auto res = w.start(
__anon79d0578d0302() 122                 [&]() {
123                     return xs
124                         .buffer(3)
125                         // forget type to workaround lambda deduction bug on msvc 2013
126                         .as_dynamic();
127                 }
128             );
129 
130             THEN("the output contains groups of ints"){
131                 auto required = rxu::to_vector({
132                     v_on.next(230, rxu::to_vector({ 2, 3, 4 })),
133                     v_on.next(250, rxu::to_vector({ 5 })),
134                     v_on.completed(250)
135                 });
136                 auto actual = res.get_observer().messages();
137                 REQUIRE(required == actual);
138             }
139 
140             THEN("there was one subscription and one unsubscription to the xs"){
141                 auto required = rxu::to_vector({
142                     on.subscribe(200, 250)
143                 });
144                 auto actual = xs.subscriptions();
145                 REQUIRE(required == actual);
146             }
147         }
148     }
149 }
150 
151 SCENARIO("buffer count error", "[buffer][operators]"){
152     GIVEN("1 hot observable of ints."){
153         auto sc = rxsc::make_test();
154         auto w = sc.create_worker();
155         const rxsc::test::messages<int> on;
156         const rxsc::test::messages<std::vector<int>> v_on;
157 
158         std::runtime_error ex("buffer on_error from source");
159 
160         auto xs = sc.make_hot_observable({
161             on.next(150, 1),
162             on.next(210, 2),
163             on.next(220, 3),
164             on.next(230, 4),
165             on.next(240, 5),
166             on.error(250, ex)
167         });
168 
169         WHEN("group each int with the next 4 ints"){
170 
171             auto res = w.start(
__anon79d0578d0402() 172                 [&]() {
173                     return xs
174                         .buffer(5)
175                         // forget type to workaround lambda deduction bug on msvc 2013
176                         .as_dynamic();
177                 }
178             );
179 
180             THEN("the output contains groups of ints"){
181                 auto required = rxu::to_vector({
182                     v_on.error(250, ex)
183                 });
184                 auto actual = res.get_observer().messages();
185                 REQUIRE(required == actual);
186             }
187 
188             THEN("there was one subscription and one unsubscription to the xs"){
189                 auto required = rxu::to_vector({
190                     on.subscribe(200, 250)
191                 });
192                 auto actual = xs.subscriptions();
193                 REQUIRE(required == actual);
194             }
195         }
196     }
197 }
198 
199 SCENARIO("buffer count skip less", "[buffer][operators]"){
200     GIVEN("1 hot observable of ints."){
201         auto sc = rxsc::make_test();
202         auto w = sc.create_worker();
203         const rxsc::test::messages<int> on;
204         const rxsc::test::messages<std::vector<int>> v_on;
205 
206         auto xs = sc.make_hot_observable({
207             on.next(150, 1),
208             on.next(210, 2),
209             on.next(220, 3),
210             on.next(230, 4),
211             on.next(240, 5),
212             on.completed(250)
213         });
214 
215         WHEN("group each int with the next 2 ints"){
216 
217             auto res = w.start(
__anon79d0578d0502() 218                 [&]() {
219                     return xs
220                         .buffer(3, 1)
221                         // forget type to workaround lambda deduction bug on msvc 2013
222                         .as_dynamic();
223                 }
224             );
225 
226             THEN("the output contains groups of ints"){
227                 auto required = rxu::to_vector({
228                     v_on.next(230, rxu::to_vector({ 2, 3, 4 })),
229                     v_on.next(240, rxu::to_vector({ 3, 4, 5 })),
230                     v_on.next(250, rxu::to_vector({ 4, 5 })),
231                     v_on.next(250, rxu::to_vector({ 5 })),
232                     v_on.completed(250)
233                 });
234                 auto actual = res.get_observer().messages();
235                 REQUIRE(required == actual);
236             }
237 
238             THEN("there was one subscription and one unsubscription to the xs"){
239                 auto required = rxu::to_vector({
240                     on.subscribe(200, 250)
241                 });
242                 auto actual = xs.subscriptions();
243                 REQUIRE(required == actual);
244             }
245         }
246     }
247 }
248 
249 SCENARIO("buffer count skip more", "[buffer][operators]"){
250     GIVEN("1 hot observable of ints."){
251         auto sc = rxsc::make_test();
252         auto w = sc.create_worker();
253         const rxsc::test::messages<int> on;
254         const rxsc::test::messages<std::vector<int>> v_on;
255 
256         auto xs = sc.make_hot_observable({
257             on.next(150, 1),
258             on.next(210, 2),
259             on.next(220, 3),
260             on.next(230, 4),
261             on.next(240, 5),
262             on.completed(250)
263         });
264 
265         WHEN("group each int with the next int skipping the third one"){
266 
267             auto res = w.start(
__anon79d0578d0602() 268                 [&]() {
269                 return xs
270                     .buffer(2, 3)
271                     // forget type to workaround lambda deduction bug on msvc 2013
272                     .as_dynamic();
273             }
274             );
275 
276             THEN("the output contains groups of ints"){
277                 auto required = rxu::to_vector({
278                     v_on.next(220, rxu::to_vector({ 2, 3 })),
279                     v_on.next(250, rxu::to_vector({ 5 })),
280                     v_on.completed(250)
281                 });
282                 auto actual = res.get_observer().messages();
283                 REQUIRE(required == actual);
284             }
285 
286             THEN("there was one subscription and one unsubscription to the xs"){
287                 auto required = rxu::to_vector({
288                     on.subscribe(200, 250)
289                 });
290                 auto actual = xs.subscriptions();
291                 REQUIRE(required == actual);
292             }
293         }
294     }
295 }
296 
297 SCENARIO("buffer count basic", "[buffer][operators]"){
298     GIVEN("1 hot observable of ints."){
299         auto sc = rxsc::make_test();
300         auto w = sc.create_worker();
301         const rxsc::test::messages<int> on;
302         const rxsc::test::messages<std::vector<int>> v_on;
303 
304         auto xs = sc.make_hot_observable({
305             on.next(100, 1),
306             on.next(210, 2),
307             on.next(240, 3),
308             on.next(280, 4),
309             on.next(320, 5),
310             on.next(350, 6),
311             on.next(380, 7),
312             on.next(420, 8),
313             on.next(470, 9),
314             on.completed(600)
315         });
316 
317         WHEN("group each int with the next 2 ints"){
318 
319             auto res = w.start(
__anon79d0578d0702() 320                 [&]() {
321                     return xs
322                         .buffer(3, 2)
323                         // forget type to workaround lambda deduction bug on msvc 2013
324                         .as_dynamic();
325                 }
326             );
327 
328             THEN("the output contains groups of ints"){
329                 auto required = rxu::to_vector({
330                     v_on.next(280, rxu::to_vector({ 2, 3, 4 })),
331                     v_on.next(350, rxu::to_vector({ 4, 5, 6 })),
332                     v_on.next(420, rxu::to_vector({ 6, 7, 8 })),
333                     v_on.next(600, rxu::to_vector({ 8, 9 })),
334                     v_on.completed(600)
335                 });
336                 auto actual = res.get_observer().messages();
337                 REQUIRE(required == actual);
338             }
339 
340             THEN("there was one subscription and one unsubscription to the xs"){
341                 auto required = rxu::to_vector({
342                     on.subscribe(200, 600)
343                 });
344                 auto actual = xs.subscriptions();
345                 REQUIRE(required == actual);
346             }
347         }
348     }
349 }
350 
351 SCENARIO("buffer count disposed", "[buffer][operators]"){
352     GIVEN("1 hot observable of ints."){
353         auto sc = rxsc::make_test();
354         auto w = sc.create_worker();
355         const rxsc::test::messages<int> on;
356         const rxsc::test::messages<std::vector<int>> v_on;
357 
358         auto xs = sc.make_hot_observable({
359             on.next(100, 1),
360             on.next(210, 2),
361             on.next(240, 3),
362             on.next(280, 4),
363             on.next(320, 5),
364             on.next(350, 6),
365             on.next(380, 7),
366             on.next(420, 8),
367             on.next(470, 9),
368             on.completed(600)
369         });
370 
371         WHEN("group each int with the next 2 ints"){
372 
373             auto res = w.start(
__anon79d0578d0802() 374                 [&]() {
375                     return xs
376                         .buffer(3, 2)
377                         // forget type to workaround lambda deduction bug on msvc 2013
378                         .as_dynamic();
379                 },
380                 370
381             );
382 
383             THEN("the output contains groups of ints"){
384                 auto required = rxu::to_vector({
385                     v_on.next(280, rxu::to_vector({ 2, 3, 4 })),
386                     v_on.next(350, rxu::to_vector({ 4, 5, 6 })),
387                 });
388                 auto actual = res.get_observer().messages();
389                 REQUIRE(required == actual);
390             }
391 
392             THEN("there was one subscription and one unsubscription to the xs"){
393                 auto required = rxu::to_vector({
394                     on.subscribe(200, 370)
395                 });
396                 auto actual = xs.subscriptions();
397                 REQUIRE(required == actual);
398             }
399         }
400     }
401 }
402 
403 SCENARIO("buffer count error 2", "[buffer][operators]"){
404     GIVEN("1 hot observable of ints."){
405         auto sc = rxsc::make_test();
406         auto w = sc.create_worker();
407         const rxsc::test::messages<int> on;
408         const rxsc::test::messages<std::vector<int>> v_on;
409 
410         std::runtime_error ex("buffer on_error from source");
411 
412         auto xs = sc.make_hot_observable({
413             on.next(100, 1),
414             on.next(210, 2),
415             on.next(240, 3),
416             on.next(280, 4),
417             on.next(320, 5),
418             on.next(350, 6),
419             on.next(380, 7),
420             on.next(420, 8),
421             on.next(470, 9),
422             on.error(600, ex)
423         });
424 
425         WHEN("group each int with the next 2 ints"){
426 
427             auto res = w.start(
__anon79d0578d0902() 428                 [&]() {
429                     return xs
430                         .buffer(3, 2)
431                         // forget type to workaround lambda deduction bug on msvc 2013
432                         .as_dynamic();
433                 }
434             );
435 
436             THEN("the output contains groups of ints"){
437                 auto required = rxu::to_vector({
438                     v_on.next(280, rxu::to_vector({ 2, 3, 4 })),
439                     v_on.next(350, rxu::to_vector({ 4, 5, 6 })),
440                     v_on.next(420, rxu::to_vector({ 6, 7, 8 })),
441                     v_on.error(600, ex)
442                 });
443                 auto actual = res.get_observer().messages();
444                 REQUIRE(required == actual);
445             }
446 
447             THEN("there was one subscription and one unsubscription to the xs"){
448                 auto required = rxu::to_vector({
449                     on.subscribe(200, 600)
450                 });
451                 auto actual = xs.subscriptions();
452                 REQUIRE(required == actual);
453             }
454         }
455     }
456 }
457 
458 SCENARIO("buffer with time on intervals", "[buffer_with_time][operators][long][!hide]"){
459     GIVEN("7 intervals of 2 seconds"){
460         WHEN("the period is 2sec and the initial is 5sec"){
461             // time:   |-----------------|
462             // events:      1 2 3 4 5 6 7
463             // buffers: ---
464             //             -1-
465             //                2-3
466             //                   -4-
467             //                      5-6
468             //                         -7
469             using namespace std::chrono;
470 
471             #define TIME milliseconds
472             #define UNIT *15
473 
474             auto sc = rxsc::make_current_thread();
475             auto so = rx::synchronize_in_one_worker(sc);
476             auto start = sc.now() + TIME(5 UNIT);
477             auto period = TIME(2 UNIT);
478 
479             auto bufSource = rxs::interval(start, period, so)
480                 | rxo::take(7)
481                 | rxo::buffer_with_time(TIME(3 UNIT), so);
482 
483             bufSource
484                 .subscribe(
__anon79d0578d0a02(std::vector<long> counter)485                     [](std::vector<long> counter){
486                         printf("on_next: ");
487                         std::for_each(counter.begin(), counter.end(), [](long c){
488                             printf("%ld ", c);
489                         });
490                         printf("\n");
491                     },
__anon79d0578d0c02(rxu::error_ptr)492                     [](rxu::error_ptr){
493                         printf("on_error\n");
494                     },
__anon79d0578d0d02()495                     [](){
496                         printf("on_completed\n");
497                     }
498                 );
499         }
500     }
501 }
502 
503 SCENARIO("buffer with time on intervals, implicit coordination", "[buffer_with_time][operators][long][!hide]"){
504     GIVEN("7 intervals of 2 seconds"){
505         WHEN("the period is 2sec and the initial is 5sec"){
506             // time:   |-----------------|
507             // events:      1 2 3 4 5 6 7
508             // buffers: ---
509             //             -1-
510             //                2-3
511             //                   -4-
512             //                      5-6
513             //                         -7
514             using namespace std::chrono;
515 
516             #define TIME milliseconds
517             #define UNIT *15
518 
519             auto sc = rxsc::make_current_thread();
520             auto so = rx::synchronize_in_one_worker(sc);
521             auto start = sc.now() + TIME(5 UNIT);
522             auto period = TIME(2 UNIT);
523 
524             rx::observable<>::interval(start, period, so)
525                 .take(7)
526                 .buffer_with_time(TIME(3 UNIT))
527                 .subscribe(
__anon79d0578d0e02(std::vector<long> counter)528                     [](std::vector<long> counter){
529                         printf("on_next: ");
530                         std::for_each(counter.begin(), counter.end(), [](long c){
531                             printf("%ld ", c);
532                         });
533                         printf("\n");
534                     },
__anon79d0578d1002(rxu::error_ptr)535                     [](rxu::error_ptr){
536                         printf("on_error\n");
537                     },
__anon79d0578d1102()538                     [](){
539                         printf("on_completed\n");
540                     }
541                 );
542         }
543     }
544 }
545 
546 SCENARIO("buffer with time on overlapping intervals", "[buffer_with_time][operators][long][!hide]"){
547     GIVEN("5 intervals of 2 seconds"){
548         WHEN("the period is 2sec and the initial is 5sec"){
549             // time:   |-------------|
550             // events:      1 2 3 4 5
551             // buffers: ----
552             //            --1-
553             //              1-2-
554             //                2-3-
555             //                  3-4-
556             //                    4-5
557             //                      5
558             using namespace std::chrono;
559 
560             #define TIME milliseconds
561             #define UNIT *15
562 
563             auto sc = rxsc::make_current_thread();
564             auto so = rx::synchronize_in_one_worker(sc);
565             auto start = sc.now() + TIME(5 UNIT);
566             auto period = TIME(2 UNIT);
567 
568             rx::observable<>::interval(start, period, so)
569                 .take(5)
570                 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT), so)
571                 .subscribe(
__anon79d0578d1202(std::vector<long> counter)572                     [](std::vector<long> counter){
573                         printf("on_next: ");
574                         std::for_each(counter.begin(), counter.end(), [](long c){
575                             printf("%ld ", c);
576                         });
577                         printf("\n");
578                     },
__anon79d0578d1402(rxu::error_ptr)579                     [](rxu::error_ptr){
580                         printf("on_error\n");
581                     },
__anon79d0578d1502()582                     [](){
583                         printf("on_completed\n");
584                     }
585                 );
586         }
587     }
588 }
589 
590 SCENARIO("buffer with time on overlapping intervals, implicit coordination", "[buffer_with_time][operators][long][!hide]"){
591     GIVEN("5 intervals of 2 seconds"){
592         WHEN("the period is 2sec and the initial is 5sec"){
593             // time:   |-------------|
594             // events:      1 2 3 4 5
595             // buffers: ----
596             //            --1-
597             //              1-2-
598             //                2-3-
599             //                  3-4-
600             //                    4-5
601             //                      5
602             using namespace std::chrono;
603 
604             #define TIME milliseconds
605             #define UNIT *15
606 
607             auto sc = rxsc::make_current_thread();
608             auto so = rx::synchronize_in_one_worker(sc);
609             auto start = sc.now() + TIME(5 UNIT);
610             auto period = TIME(2 UNIT);
611 
612             rx::observable<>::interval(start, period, so)
613                 .take(5)
614                 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT))
615                 .subscribe(
__anon79d0578d1602(std::vector<long> counter)616                     [](std::vector<long> counter){
617                         printf("on_next: ");
618                         std::for_each(counter.begin(), counter.end(), [](long c){
619                             printf("%ld ", c);
620                         });
621                         printf("\n");
622                     },
__anon79d0578d1802(rxu::error_ptr)623                     [](rxu::error_ptr){
624                         printf("on_error\n");
625                     },
__anon79d0578d1902()626                     [](){
627                         printf("on_completed\n");
628                     }
629                 );
630         }
631     }
632 }
633 
634 SCENARIO("buffer with time on intervals, error", "[buffer_with_time][operators][long][!hide]"){
635     GIVEN("5 intervals of 2 seconds"){
636         WHEN("the period is 2sec and the initial is 5sec"){
637             // time:   |-------------|
638             // events:      1 2 3 4 5
639             // buffers: ----
640             //            --1-
641             //              1-2-
642             //                2-3-
643             //                  3-4-
644             //                    4-5
645             //                      5
646             using namespace std::chrono;
647 
648             #define TIME milliseconds
649             #define UNIT *15
650 
651             auto sc = rxsc::make_current_thread();
652             auto so = rx::synchronize_in_one_worker(sc);
653             auto start = sc.now() + TIME(5 UNIT);
654             auto period = TIME(2 UNIT);
655 
656             std::runtime_error ex("buffer_with_time on_error from source");
657 
658             auto ys1 = rx::observable<>::interval(start, period, so).take(5);
659             auto ys2 = rx::observable<>::error<long, std::runtime_error>(std::runtime_error("buffer_with_time on_error from source"), so);
660             ys1.concat(so, ys2)
661                 .buffer_with_time(TIME(4 UNIT), TIME(2 UNIT), so)
662                 .subscribe(
__anon79d0578d1a02(std::vector<long> counter)663                     [](std::vector<long> counter){
664                         printf("on_next: ");
665                         std::for_each(counter.begin(), counter.end(), [](long c){
666                             printf("%ld ", c);
667                         });
668                         printf("\n");
669                     },
__anon79d0578d1c02(rxu::error_ptr)670                     [](rxu::error_ptr){
671                         printf("on_error\n");
672                     },
__anon79d0578d1d02()673                     [](){
674                         printf("on_completed\n");
675                     }
676                 );
677         }
678     }
679 }
680 
681 SCENARIO("buffer with time, overlapping intervals", "[buffer_with_time][operators]"){
682     GIVEN("1 hot observable of ints."){
683         auto sc = rxsc::make_test();
684         auto so = rx::synchronize_in_one_worker(sc);
685         auto w = sc.create_worker();
686         const rxsc::test::messages<int> on;
687         const rxsc::test::messages<std::vector<int>> v_on;
688 
689         auto xs = sc.make_hot_observable({
690             on.next(100, 1),
691             on.next(210, 2),
692             on.next(240, 3),
693             on.next(280, 4),
694             on.next(320, 5),
695             on.next(350, 6),
696             on.next(380, 7),
697             on.next(420, 8),
698             on.next(470, 9),
699             on.completed(600)
700         });
701         WHEN("group ints on intersecting intervals"){
702             using namespace std::chrono;
703 
704             auto res = w.start(
__anon79d0578d1e02() 705                 [&]() {
706                     return xs
707                         .buffer_with_time(milliseconds(100), milliseconds(70), so)
708                         // forget type to workaround lambda deduction bug on msvc 2013
709                         .as_dynamic();
710                 }
711             );
712 
713             THEN("the output contains groups of ints"){
714                 auto required = rxu::to_vector({
715                     v_on.next(301, rxu::to_vector({ 2, 3, 4 })),
716                     v_on.next(371, rxu::to_vector({ 4, 5, 6 })),
717                     v_on.next(441, rxu::to_vector({ 6, 7, 8 })),
718                     v_on.next(511, rxu::to_vector({ 8, 9 })),
719                     v_on.next(581, std::vector<int>()),
720                     v_on.next(601, std::vector<int>()),
721                     v_on.completed(601)
722                 });
723                 auto actual = res.get_observer().messages();
724                 REQUIRE(required == actual);
725             }
726 
727             THEN("there was one subscription and one unsubscription to the xs"){
728                 auto required = rxu::to_vector({
729                     on.subscribe(200, 600)
730                 });
731                 auto actual = xs.subscriptions();
732                 REQUIRE(required == actual);
733             }
734         }
735     }
736 }
737 
738 SCENARIO("buffer with time, intervals with skips", "[buffer_with_time][operators]"){
739     GIVEN("1 hot observable of ints."){
740         auto sc = rxsc::make_test();
741         auto so = rx::synchronize_in_one_worker(sc);
742         auto w = sc.create_worker();
743         const rxsc::test::messages<int> on;
744         const rxsc::test::messages<std::vector<int>> v_on;
745 
746         auto xs = sc.make_hot_observable({
747             on.next(100, 1),
748             on.next(210, 2),
749             on.next(240, 3),
750             on.next(280, 4),
751             on.next(320, 5),
752             on.next(350, 6),
753             on.next(380, 7),
754             on.next(420, 8),
755             on.next(470, 9),
756             on.completed(600)
757         });
758         WHEN("group ints on intervals with skips"){
759             using namespace std::chrono;
760 
761             auto res = w.start(
__anon79d0578d1f02() 762                 [&]() {
763                     return xs
764                         .buffer_with_time(milliseconds(70), milliseconds(100), so)
765                         // forget type to workaround lambda deduction bug on msvc 2013
766                         .as_dynamic();
767                 }
768             );
769 
770             THEN("the output contains groups of ints"){
771                 auto required = rxu::to_vector({
772                     v_on.next(271, rxu::to_vector({ 2, 3 })),
773                     v_on.next(371, rxu::to_vector({ 5, 6 })),
774                     v_on.next(471, rxu::to_vector({ 8, 9 })),
775                     v_on.next(571, std::vector<int>()),
776                     v_on.completed(601)
777                 });
778                 auto actual = res.get_observer().messages();
779                 REQUIRE(required == actual);
780             }
781 
782             THEN("there was one subscription and one unsubscription to the xs"){
783                 auto required = rxu::to_vector({
784                     on.subscribe(200, 600)
785                 });
786                 auto actual = xs.subscriptions();
787                 REQUIRE(required == actual);
788             }
789         }
790     }
791 }
792 
793 SCENARIO("buffer with time, error", "[buffer_with_time][operators]"){
794     GIVEN("1 hot observable of ints."){
795         auto sc = rxsc::make_test();
796         auto so = rx::synchronize_in_one_worker(sc);
797         auto w = sc.create_worker();
798         const rxsc::test::messages<int> on;
799         const rxsc::test::messages<std::vector<int>> v_on;
800 
801         std::runtime_error ex("buffer_with_time on_error from source");
802 
803         auto xs = sc.make_hot_observable({
804             on.next(100, 1),
805             on.next(210, 2),
806             on.next(240, 3),
807             on.next(280, 4),
808             on.next(320, 5),
809             on.next(350, 6),
810             on.next(380, 7),
811             on.next(420, 8),
812             on.next(470, 9),
813             on.error(600, ex)
814         });
815         WHEN("group ints on intersecting intervals"){
816             using namespace std::chrono;
817 
818             auto res = w.start(
__anon79d0578d2002() 819                 [&]() {
820                     return xs
821                         .buffer_with_time(milliseconds(100), milliseconds(70), so)
822                         // forget type to workaround lambda deduction bug on msvc 2013
823                         .as_dynamic();
824                 }
825             );
826 
827             THEN("the output contains groups of ints"){
828                 auto required = rxu::to_vector({
829                     v_on.next(301, rxu::to_vector({ 2, 3, 4 })),
830                     v_on.next(371, rxu::to_vector({ 4, 5, 6 })),
831                     v_on.next(441, rxu::to_vector({ 6, 7, 8 })),
832                     v_on.next(511, rxu::to_vector({ 8, 9 })),
833                     v_on.next(581, std::vector<int>()),
834                     v_on.error(601, ex)
835                 });
836                 auto actual = res.get_observer().messages();
837                 REQUIRE(required == actual);
838             }
839 
840             THEN("there was one subscription and one unsubscription to the xs"){
841                 auto required = rxu::to_vector({
842                     on.subscribe(200, 600)
843                 });
844                 auto actual = xs.subscriptions();
845                 REQUIRE(required == actual);
846             }
847         }
848     }
849 }
850 
851 SCENARIO("buffer with time, disposed", "[buffer_with_time][operators]"){
852     GIVEN("1 hot observable of ints."){
853         auto sc = rxsc::make_test();
854         auto so = rx::synchronize_in_one_worker(sc);
855         auto w = sc.create_worker();
856         const rxsc::test::messages<int> on;
857         const rxsc::test::messages<std::vector<int>> v_on;
858 
859         auto xs = sc.make_hot_observable({
860             on.next(100, 1),
861             on.next(210, 2),
862             on.next(240, 3),
863             on.next(280, 4),
864             on.next(320, 5),
865             on.next(350, 6),
866             on.next(380, 7),
867             on.next(420, 8),
868             on.next(470, 9),
869             on.completed(600)
870         });
871         WHEN("group ints on intersecting intervals"){
872             using namespace std::chrono;
873 
874             auto res = w.start(
__anon79d0578d2102() 875                 [&]() {
876                     return xs
877                         .buffer_with_time(milliseconds(100), milliseconds(70), so)
878                         // forget type to workaround lambda deduction bug on msvc 2013
879                         .as_dynamic();
880                 },
881                 370
882             );
883 
884             THEN("the output contains groups of ints"){
885                 auto required = rxu::to_vector({
886                     v_on.next(301, rxu::to_vector({ 2, 3, 4 })),
887                 });
888                 auto actual = res.get_observer().messages();
889                 REQUIRE(required == actual);
890             }
891 
892             THEN("there was one subscription and one unsubscription to the xs"){
893                 auto required = rxu::to_vector({
894                     on.subscribe(200, 371)
895                 });
896                 auto actual = xs.subscriptions();
897                 REQUIRE(required == actual);
898             }
899         }
900     }
901 }
902 
903 SCENARIO("buffer with time, same", "[buffer_with_time][operators]"){
904     GIVEN("1 hot observable of ints."){
905         auto sc = rxsc::make_test();
906         auto so = rx::synchronize_in_one_worker(sc);
907         auto w = sc.create_worker();
908         const rxsc::test::messages<int> on;
909         const rxsc::test::messages<std::vector<int>> v_on;
910 
911         auto xs = sc.make_hot_observable({
912             on.next(100, 1),
913             on.next(210, 2),
914             on.next(240, 3),
915             on.next(280, 4),
916             on.next(320, 5),
917             on.next(350, 6),
918             on.next(380, 7),
919             on.next(420, 8),
920             on.next(470, 9),
921             on.completed(600)
922         });
923         WHEN("group ints on intervals"){
924             using namespace std::chrono;
925 
926             auto res = w.start(
__anon79d0578d2202() 927                 [&]() {
928                     return xs
929                         .buffer_with_time(milliseconds(100), so)
930                         // forget type to workaround lambda deduction bug on msvc 2013
931                         .as_dynamic();
932                 }
933             );
934 
935             THEN("the output contains groups of ints"){
936                 auto required = rxu::to_vector({
937                     v_on.next(301, rxu::to_vector({ 2, 3, 4 })),
938                     v_on.next(401, rxu::to_vector({ 5, 6, 7 })),
939                     v_on.next(501, rxu::to_vector({ 8, 9 })),
940                     v_on.next(601, std::vector<int>()),
941                     v_on.completed(601)
942                 });
943                 auto actual = res.get_observer().messages();
944                 REQUIRE(required == actual);
945             }
946 
947             THEN("there was one subscription and one unsubscription to the xs"){
948                 auto required = rxu::to_vector({
949                     on.subscribe(200, 600)
950                 });
951                 auto actual = xs.subscriptions();
952                 REQUIRE(required == actual);
953             }
954         }
955     }
956 }
957 
958 SCENARIO("buffer with time or count, basic", "[buffer_with_time_or_count][operators]"){
959     GIVEN("1 hot observable of ints."){
960         auto sc = rxsc::make_test();
961         auto so = rx::synchronize_in_one_worker(sc);
962         auto w = sc.create_worker();
963         const rxsc::test::messages<int> on;
964         const rxsc::test::messages<std::vector<int>> v_on;
965 
966         auto xs = sc.make_hot_observable({
967             on.next(205, 1),
968             on.next(210, 2),
969             on.next(240, 3),
970             on.next(280, 4),
971             on.next(320, 5),
972             on.next(350, 6),
973             on.next(370, 7),
974             on.next(420, 8),
975             on.next(470, 9),
976             on.completed(600)
977         });
978         WHEN("group ints on intervals"){
979             using namespace std::chrono;
980 
981             auto res = w.start(
__anon79d0578d2302() 982                 [&]() {
983                     return xs
984                         | rxo::buffer_with_time_or_count(milliseconds(70), 3, so)
985                         // forget type to workaround lambda deduction bug on msvc 2013
986                         | rxo::as_dynamic();
987                 }
988             );
989 
990             THEN("the output contains groups of ints"){
991                 auto required = rxu::to_vector({
992                     v_on.next(241, rxu::to_vector({ 1, 2, 3 })),
993                     v_on.next(312, rxu::to_vector({ 4 })),
994                     v_on.next(371, rxu::to_vector({ 5, 6, 7 })),
995                     v_on.next(442, rxu::to_vector({ 8 })),
996                     v_on.next(512, rxu::to_vector({ 9 })),
997                     v_on.next(582, std::vector<int>()),
998                     v_on.next(601, std::vector<int>()),
999                     v_on.completed(601)
1000                 });
1001                 auto actual = res.get_observer().messages();
1002                 REQUIRE(required == actual);
1003             }
1004 
1005             THEN("there was one subscription and one unsubscription to the xs"){
1006                 auto required = rxu::to_vector({
1007                     on.subscribe(200, 600)
1008                 });
1009                 auto actual = xs.subscriptions();
1010                 REQUIRE(required == actual);
1011             }
1012         }
1013     }
1014 }
1015 
1016 SCENARIO("buffer with time or count, error", "[buffer_with_time_or_count][operators]"){
1017     GIVEN("1 hot observable of ints."){
1018         auto sc = rxsc::make_test();
1019         auto so = rx::synchronize_in_one_worker(sc);
1020         auto w = sc.create_worker();
1021         const rxsc::test::messages<int> on;
1022         const rxsc::test::messages<std::vector<int>> v_on;
1023 
1024         std::runtime_error ex("buffer_with_time on_error from source");
1025 
1026         auto xs = sc.make_hot_observable({
1027             on.next(205, 1),
1028             on.next(210, 2),
1029             on.next(240, 3),
1030             on.next(280, 4),
1031             on.next(320, 5),
1032             on.next(350, 6),
1033             on.next(370, 7),
1034             on.next(420, 8),
1035             on.next(470, 9),
1036             on.error(600, ex)
1037         });
1038         WHEN("group ints on intervals"){
1039             using namespace std::chrono;
1040 
1041             auto res = w.start(
__anon79d0578d2402() 1042                 [&]() {
1043                     return xs
1044                         .buffer_with_time_or_count(milliseconds(70), 3, so)
1045                         // forget type to workaround lambda deduction bug on msvc 2013
1046                         .as_dynamic();
1047                 }
1048             );
1049 
1050             THEN("the output contains groups of ints"){
1051                 auto required = rxu::to_vector({
1052                     v_on.next(241, rxu::to_vector({ 1, 2, 3 })),
1053                     v_on.next(312, rxu::to_vector({ 4 })),
1054                     v_on.next(371, rxu::to_vector({ 5, 6, 7 })),
1055                     v_on.next(442, rxu::to_vector({ 8 })),
1056                     v_on.next(512, rxu::to_vector({ 9 })),
1057                     v_on.next(582, std::vector<int>()),
1058                     v_on.error(601, ex)
1059                 });
1060                 auto actual = res.get_observer().messages();
1061                 REQUIRE(required == actual);
1062             }
1063 
1064             THEN("there was one subscription and one unsubscription to the xs"){
1065                 auto required = rxu::to_vector({
1066                     on.subscribe(200, 600)
1067                 });
1068                 auto actual = xs.subscriptions();
1069                 REQUIRE(required == actual);
1070             }
1071         }
1072     }
1073 }
1074 
1075 SCENARIO("buffer with time or count, dispose", "[buffer_with_time_or_count][operators]"){
1076     GIVEN("1 hot observable of ints."){
1077         auto sc = rxsc::make_test();
1078         auto so = rx::synchronize_in_one_worker(sc);
1079         auto w = sc.create_worker();
1080         const rxsc::test::messages<int> on;
1081         const rxsc::test::messages<std::vector<int>> v_on;
1082 
1083         auto xs = sc.make_hot_observable({
1084             on.next(205, 1),
1085             on.next(210, 2),
1086             on.next(240, 3),
1087             on.next(280, 4),
1088             on.next(320, 5),
1089             on.next(350, 6),
1090             on.next(370, 7),
1091             on.next(420, 8),
1092             on.next(470, 9),
1093             on.completed(600)
1094         });
1095         WHEN("group ints on intervals"){
1096             using namespace std::chrono;
1097 
1098             auto res = w.start(
__anon79d0578d2502() 1099                 [&]() {
1100                     return xs
1101                         .buffer_with_time_or_count(milliseconds(70), 3, so)
1102                         // forget type to workaround lambda deduction bug on msvc 2013
1103                         .as_dynamic();
1104                 },
1105                 372
1106             );
1107 
1108             THEN("the output contains groups of ints"){
1109                 auto required = rxu::to_vector({
1110                     v_on.next(241, rxu::to_vector({ 1, 2, 3 })),
1111                     v_on.next(312, rxu::to_vector({ 4 })),
1112                     v_on.next(371, rxu::to_vector({ 5, 6, 7 })),
1113                 });
1114                 auto actual = res.get_observer().messages();
1115                 REQUIRE(required == actual);
1116             }
1117 
1118             THEN("there was one subscription and one unsubscription to the xs"){
1119                 auto required = rxu::to_vector({
1120                     on.subscribe(200, 373)
1121                 });
1122                 auto actual = xs.subscriptions();
1123                 REQUIRE(required == actual);
1124             }
1125         }
1126     }
1127 }
1128 
1129 SCENARIO("buffer with time or count, only time triggered", "[buffer_with_time_or_count][operators]"){
1130     GIVEN("1 hot observable of ints."){
1131         auto sc = rxsc::make_test();
1132         auto so = rx::synchronize_in_one_worker(sc);
1133         auto w = sc.create_worker();
1134         const rxsc::test::messages<int> on;
1135         const rxsc::test::messages<std::vector<int>> v_on;
1136 
1137         auto xs = sc.make_hot_observable({
1138             on.next(205, 1),
1139             on.next(305, 2),
1140             on.next(505, 3),
1141             on.next(605, 4),
1142             on.next(610, 5),
1143             on.completed(850)
1144         });
1145         WHEN("group ints on intervals"){
1146             using namespace std::chrono;
1147 
1148             auto res = w.start(
__anon79d0578d2602() 1149                 [&]() {
1150                     return xs
1151                         .buffer_with_time_or_count(milliseconds(100), 3, so)
1152                         // forget type to workaround lambda deduction bug on msvc 2013
1153                         .as_dynamic();
1154                 }
1155             );
1156 
1157             THEN("the output contains groups of ints"){
1158                 auto required = rxu::to_vector({
1159                     v_on.next(301, rxu::to_vector({ 1 })),
1160                     v_on.next(401, rxu::to_vector({ 2 })),
1161                     v_on.next(501, std::vector<int>()),
1162                     v_on.next(601, rxu::to_vector({ 3 })),
1163                     v_on.next(701, rxu::to_vector({ 4, 5 })),
1164                     v_on.next(801, std::vector<int>()),
1165                     v_on.next(851, std::vector<int>()),
1166                     v_on.completed(851)
1167                 });
1168                 auto actual = res.get_observer().messages();
1169                 REQUIRE(required == actual);
1170             }
1171 
1172             THEN("there was one subscription and one unsubscription to the xs"){
1173                 auto required = rxu::to_vector({
1174                     on.subscribe(200, 850)
1175                 });
1176                 auto actual = xs.subscriptions();
1177                 REQUIRE(required == actual);
1178             }
1179         }
1180     }
1181 }
1182 
1183 SCENARIO("buffer with time or count, only count triggered", "[buffer_with_time_or_count][operators]"){
1184     GIVEN("1 hot observable of ints."){
1185         auto sc = rxsc::make_test();
1186         auto so = rx::synchronize_in_one_worker(sc);
1187         auto w = sc.create_worker();
1188         const rxsc::test::messages<int> on;
1189         const rxsc::test::messages<std::vector<int>> v_on;
1190 
1191         auto xs = sc.make_hot_observable({
1192             on.next(205, 1),
1193             on.next(305, 2),
1194             on.next(505, 3),
1195             on.next(605, 4),
1196             on.next(610, 5),
1197             on.completed(850)
1198         });
1199         WHEN("group ints on intervals"){
1200             using namespace std::chrono;
1201 
1202             auto res = w.start(
__anon79d0578d2702() 1203                 [&]() {
1204                     return xs
1205                         .buffer_with_time_or_count(milliseconds(370), 2, so)
1206                         // forget type to workaround lambda deduction bug on msvc 2013
1207                         .as_dynamic();
1208                 }
1209             );
1210 
1211             THEN("the output contains groups of ints"){
1212                 auto required = rxu::to_vector({
1213                     v_on.next(306, rxu::to_vector({ 1, 2 })),
1214                     v_on.next(606, rxu::to_vector({ 3, 4 })),
1215                     v_on.next(851, rxu::to_vector({ 5 })),
1216                     v_on.completed(851)
1217                 });
1218                 auto actual = res.get_observer().messages();
1219                 REQUIRE(required == actual);
1220             }
1221 
1222             THEN("there was one subscription and one unsubscription to the xs"){
1223                 auto required = rxu::to_vector({
1224                     on.subscribe(200, 850)
1225                 });
1226                 auto actual = xs.subscriptions();
1227                 REQUIRE(required == actual);
1228             }
1229         }
1230     }
1231 }
1232