• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "../test.h"
2 #include <rxcpp/operators/rx-reduce.hpp>
3 #include <rxcpp/operators/rx-map.hpp>
4 #include <rxcpp/operators/rx-merge.hpp>
5 #include <rxcpp/operators/rx-window.hpp>
6 #include <rxcpp/operators/rx-window_time.hpp>
7 #include <rxcpp/operators/rx-window_time_count.hpp>
8 
9 SCENARIO("window count, basic", "[window][operators]"){
10     GIVEN("1 hot observable of ints."){
11         auto sc = rxsc::make_test();
12         auto w = sc.create_worker();
13         const rxsc::test::messages<int> on;
14         const rxsc::test::messages<rx::observable<int>> o_on;
15 
16         auto xs = sc.make_hot_observable({
17             on.next(100, 1),
18             on.next(210, 2),
19             on.next(240, 3),
20             on.next(280, 4),
21             on.next(320, 5),
22             on.next(350, 6),
23             on.next(380, 7),
24             on.next(420, 8),
25             on.next(470, 9),
26             on.completed(600)
27         });
28 
29         WHEN("group each int with the next 2 ints"){
30             auto res = w.start(
__anon38a1318b0102() 31                 [&]() {
32                     return xs
33                         .window(3, 2)
34                         .merge()
35                         // forget type to workaround lambda deduction bug on msvc 2013
36                         .as_dynamic();
37                 }
38             );
39 
40             THEN("the output contains merged groups of ints"){
41                 auto required = rxu::to_vector({
42                     on.next(210, 2),
43                     on.next(240, 3),
44                     on.next(280, 4),
45                     on.next(280, 4),
46                     on.next(320, 5),
47                     on.next(350, 6),
48                     on.next(350, 6),
49                     on.next(380, 7),
50                     on.next(420, 8),
51                     on.next(420, 8),
52                     on.next(470, 9),
53                     on.completed(600)
54                 });
55                 auto actual = res.get_observer().messages();
56                 REQUIRE(required == actual);
57             }
58 
59             THEN("there was one subscription and one unsubscription to the observable"){
60                 auto required = rxu::to_vector({
61                     o_on.subscribe(200, 600)
62                 });
63                 auto actual = xs.subscriptions();
64                 REQUIRE(required == actual);
65             }
66         }
67     }
68 }
69 
70 SCENARIO("window count, inner timings", "[window][operators]"){
71     GIVEN("1 hot observable of ints."){
72         auto sc = rxsc::make_test();
73         auto w = sc.create_worker();
74         const rxsc::test::messages<int> on;
75         const rxsc::test::messages<rx::observable<int>> o_on;
76 
77         auto xs = sc.make_hot_observable({
78             on.next(100, 1),
79             on.next(210, 2),
80             on.next(240, 3),
81             on.next(280, 4),
82             on.next(320, 5),
83             on.next(350, 6),
84             on.next(380, 7),
85             on.next(420, 8),
86             on.next(470, 9),
87             on.completed(600)
88         });
89 
90         WHEN("group each int with the next 2 ints"){
91             auto res = rxcpp::observable<rxcpp::observable<int>>();
92             auto windows = std::vector<rxcpp::observable<int>>();
93             auto observers = std::vector<rxt::testable_observer<int>>();
94 
95             w.schedule_absolute(
96                 rxsc::test::created_time,
__anon38a1318b0202(const rxsc::schedulable&) 97                 [&](const rxsc::schedulable&) {
98                     res = xs
99                         | rxo::window(3, 2);
100                 }
101             );
102 
103             w.schedule_absolute(
104                 rxsc::test::subscribed_time,
__anon38a1318b0302(const rxsc::schedulable&) 105                 [&](const rxsc::schedulable&) {
106                     res.subscribe(
107                         // on_next
108                         [&](rx::observable<int> window) {
109                             auto result = w.make_subscriber<int>();
110                             windows.push_back(window);
111                             observers.push_back(result.get_observer());
112                             window.subscribe(result);
113                         }
114                     );
115                 }
116             );
117 
118             w.start();
119 
120             THEN("the output contains 5 windows"){
121                 REQUIRE(5 == observers.size());
122             }
123 
124             THEN("the 1st output window contains ints"){
125                 auto required = rxu::to_vector({
126                     on.next(210, 2),
127                     on.next(240, 3),
128                     on.next(280, 4),
129                     on.completed(280)
130                 });
131                 auto actual = observers[0].messages();
132                 REQUIRE(required == actual);
133             }
134 
135             THEN("the 2nd output window contains ints"){
136                 auto required = rxu::to_vector({
137                     on.next(280, 4),
138                     on.next(320, 5),
139                     on.next(350, 6),
140                     on.completed(350)
141                 });
142                 auto actual = observers[1].messages();
143                 REQUIRE(required == actual);
144             }
145 
146             THEN("the 3rd output window contains ints"){
147                 auto required = rxu::to_vector({
148                     on.next(350, 6),
149                     on.next(380, 7),
150                     on.next(420, 8),
151                     on.completed(420)
152                 });
153                 auto actual = observers[2].messages();
154                 REQUIRE(required == actual);
155             }
156 
157             THEN("the 4th output window contains ints"){
158                 auto required = rxu::to_vector({
159                     on.next(420, 8),
160                     on.next(470, 9),
161                     on.completed(600)
162                 });
163                 auto actual = observers[3].messages();
164                 REQUIRE(required == actual);
165             }
166 
167             THEN("the 5th output window only contains complete message"){
168                 auto required = rxu::to_vector({
169                     on.completed(600)
170                 });
171                 auto actual = observers[4].messages();
172                 REQUIRE(required == actual);
173             }
174 
175             THEN("there was one subscription and one unsubscription to the observable"){
176                 auto required = rxu::to_vector({
177                     o_on.subscribe(200, 600)
178                 });
179                 auto actual = xs.subscriptions();
180                 REQUIRE(required == actual);
181             }
182         }
183     }
184 }
185 
186 SCENARIO("window count, dispose", "[window][operators]"){
187     GIVEN("1 hot observable of ints."){
188         auto sc = rxsc::make_test();
189         auto w = sc.create_worker();
190         const rxsc::test::messages<int> on;
191         const rxsc::test::messages<rx::observable<int>> o_on;
192 
193         auto xs = sc.make_hot_observable({
194             on.next(100, 1),
195             on.next(210, 2),
196             on.next(240, 3),
197             on.next(280, 4),
198             on.next(320, 5),
199             on.next(350, 6),
200             on.next(380, 7),
201             on.next(420, 8),
202             on.next(470, 9),
203             on.completed(600)
204         });
205 
206         WHEN("group each int with the next 2 ints"){
207             auto res = w.start(
__anon38a1318b0502() 208                 [&]() {
209                     return xs
210                         .window(3, 2)
211                         .merge()
212                         // forget type to workaround lambda deduction bug on msvc 2013
213                         .as_dynamic();
214                 },
215                 370
216             );
217 
218             THEN("the output contains merged groups of ints"){
219                 auto required = rxu::to_vector({
220                     on.next(210, 2),
221                     on.next(240, 3),
222                     on.next(280, 4),
223                     on.next(280, 4),
224                     on.next(320, 5),
225                     on.next(350, 6),
226                     on.next(350, 6)
227                 });
228                 auto actual = res.get_observer().messages();
229                 REQUIRE(required == actual);
230             }
231 
232             THEN("there was one subscription and one unsubscription to the observable"){
233                 auto required = rxu::to_vector({
234                     o_on.subscribe(200, 370)
235                 });
236                 auto actual = xs.subscriptions();
237                 REQUIRE(required == actual);
238             }
239         }
240     }
241 }
242 
243 SCENARIO("window count, error", "[window][operators]"){
244     GIVEN("1 hot observable of ints."){
245         auto sc = rxsc::make_test();
246         auto w = sc.create_worker();
247         const rxsc::test::messages<int> on;
248         const rxsc::test::messages<rx::observable<int>> o_on;
249 
250         std::runtime_error ex("window on_error from source");
251 
252         auto xs = sc.make_hot_observable({
253             on.next(100, 1),
254             on.next(210, 2),
255             on.next(240, 3),
256             on.next(280, 4),
257             on.next(320, 5),
258             on.next(350, 6),
259             on.next(380, 7),
260             on.next(420, 8),
261             on.next(470, 9),
262             on.error(600, ex)
263         });
264 
265         WHEN("group each int with the next 2 ints"){
266             auto res = w.start(
__anon38a1318b0602() 267                 [&]() {
268                     return xs
269                         .window(3, 2)
270                         .merge()
271                         // forget type to workaround lambda deduction bug on msvc 2013
272                         .as_dynamic();
273                 }
274             );
275 
276             THEN("the output contains merged groups of ints"){
277                 auto required = rxu::to_vector({
278                     on.next(210, 2),
279                     on.next(240, 3),
280                     on.next(280, 4),
281                     on.next(280, 4),
282                     on.next(320, 5),
283                     on.next(350, 6),
284                     on.next(350, 6),
285                     on.next(380, 7),
286                     on.next(420, 8),
287                     on.next(420, 8),
288                     on.next(470, 9),
289                     on.error(600, ex)
290                 });
291                 auto actual = res.get_observer().messages();
292                 REQUIRE(required == actual);
293             }
294 
295             THEN("there was one subscription and one unsubscription to the observable"){
296                 auto required = rxu::to_vector({
297                     o_on.subscribe(200, 600)
298                 });
299                 auto actual = xs.subscriptions();
300                 REQUIRE(required == actual);
301             }
302         }
303     }
304 }
305 
306 SCENARIO("window with time, basic", "[window_with_time][operators]"){
307     GIVEN("1 hot observable of ints."){
308         auto sc = rxsc::make_test();
309         auto so = rx::synchronize_in_one_worker(sc);
310         auto w = sc.create_worker();
311         const rxsc::test::messages<int> on;
312         const rxsc::test::messages<rx::observable<int>> o_on;
313 
314         auto xs = sc.make_hot_observable({
315             on.next(150, 1),
316             on.next(210, 2),
317             on.next(240, 3),
318             on.next(270, 4),
319             on.next(320, 5),
320             on.next(360, 6),
321             on.next(390, 7),
322             on.next(410, 8),
323             on.next(460, 9),
324             on.next(470, 10),
325             on.completed(490)
326         });
327 
328         WHEN("group ints by 100 time units"){
329             using namespace std::chrono;
330 
331             auto res = w.start(
__anon38a1318b0702() 332                 [&]() {
333                     return xs
334                         | rxo::window_with_time(milliseconds(100), milliseconds(50), so)
335                         | rxo::merge()
336                         // forget type to workaround lambda deduction bug on msvc 2013
337                         | rxo::as_dynamic();
338                 }
339             );
340 
341             THEN("the output contains merged groups of ints"){
342                 auto required = rxu::to_vector({
343                     on.next(211, 2),
344                     on.next(241, 3),
345                     on.next(271, 4),
346                     on.next(271, 4),
347                     on.next(321, 5),
348                     on.next(321, 5),
349                     on.next(361, 6),
350                     on.next(361, 6),
351                     on.next(391, 7),
352                     on.next(391, 7),
353                     on.next(411, 8),
354                     on.next(411, 8),
355                     on.next(461, 9),
356                     on.next(461, 9),
357                     on.next(471, 10),
358                     on.next(471, 10),
359                     on.completed(491)
360                 });
361                 auto actual = res.get_observer().messages();
362                 REQUIRE(required == actual);
363             }
364 
365             THEN("there was one subscription and one unsubscription to the observable"){
366                 auto required = rxu::to_vector({
367                     o_on.subscribe(200, 490)
368                 });
369                 auto actual = xs.subscriptions();
370                 REQUIRE(required == actual);
371             }
372         }
373     }
374 }
375 
376 SCENARIO("window with time, basic same", "[window_with_time][operators]"){
377     GIVEN("1 hot observable of ints."){
378         auto sc = rxsc::make_test();
379         auto so = rx::synchronize_in_one_worker(sc);
380         auto w = sc.create_worker();
381         const rxsc::test::messages<int> on;
382         const rxsc::test::messages<rx::observable<int>> o_on;
383 
384         auto xs = sc.make_hot_observable({
385             on.next(150, 1),
386             on.next(210, 2),
387             on.next(240, 3),
388             on.next(270, 4),
389             on.next(320, 5),
390             on.next(360, 6),
391             on.next(390, 7),
392             on.next(410, 8),
393             on.next(460, 9),
394             on.next(470, 10),
395             on.completed(490)
396         });
397 
398         WHEN("group each int with the next 2 ints"){
399             using namespace std::chrono;
400 
401             auto res = w.start(
__anon38a1318b0802() 402                 [&]() {
403                     return xs
404                         .window_with_time(milliseconds(100), so)
405                         .merge()
406                         // forget type to workaround lambda deduction bug on msvc 2013
407                         .as_dynamic();
408                 }
409             );
410 
411             THEN("the output contains merged groups of ints"){
412                 auto required = rxu::to_vector({
413                     on.next(211, 2),
414                     on.next(241, 3),
415                     on.next(271, 4),
416                     on.next(321, 5),
417                     on.next(361, 6),
418                     on.next(391, 7),
419                     on.next(411, 8),
420                     on.next(461, 9),
421                     on.next(471, 10),
422                     on.completed(491)
423                 });
424                 auto actual = res.get_observer().messages();
425                 REQUIRE(required == actual);
426             }
427 
428             THEN("there was one subscription and one unsubscription to the observable"){
429                 auto required = rxu::to_vector({
430                     o_on.subscribe(200, 490)
431                 });
432                 auto actual = xs.subscriptions();
433                 REQUIRE(required == actual);
434             }
435         }
436     }
437 }
438 
439 SCENARIO("window with time, basic 1", "[window_with_time][operators]"){
440     GIVEN("1 hot observable of ints."){
441         auto sc = rxsc::make_test();
442         auto so = rx::synchronize_in_one_worker(sc);
443         auto w = sc.create_worker();
444         const rxsc::test::messages<int> on;
445         const rxsc::test::messages<rx::observable<int>> o_on;
446 
447         auto xs = sc.make_hot_observable({
448             on.next(100, 1),
449             on.next(210, 2),
450             on.next(240, 3),
451             on.next(280, 4),
452             on.next(320, 5),
453             on.next(350, 6),
454             on.next(380, 7),
455             on.next(420, 8),
456             on.next(470, 9),
457             on.completed(600)
458         });
459 
460         WHEN("group each int with the next 2 ints"){
461             using namespace std::chrono;
462 
463             auto res = w.start(
__anon38a1318b0902() 464                 [&]() {
465                     return xs
466                         .window_with_time(milliseconds(100), milliseconds(70), so)
467                         .merge()
468                         // forget type to workaround lambda deduction bug on msvc 2013
469                         .as_dynamic();
470                 }
471             );
472 
473             THEN("the output contains merged groups of ints"){
474                 auto required = rxu::to_vector({
475                     on.next(211, 2),
476                     on.next(241, 3),
477                     on.next(281, 4),
478                     on.next(281, 4),
479                     on.next(321, 5),
480                     on.next(351, 6),
481                     on.next(351, 6),
482                     on.next(381, 7),
483                     on.next(421, 8),
484                     on.next(421, 8),
485                     on.next(471, 9),
486                     on.completed(601)
487                 });
488                 auto actual = res.get_observer().messages();
489                 REQUIRE(required == actual);
490             }
491 
492             THEN("there was one subscription and one unsubscription to the observable"){
493                 auto required = rxu::to_vector({
494                     o_on.subscribe(200, 600)
495                 });
496                 auto actual = xs.subscriptions();
497                 REQUIRE(required == actual);
498             }
499         }
500     }
501 }
502 
503 SCENARIO("window with time, basic 2", "[window_with_time][operators]"){
504     GIVEN("1 hot observable of ints."){
505         auto sc = rxsc::make_test();
506         auto so = rx::synchronize_in_one_worker(sc);
507         auto w = sc.create_worker();
508         const rxsc::test::messages<int> on;
509         const rxsc::test::messages<rx::observable<int>> o_on;
510 
511         auto xs = sc.make_hot_observable({
512             on.next(100, 1),
513             on.next(210, 2),
514             on.next(240, 3),
515             on.next(280, 4),
516             on.next(320, 5),
517             on.next(350, 6),
518             on.next(380, 7),
519             on.next(420, 8),
520             on.next(470, 9),
521             on.completed(600)
522         });
523 
524         WHEN("group each int with the next 2 ints"){
525             using namespace std::chrono;
526 
527             auto res = w.start(
__anon38a1318b0a02() 528                 [&]() {
529                     return xs
530                         .window_with_time(milliseconds(70), milliseconds(100), so)
531                         .merge()
532                         // forget type to workaround lambda deduction bug on msvc 2013
533                         .as_dynamic();
534                 }
535             );
536 
537             THEN("the output contains merged groups of ints"){
538                 auto required = rxu::to_vector({
539                     on.next(211, 2),
540                     on.next(241, 3),
541                     on.next(321, 5),
542                     on.next(351, 6),
543                     on.next(421, 8),
544                     on.next(471, 9),
545                     on.completed(601)
546                 });
547                 auto actual = res.get_observer().messages();
548                 REQUIRE(required == actual);
549             }
550 
551             THEN("there was one subscription and one unsubscription to the observable"){
552                 auto required = rxu::to_vector({
553                     o_on.subscribe(200, 600)
554                 });
555                 auto actual = xs.subscriptions();
556                 REQUIRE(required == actual);
557             }
558         }
559     }
560 }
561 
562 SCENARIO("window with time, error", "[window_with_time][operators]"){
563     GIVEN("1 hot observable of ints."){
564         auto sc = rxsc::make_test();
565         auto so = rx::synchronize_in_one_worker(sc);
566         auto w = sc.create_worker();
567         const rxsc::test::messages<int> on;
568         const rxsc::test::messages<rx::observable<int>> o_on;
569 
570         std::runtime_error ex("window_with_time on_error from source");
571 
572         auto xs = sc.make_hot_observable({
573             on.next(100, 1),
574             on.next(210, 2),
575             on.next(240, 3),
576             on.next(280, 4),
577             on.next(320, 5),
578             on.next(350, 6),
579             on.next(380, 7),
580             on.next(420, 8),
581             on.next(470, 9),
582             on.error(600, ex)
583         });
584 
585         WHEN("group each int with the next 2 ints"){
586             using namespace std::chrono;
587 
588             auto res = w.start(
__anon38a1318b0b02() 589                 [&]() {
590                     return xs
591                         .window_with_time(milliseconds(100), milliseconds(70), so)
592                         .merge()
593                         // forget type to workaround lambda deduction bug on msvc 2013
594                         .as_dynamic();
595                 }
596             );
597 
598             THEN("the output contains merged groups of ints"){
599                 auto required = rxu::to_vector({
600                     on.next(211, 2),
601                     on.next(241, 3),
602                     on.next(281, 4),
603                     on.next(281, 4),
604                     on.next(321, 5),
605                     on.next(351, 6),
606                     on.next(351, 6),
607                     on.next(381, 7),
608                     on.next(421, 8),
609                     on.next(421, 8),
610                     on.next(471, 9),
611                     on.error(601, ex)
612                 });
613                 auto actual = res.get_observer().messages();
614                 REQUIRE(required == actual);
615             }
616 
617             THEN("there was one subscription and one unsubscription to the observable"){
618                 auto required = rxu::to_vector({
619                     o_on.subscribe(200, 600)
620                 });
621                 auto actual = xs.subscriptions();
622                 REQUIRE(required == actual);
623             }
624         }
625     }
626 }
627 
628 SCENARIO("window with time, disposed", "[window_with_time][operators]"){
629     GIVEN("1 hot observable of ints."){
630         auto sc = rxsc::make_test();
631         auto so = rx::synchronize_in_one_worker(sc);
632         auto w = sc.create_worker();
633         const rxsc::test::messages<int> on;
634         const rxsc::test::messages<rx::observable<int>> o_on;
635 
636         auto xs = sc.make_hot_observable({
637             on.next(100, 1),
638             on.next(210, 2),
639             on.next(240, 3),
640             on.next(280, 4),
641             on.next(320, 5),
642             on.next(350, 6),
643             on.next(380, 7),
644             on.next(420, 8),
645             on.next(470, 9),
646             on.completed(600)
647         });
648 
649         WHEN("group each int with the next 2 ints"){
650             using namespace std::chrono;
651 
652             auto res = w.start(
__anon38a1318b0c02() 653                 [&]() {
654                     return xs
655                         .window_with_time(milliseconds(100), milliseconds(70), so)
656                         .merge()
657                         // forget type to workaround lambda deduction bug on msvc 2013
658                         .as_dynamic();
659                 },
660                 370
661             );
662 
663             THEN("the output contains merged groups of ints"){
664                 auto required = rxu::to_vector({
665                     on.next(211, 2),
666                     on.next(241, 3),
667                     on.next(281, 4),
668                     on.next(281, 4),
669                     on.next(321, 5),
670                     on.next(351, 6),
671                     on.next(351, 6),
672                 });
673                 auto actual = res.get_observer().messages();
674                 REQUIRE(required == actual);
675             }
676 
677             THEN("there was one subscription and one unsubscription to the observable"){
678                 auto required = rxu::to_vector({
679                     o_on.subscribe(200, 371)
680                 });
681                 auto actual = xs.subscriptions();
682                 REQUIRE(required == actual);
683             }
684         }
685     }
686 }
687 
688 SCENARIO("window with time, basic same 1", "[window_with_time][operators]"){
689     GIVEN("1 hot observable of ints."){
690         auto sc = rxsc::make_test();
691         auto so = rx::synchronize_in_one_worker(sc);
692         auto w = sc.create_worker();
693         const rxsc::test::messages<int> on;
694         const rxsc::test::messages<rx::observable<int>> o_on;
695 
696         auto xs = sc.make_hot_observable({
697             on.next(100, 1),
698             on.next(210, 2),
699             on.next(240, 3),
700             on.next(280, 4),
701             on.next(320, 5),
702             on.next(350, 6),
703             on.next(380, 7),
704             on.next(420, 8),
705             on.next(470, 9),
706             on.completed(600)
707         });
708 
709         WHEN("group each int with the next 2 ints"){
710             using namespace std::chrono;
711 
712             auto res = w.start(
__anon38a1318b0d02() 713                 [&]() {
714                     return xs
715                         .window_with_time(milliseconds(70), so)
716                         .merge()
717                         // forget type to workaround lambda deduction bug on msvc 2013
718                         .as_dynamic();
719                 }
720             );
721 
722             THEN("the output contains merged groups of ints"){
723                 auto required = rxu::to_vector({
724                     on.next(211, 2),
725                     on.next(241, 3),
726                     on.next(281, 4),
727                     on.next(321, 5),
728                     on.next(351, 6),
729                     on.next(381, 7),
730                     on.next(421, 8),
731                     on.next(471, 9),
732                     on.completed(601)
733                 });
734                 auto actual = res.get_observer().messages();
735                 REQUIRE(required == actual);
736             }
737 
738             THEN("there was one subscription and one unsubscription to the observable"){
739                 auto required = rxu::to_vector({
740                     o_on.subscribe(200, 600)
741                 });
742                 auto actual = xs.subscriptions();
743                 REQUIRE(required == actual);
744             }
745         }
746     }
747 }
748 
749 SCENARIO("window with time or count, basic", "[window_with_time_or_count][operators]"){
750     GIVEN("1 hot observable of ints."){
751         auto sc = rxsc::make_test();
752         auto so = rx::synchronize_in_one_worker(sc);
753         auto w = sc.create_worker();
754         const rxsc::test::messages<int> on;
755         const rxsc::test::messages<rx::observable<int>> o_on;
756 
757         auto xs = sc.make_hot_observable({
758             on.next(205, 1),
759             on.next(210, 2),
760             on.next(240, 3),
761             on.next(280, 4),
762             on.next(320, 5),
763             on.next(350, 6),
764             on.next(370, 7),
765             on.next(420, 8),
766             on.next(470, 9),
767             on.completed(600)
768         });
769 
770         WHEN("group each int with the next 2 ints"){
771             using namespace std::chrono;
772 
773             auto res = w.start(
__anon38a1318b0e02() 774                 [&]() {
775                     return xs
776                         | rxo::window_with_time_or_count(milliseconds(70), 3, so)
777                         | rxo::merge()
778                         // forget type to workaround lambda deduction bug on msvc 2013
779                         | rxo::as_dynamic();
780                 }
781             );
782 
783             THEN("the output contains merged groups of ints"){
784                 auto required = rxu::to_vector({
785                     on.next(206, 1),
786                     on.next(211, 2),
787                     on.next(241, 3),
788                     on.next(281, 4),
789                     on.next(321, 5),
790                     on.next(351, 6),
791                     on.next(371, 7),
792                     on.next(421, 8),
793                     on.next(471, 9),
794                     on.completed(601)
795                 });
796                 auto actual = res.get_observer().messages();
797                 REQUIRE(required == actual);
798             }
799 
800             THEN("there was one subscription and one unsubscription to the observable"){
801                 auto required = rxu::to_vector({
802                     o_on.subscribe(200, 600)
803                 });
804                 auto actual = xs.subscriptions();
805                 REQUIRE(required == actual);
806             }
807         }
808     }
809 }
810 
811 SCENARIO("window with time or count, error", "[window_with_time_or_count][operators]"){
812     GIVEN("1 hot observable of ints."){
813         auto sc = rxsc::make_test();
814         auto so = rx::synchronize_in_one_worker(sc);
815         auto w = sc.create_worker();
816         const rxsc::test::messages<int> on;
817         const rxsc::test::messages<rx::observable<int>> o_on;
818 
819         std::runtime_error ex("window_with_time on_error from source");
820 
821         auto xs = sc.make_hot_observable({
822             on.next(205, 1),
823             on.next(210, 2),
824             on.next(240, 3),
825             on.next(280, 4),
826             on.next(320, 5),
827             on.next(350, 6),
828             on.next(370, 7),
829             on.next(420, 8),
830             on.next(470, 9),
831             on.error(600, ex)
832         });
833 
834         WHEN("group each int with the next 2 ints"){
835             using namespace std::chrono;
836 
837             auto res = w.start(
__anon38a1318b0f02() 838                 [&]() {
839                     return xs
840                         .window_with_time_or_count(milliseconds(70), 3, so)
841                         .merge()
842                         // forget type to workaround lambda deduction bug on msvc 2013
843                         .as_dynamic();
844                 }
845             );
846 
847             THEN("the output contains merged groups of ints"){
848                 auto required = rxu::to_vector({
849                     on.next(206, 1),
850                     on.next(211, 2),
851                     on.next(241, 3),
852                     on.next(281, 4),
853                     on.next(321, 5),
854                     on.next(351, 6),
855                     on.next(371, 7),
856                     on.next(421, 8),
857                     on.next(471, 9),
858                     on.error(601, ex)
859                 });
860                 auto actual = res.get_observer().messages();
861                 REQUIRE(required == actual);
862             }
863 
864             THEN("there was one subscription and one unsubscription to the observable"){
865                 auto required = rxu::to_vector({
866                     o_on.subscribe(200, 600)
867                 });
868                 auto actual = xs.subscriptions();
869                 REQUIRE(required == actual);
870             }
871         }
872     }
873 }
874 
875 SCENARIO("window with time or count, disposed", "[window_with_time_or_count][operators]"){
876     GIVEN("1 hot observable of ints."){
877         auto sc = rxsc::make_test();
878         auto so = rx::synchronize_in_one_worker(sc);
879         auto w = sc.create_worker();
880         const rxsc::test::messages<int> on;
881         const rxsc::test::messages<rx::observable<int>> o_on;
882 
883         auto xs = sc.make_hot_observable({
884             on.next(205, 1),
885             on.next(210, 2),
886             on.next(240, 3),
887             on.next(280, 4),
888             on.next(320, 5),
889             on.next(350, 6),
890             on.next(370, 7),
891             on.next(420, 8),
892             on.next(470, 9),
893             on.completed(600)
894         });
895 
896         WHEN("group each int with the next 2 ints"){
897             using namespace std::chrono;
898 
899             auto res = w.start(
__anon38a1318b1002() 900                 [&]() {
901                     return xs
902                         .window_with_time_or_count(milliseconds(70), 3, so)
903                         .merge()
904                         // forget type to workaround lambda deduction bug on msvc 2013
905                         .as_dynamic();
906                 },
907                 372
908             );
909 
910             THEN("the output contains merged groups of ints"){
911                 auto required = rxu::to_vector({
912                     on.next(206, 1),
913                     on.next(211, 2),
914                     on.next(241, 3),
915                     on.next(281, 4),
916                     on.next(321, 5),
917                     on.next(351, 6),
918                     on.next(371, 7)
919                 });
920                 auto actual = res.get_observer().messages();
921                 REQUIRE(required == actual);
922             }
923 
924             THEN("there was one subscription and one unsubscription to the observable"){
925                 auto required = rxu::to_vector({
926                     o_on.subscribe(200, 373)
927                 });
928                 auto actual = xs.subscriptions();
929                 REQUIRE(required == actual);
930             }
931         }
932     }
933 }
934 
935 SCENARIO("window with time or count, only time triggered", "[window_with_time_or_count][operators]"){
936     GIVEN("1 hot observable of ints."){
937         auto sc = rxsc::make_test();
938         auto so = rx::synchronize_in_one_worker(sc);
939         auto w = sc.create_worker();
940         const rxsc::test::messages<int> on;
941         const rxsc::test::messages<rx::observable<int>> o_on;
942 
943         auto xs = sc.make_hot_observable({
944             on.next(205, 1),
945             on.next(305, 2),
946             on.next(505, 3),
947             on.next(605, 4),
948             on.next(610, 5),
949             on.completed(850)
950         });
951 
952         WHEN("group each int with the next 2 ints"){
953             using namespace std::chrono;
954 
955             auto res = w.start(
__anon38a1318b1102() 956                 [&]() {
957                     return xs
958                         .window_with_time_or_count(milliseconds(100), 3, so)
959                         .merge()
960                         // forget type to workaround lambda deduction bug on msvc 2013
961                         .as_dynamic();
962                 }
963             );
964 
965             THEN("the output contains merged groups of ints"){
966                 auto required = rxu::to_vector({
967                     on.next(206, 1),
968                     on.next(306, 2),
969                     on.next(506, 3),
970                     on.next(606, 4),
971                     on.next(611, 5),
972                     on.completed(851)
973                 });
974                 auto actual = res.get_observer().messages();
975                 REQUIRE(required == actual);
976             }
977 
978             THEN("there was one subscription and one unsubscription to the observable"){
979                 auto required = rxu::to_vector({
980                     o_on.subscribe(200, 850)
981                 });
982                 auto actual = xs.subscriptions();
983                 REQUIRE(required == actual);
984             }
985         }
986     }
987 }
988 
989 SCENARIO("window with time or count, only count triggered", "[window_with_time_or_count][operators]"){
990     GIVEN("1 hot observable of ints."){
991         auto sc = rxsc::make_test();
992         auto so = rx::synchronize_in_one_worker(sc);
993         auto w = sc.create_worker();
994         const rxsc::test::messages<int> on;
995         const rxsc::test::messages<rx::observable<int>> o_on;
996 
997         auto xs = sc.make_hot_observable({
998             on.next(205, 1),
999             on.next(305, 2),
1000             on.next(505, 3),
1001             on.next(605, 4),
1002             on.next(610, 5),
1003             on.completed(850)
1004         });
1005 
1006         WHEN("group each int with the next 2 ints"){
1007             using namespace std::chrono;
1008 
1009             auto res = w.start(
__anon38a1318b1202() 1010                 [&]() {
1011                     return xs
1012                         .window_with_time_or_count(milliseconds(370), 2, so)
1013                         .map([](rx::observable<int> w){
1014                             return w.count();
1015                         })
1016                         .merge()
1017                         // forget type to workaround lambda deduction bug on msvc 2013
1018                         .as_dynamic();
1019                 }
1020             );
1021 
1022             THEN("the output contains merged groups of ints"){
1023                 auto required = rxu::to_vector({
1024                     on.next(306, 2),
1025                     on.next(606, 2),
1026                     on.next(851, 1),
1027                     on.completed(851)
1028                 });
1029                 auto actual = res.get_observer().messages();
1030                 REQUIRE(required == actual);
1031             }
1032 
1033             THEN("there was one subscription and one unsubscription to the observable"){
1034                 auto required = rxu::to_vector({
1035                     o_on.subscribe(200, 850)
1036                 });
1037                 auto actual = xs.subscriptions();
1038                 REQUIRE(required == actual);
1039             }
1040         }
1041     }
1042 }
1043