1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_OBSERVABLE_HPP)
6 #define RXCPP_RX_OBSERVABLE_HPP
7
8 #include "rx-includes.hpp"
9
10 #ifdef __GNUG__
11 #define EXPLICIT_THIS this->
12 #else
13 #define EXPLICIT_THIS
14 #endif
15
16 namespace rxcpp {
17
18 namespace detail {
19
20 template<class Subscriber, class T>
21 struct has_on_subscribe_for
22 {
23 struct not_void {};
24 template<class CS, class CT>
25 static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
26 template<class CS, class CT>
27 static not_void check(...);
28
29 typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
30 static const bool value = std::is_same<detail_result, void>::value;
31 };
32
33 }
34
35 template<class T>
36 class dynamic_observable
37 : public rxs::source_base<T>
38 {
39 struct state_type
40 : public std::enable_shared_from_this<state_type>
41 {
42 typedef std::function<void(subscriber<T>)> onsubscribe_type;
43
44 onsubscribe_type on_subscribe;
45 };
46 std::shared_ptr<state_type> state;
47
48 template<class U>
49 friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
50
51 template<class SO>
construct(SO && source,rxs::tag_source &&)52 void construct(SO&& source, rxs::tag_source&&) {
53 rxu::decay_t<SO> so = std::forward<SO>(source);
54 state->on_subscribe = [so](subscriber<T> o) mutable {
55 so.on_subscribe(std::move(o));
56 };
57 }
58
59 struct tag_function {};
60 template<class F>
construct(F && f,tag_function &&)61 void construct(F&& f, tag_function&&) {
62 state->on_subscribe = std::forward<F>(f);
63 }
64
65 public:
66
67 typedef tag_dynamic_observable dynamic_observable_tag;
68
dynamic_observable()69 dynamic_observable()
70 {
71 }
72
73 template<class SOF>
dynamic_observable(SOF && sof,typename std::enable_if<!is_dynamic_observable<SOF>::value,void ** >::type=0)74 explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
75 : state(std::make_shared<state_type>())
76 {
77 construct(std::forward<SOF>(sof),
78 typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
79 }
80
on_subscribe(subscriber<T> o) const81 void on_subscribe(subscriber<T> o) const {
82 state->on_subscribe(std::move(o));
83 }
84
85 template<class Subscriber>
86 typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
on_subscribe(Subscriber o) const87 on_subscribe(Subscriber o) const {
88 state->on_subscribe(o.as_dynamic());
89 }
90 };
91
92 template<class T>
operator ==(const dynamic_observable<T> & lhs,const dynamic_observable<T> & rhs)93 inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
94 return lhs.state == rhs.state;
95 }
96 template<class T>
operator !=(const dynamic_observable<T> & lhs,const dynamic_observable<T> & rhs)97 inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
98 return !(lhs == rhs);
99 }
100
101 template<class T, class Source>
make_observable_dynamic(Source && s)102 observable<T> make_observable_dynamic(Source&& s) {
103 return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
104 }
105
106 namespace detail {
107 template<bool Selector, class Default, class SO>
108 struct resolve_observable;
109
110 template<class Default, class SO>
111 struct resolve_observable<true, Default, SO>
112 {
113 typedef typename SO::type type;
114 typedef typename type::value_type value_type;
115 static const bool value = true;
116 typedef observable<value_type, type> observable_type;
117 template<class... AN>
makerxcpp::detail::resolve_observable118 static observable_type make(const Default&, AN&&... an) {
119 return observable_type(type(std::forward<AN>(an)...));
120 }
121 };
122 template<class Default, class SO>
123 struct resolve_observable<false, Default, SO>
124 {
125 static const bool value = false;
126 typedef Default observable_type;
127 template<class... AN>
makerxcpp::detail::resolve_observable128 static observable_type make(const observable_type& that, const AN&...) {
129 return that;
130 }
131 };
132 template<class SO>
133 struct resolve_observable<true, void, SO>
134 {
135 typedef typename SO::type type;
136 typedef typename type::value_type value_type;
137 static const bool value = true;
138 typedef observable<value_type, type> observable_type;
139 template<class... AN>
makerxcpp::detail::resolve_observable140 static observable_type make(AN&&... an) {
141 return observable_type(type(std::forward<AN>(an)...));
142 }
143 };
144 template<class SO>
145 struct resolve_observable<false, void, SO>
146 {
147 static const bool value = false;
148 typedef void observable_type;
149 template<class... AN>
makerxcpp::detail::resolve_observable150 static observable_type make(const AN&...) {
151 }
152 };
153
154 }
155
156 template<class Selector, class Default, template<class... TN> class SO, class... AN>
157 struct defer_observable
158 : public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
159 {
160 };
161
162 /*!
163 \brief a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value.
164
165 \ingroup group-observable
166
167 */
168 template<class T, class Observable>
169 class blocking_observable
170 {
171 template<class Obsvbl, class... ArgN>
blocking_subscribe(const Obsvbl & source,bool do_rethrow,ArgN &&...an)172 static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
173 -> void {
174 std::mutex lock;
175 std::condition_variable wake;
176 bool disposed = false;
177
178 auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
179
180 rxu::error_ptr error;
181 bool has_error = false;
182
183 // keep any error to rethrow at the end.
184 // copy 'dest' by-value to avoid using it after it goes out of scope.
185 auto scbr = make_subscriber<T>(
186 dest,
187 [dest](T t){dest.on_next(t);},
188 [dest,&error,&has_error,do_rethrow](rxu::error_ptr e){
189 if (do_rethrow) {
190 has_error = true;
191 error = e;
192 } else {
193 dest.on_error(e);
194 }
195 },
196 [dest](){dest.on_completed();}
197 );
198
199 auto cs = scbr.get_subscription();
200 cs.add(
201 [&](){
202 std::unique_lock<std::mutex> guard(lock);
203 wake.notify_one();
204 disposed = true;
205 });
206
207 source.subscribe(std::move(scbr));
208
209 std::unique_lock<std::mutex> guard(lock);
210 wake.wait(guard,
211 [&](){
212 return disposed;
213 });
214
215 if (has_error) {rxu::rethrow_exception(error);}
216 }
217
218 public:
219 typedef rxu::decay_t<Observable> observable_type;
220 observable_type source;
~blocking_observable()221 ~blocking_observable()
222 {
223 }
blocking_observable(observable_type s)224 blocking_observable(observable_type s) : source(std::move(s)) {}
225
226 ///
227 /// `subscribe` will cause this observable to emit values to the provided subscriber.
228 ///
229 /// \return void
230 ///
231 /// \param an... - the arguments are passed to make_subscriber().
232 ///
233 /// callers must provide enough arguments to make a subscriber.
234 /// overrides are supported. thus
235 /// `subscribe(thesubscriber, composite_subscription())`
236 /// will take `thesubscriber.get_observer()` and the provided
237 /// subscription and subscribe to the new subscriber.
238 /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
239 /// if a subscription or subscriber is not provided then a new subscription will be created.
240 ///
241 template<class... ArgN>
subscribe(ArgN &&...an) const242 auto subscribe(ArgN&&... an) const
243 -> void {
244 return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
245 }
246
247 ///
248 /// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber.
249 ///
250 /// \note If the source observable calls on_error, the raised exception is rethrown by this method.
251 ///
252 /// \note If the source observable calls on_error, the `on_error` method on the subscriber will not be called.
253 ///
254 /// \return void
255 ///
256 /// \param an... - the arguments are passed to make_subscriber().
257 ///
258 /// callers must provide enough arguments to make a subscriber.
259 /// overrides are supported. thus
260 /// `subscribe(thesubscriber, composite_subscription())`
261 /// will take `thesubscriber.get_observer()` and the provided
262 /// subscription and subscribe to the new subscriber.
263 /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
264 /// if a subscription or subscriber is not provided then a new subscription will be created.
265 ///
266 template<class... ArgN>
subscribe_with_rethrow(ArgN &&...an) const267 auto subscribe_with_rethrow(ArgN&&... an) const
268 -> void {
269 return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
270 }
271
272 /*! Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
273
274 \return The first item emitted by this blocking_observable.
275
276 \note If the source observable calls on_error, the raised exception is rethrown by this method.
277
278 \sample
279 When the source observable emits at least one item:
280 \snippet blocking_observable.cpp blocking first sample
281 \snippet output.txt blocking first sample
282
283 When the source observable is empty:
284 \snippet blocking_observable.cpp blocking first empty sample
285 \snippet output.txt blocking first empty sample
286 */
287 template<class... AN>
first(AN ** ...)288 auto first(AN**...) -> delayed_type_t<T, AN...> const {
289 rxu::maybe<T> result;
290 composite_subscription cs;
291 subscribe_with_rethrow(
292 cs,
293 [&](T v){result.reset(v); cs.unsubscribe();});
294 if (result.empty())
295 rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
296 return result.get();
297 static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
298 }
299
300 /*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
301
302 \return The last item emitted by this blocking_observable.
303
304 \note If the source observable calls on_error, the raised exception is rethrown by this method.
305
306 \sample
307 When the source observable emits at least one item:
308 \snippet blocking_observable.cpp blocking last sample
309 \snippet output.txt blocking last sample
310
311 When the source observable is empty:
312 \snippet blocking_observable.cpp blocking last empty sample
313 \snippet output.txt blocking last empty sample
314 */
315 template<class... AN>
last(AN ** ...)316 auto last(AN**...) -> delayed_type_t<T, AN...> const {
317 rxu::maybe<T> result;
318 subscribe_with_rethrow(
319 [&](T v){result.reset(v);});
320 if (result.empty())
321 rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
322 return result.get();
323 static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
324 }
325
326 /*! Return the total number of items emitted by this blocking_observable.
327
328 \return The total number of items emitted by this blocking_observable.
329
330 \sample
331 \snippet blocking_observable.cpp blocking count sample
332 \snippet output.txt blocking count sample
333
334 When the source observable calls on_error:
335 \snippet blocking_observable.cpp blocking count error sample
336 \snippet output.txt blocking count error sample
337 */
count() const338 int count() const {
339 int result = 0;
340 source.count().as_blocking().subscribe_with_rethrow(
341 [&](int v){result = v;});
342 return result;
343 }
344
345 /*! Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
346
347 \return The sum of all items emitted by this blocking_observable.
348
349 \sample
350 When the source observable emits at least one item:
351 \snippet blocking_observable.cpp blocking sum sample
352 \snippet output.txt blocking sum sample
353
354 When the source observable is empty:
355 \snippet blocking_observable.cpp blocking sum empty sample
356 \snippet output.txt blocking sum empty sample
357
358 When the source observable calls on_error:
359 \snippet blocking_observable.cpp blocking sum error sample
360 \snippet output.txt blocking sum error sample
361 */
sum() const362 T sum() const {
363 return source.sum().as_blocking().last();
364 }
365
366 /*! Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
367
368 \return The average value of all items emitted by this blocking_observable.
369
370 \sample
371 When the source observable emits at least one item:
372 \snippet blocking_observable.cpp blocking average sample
373 \snippet output.txt blocking average sample
374
375 When the source observable is empty:
376 \snippet blocking_observable.cpp blocking average empty sample
377 \snippet output.txt blocking average empty sample
378
379 When the source observable calls on_error:
380 \snippet blocking_observable.cpp blocking average error sample
381 \snippet output.txt blocking average error sample
382 */
average() const383 double average() const {
384 return source.average().as_blocking().last();
385 }
386
387 /*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
388
389 \return The max of all items emitted by this blocking_observable.
390
391 \sample
392 When the source observable emits at least one item:
393 \snippet blocking_observable.cpp blocking max sample
394 \snippet output.txt blocking max sample
395
396 When the source observable is empty:
397 \snippet blocking_observable.cpp blocking max empty sample
398 \snippet output.txt blocking max empty sample
399
400 When the source observable calls on_error:
401 \snippet blocking_observable.cpp blocking max error sample
402 \snippet output.txt blocking max error sample
403 */
max() const404 T max() const {
405 return source.max().as_blocking().last();
406 }
407
408 /*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
409
410 \return The min of all items emitted by this blocking_observable.
411
412 \sample
413 When the source observable emits at least one item:
414 \snippet blocking_observable.cpp blocking min sample
415 \snippet output.txt blocking min sample
416
417 When the source observable is empty:
418 \snippet blocking_observable.cpp blocking min empty sample
419 \snippet output.txt blocking min empty sample
420
421 When the source observable calls on_error:
422 \snippet blocking_observable.cpp blocking min error sample
423 \snippet output.txt blocking min error sample
424 */
min() const425 T min() const {
426 return source.min().as_blocking().last();
427 }
428 };
429
430 namespace detail {
431
432 template<class SourceOperator, class Subscriber>
433 struct safe_subscriber
434 {
safe_subscriberrxcpp::detail::safe_subscriber435 safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
436
subscriberxcpp::detail::safe_subscriber437 void subscribe() {
438 RXCPP_TRY {
439 so->on_subscribe(*o);
440 } RXCPP_CATCH(...) {
441 if (!o->is_subscribed()) {
442 rxu::rethrow_current_exception();
443 }
444 o->on_error(rxu::make_error_ptr(rxu::current_exception()));
445 o->unsubscribe();
446 }
447 }
448
operator ()rxcpp::detail::safe_subscriber449 void operator()(const rxsc::schedulable&) {
450 subscribe();
451 }
452
453 SourceOperator* so;
454 Subscriber* o;
455 };
456
457 }
458
459 template<>
460 class observable<void, void>;
461
462 /*!
463 \defgroup group-observable Observables
464
465 \brief These are the set of observable classes in rxcpp.
466
467 \class rxcpp::observable
468
469 \ingroup group-observable group-core
470
471 \brief a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
472
473 \par Some code
474 This sample will observable::subscribe() to values from a observable<void, void>::range().
475
476 \sample
477 \snippet range.cpp range sample
478 \snippet output.txt range sample
479
480 */
481 template<class T, class SourceOperator>
482 class observable
483 : public observable_base<T>
484 {
485 static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
486
487 typedef observable<T, SourceOperator> this_type;
488
489 public:
490 typedef rxu::decay_t<SourceOperator> source_operator_type;
491 mutable source_operator_type source_operator;
492
493 private:
494
495 template<class U, class SO>
496 friend class observable;
497
498 template<class U, class SO>
499 friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
500
501 template<class Subscriber>
detail_subscribe(Subscriber o) const502 auto detail_subscribe(Subscriber o) const
503 -> composite_subscription {
504
505 typedef rxu::decay_t<Subscriber> subscriber_type;
506
507 static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
508 static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
509 static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
510
511 trace_activity().subscribe_enter(*this, o);
512
513 if (!o.is_subscribed()) {
514 trace_activity().subscribe_return(*this);
515 return o.get_subscription();
516 }
517
518 detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
519
520 // make sure to let current_thread take ownership of the thread as early as possible.
521 if (rxsc::current_thread::is_schedule_required()) {
522 const auto& sc = rxsc::make_current_thread();
523 sc.create_worker(o.get_subscription()).schedule(subscriber);
524 } else {
525 // current_thread already owns this thread.
526 subscriber.subscribe();
527 }
528
529 trace_activity().subscribe_return(*this);
530 return o.get_subscription();
531 }
532
533 public:
534 typedef T value_type;
535
536 static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
537
~observable()538 ~observable()
539 {
540 }
541
observable()542 observable()
543 {
544 }
545
observable(const source_operator_type & o)546 explicit observable(const source_operator_type& o)
547 : source_operator(o)
548 {
549 }
observable(source_operator_type && o)550 explicit observable(source_operator_type&& o)
551 : source_operator(std::move(o))
552 {
553 }
554
555 /// implicit conversion between observables of the same value_type
556 template<class SO>
observable(const observable<T,SO> & o)557 observable(const observable<T, SO>& o)
558 : source_operator(o.source_operator)
559 {}
560 /// implicit conversion between observables of the same value_type
561 template<class SO>
observable(observable<T,SO> && o)562 observable(observable<T, SO>&& o)
563 : source_operator(std::move(o.source_operator))
564 {}
565
566 #if 0
567 template<class I>
568 void on_subscribe(observer<T, I> o) const {
569 source_operator.on_subscribe(o);
570 }
571 #endif
572
573 /*! @copydoc rxcpp::operators::as_dynamic
574 */
575 template<class... AN>
as_dynamic(AN ** ...) const576 observable<T> as_dynamic(AN**...) const {
577 return *this;
578 static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
579 }
580
581 /*! @copydoc rx-ref_count.hpp
582 */
583 template<class... AN>
ref_count(AN...an) const584 auto ref_count(AN... an) const // ref_count(ConnectableObservable&&)
585 /// \cond SHOW_SERVICE_MEMBERS
586 -> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
587 /// \endcond
588 {
589 return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
590 }
591
592 /*! @copydoc rxcpp::operators::as_blocking
593 */
594 template<class... AN>
as_blocking(AN ** ...) const595 blocking_observable<T, this_type> as_blocking(AN**...) const {
596 return blocking_observable<T, this_type>(*this);
597 static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
598 }
599
600 /// \cond SHOW_SERVICE_MEMBERS
601
602 ///
603 /// takes any function that will take this observable and produce a result value.
604 /// this is intended to allow externally defined operators, that use subscribe,
605 /// to be connected into the expression.
606 ///
607 template<class OperatorFactory>
op(OperatorFactory && of) const608 auto op(OperatorFactory&& of) const
609 -> decltype(of(*(const this_type*)nullptr)) {
610 return of(*this);
611 static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
612 }
613
614 /*! @copydoc rx-lift.hpp
615 */
616 template<class ResultType, class Operator>
lift(Operator && op) const617 auto lift(Operator&& op) const
618 -> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
619 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
620 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
621 static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
622 }
623
624 ///
625 /// takes any function that will take a subscriber for this observable and produce a subscriber.
626 /// this is intended to allow externally defined operators, that use make_subscriber, to be connected
627 /// into the expression.
628 ///
629 template<class ResultType, class Operator>
lift_if(Operator && op) const630 auto lift_if(Operator&& op) const
631 -> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
632 observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
633 return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
634 rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
635 }
636 ///
637 /// takes any function that will take a subscriber for this observable and produce a subscriber.
638 /// this is intended to allow externally defined operators, that use make_subscriber, to be connected
639 /// into the expression.
640 ///
641 template<class ResultType, class Operator>
lift_if(Operator &&) const642 auto lift_if(Operator&&) const
643 -> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
644 decltype(rxs::from<ResultType>())>::type {
645 return rxs::from<ResultType>();
646 }
647 /// \endcond
648
649 /*! @copydoc rx-subscribe.hpp
650 */
651 template<class... ArgN>
subscribe(ArgN &&...an) const652 auto subscribe(ArgN&&... an) const
653 -> composite_subscription {
654 return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
655 }
656
657 /*! @copydoc rx-all.hpp
658 */
659 template<class... AN>
all(AN &&...an) const660 auto all(AN&&... an) const
661 /// \cond SHOW_SERVICE_MEMBERS
662 -> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
663 /// \endcond
664 {
665 return observable_member(all_tag{}, *this, std::forward<AN>(an)...);
666 }
667
668 /*! @copydoc rxcpp::operators::is_empty
669 */
670 template<class... AN>
is_empty(AN &&...an) const671 auto is_empty(AN&&... an) const
672 /// \cond SHOW_SERVICE_MEMBERS
673 -> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
674 /// \endcond
675 {
676 return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...);
677 }
678
679 /*! @copydoc rx-any.hpp
680 */
681 template<class... AN>
any(AN &&...an) const682 auto any(AN&&... an) const
683 /// \cond SHOW_SERVICE_MEMBERS
684 -> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
685 /// \endcond
686 {
687 return observable_member(any_tag{}, *this, std::forward<AN>(an)...);
688 }
689
690 /*! @copydoc rxcpp::operators::exists
691 */
692 template<class... AN>
exists(AN &&...an) const693 auto exists(AN&&... an) const
694 /// \cond SHOW_SERVICE_MEMBERS
695 -> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
696 /// \endcond
697 {
698 return observable_member(exists_tag{}, *this, std::forward<AN>(an)...);
699 }
700
701 /*! @copydoc rxcpp::operators::contains
702 */
703 template<class... AN>
contains(AN &&...an) const704 auto contains(AN&&... an) const
705 /// \cond SHOW_SERVICE_MEMBERS
706 -> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
707 /// \endcond
708 {
709 return observable_member(contains_tag{}, *this, std::forward<AN>(an)...);
710 }
711
712 /*! @copydoc rx-filter.hpp
713 */
714 template<class... AN>
filter(AN &&...an) const715 auto filter(AN&&... an) const
716 /// \cond SHOW_SERVICE_MEMBERS
717 -> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
718 /// \endcond
719 {
720 return observable_member(filter_tag{}, *this, std::forward<AN>(an)...);
721 }
722
723 /*! @copydoc rx-switch_if_empty.hpp
724 */
725 template<class... AN>
switch_if_empty(AN &&...an) const726 auto switch_if_empty(AN&&... an) const
727 /// \cond SHOW_SERVICE_MEMBERS
728 -> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
729 /// \endcond
730 {
731 return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...);
732 }
733
734 /*! @copydoc rxcpp::operators::default_if_empty
735 */
736 template<class... AN>
default_if_empty(AN &&...an) const737 auto default_if_empty(AN&&... an) const
738 /// \cond SHOW_SERVICE_MEMBERS
739 -> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
740 /// \endcond
741 {
742 return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...);
743 }
744
745 /*! @copydoc rx-sequence_equal.hpp
746 */
747 template<class... AN>
sequence_equal(AN...an) const748 auto sequence_equal(AN... an) const
749 /// \cond SHOW_SERVICE_MEMBERS
750 -> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
751 /// \endcond
752 {
753 return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...);
754 }
755
756 /*! @copydoc rx-tap.hpp
757 */
758 template<class... AN>
tap(AN &&...an) const759 auto tap(AN&&... an) const
760 /// \cond SHOW_SERVICE_MEMBERS
761 -> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
762 /// \endcond
763 {
764 return observable_member(tap_tag{}, *this, std::forward<AN>(an)...);
765 }
766
767 /*! @copydoc rx-time_interval.hpp
768 */
769 template<class... AN>
time_interval(AN &&...an) const770 auto time_interval(AN&&... an) const
771 /// \cond SHOW_SERVICE_MEMBERS
772 -> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
773 /// \endcond
774 {
775 return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
776 }
777
778 /*! @copydoc rx-timeout.hpp
779 */
780 template<class... AN>
timeout(AN &&...an) const781 auto timeout(AN&&... an) const
782 /// \cond SHOW_SERVICE_MEMBERS
783 -> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
784 /// \endcond
785 {
786 return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...);
787 }
788
789 /*! @copydoc rx-timestamp.hpp
790 */
791 template<class... AN>
timestamp(AN &&...an) const792 auto timestamp(AN&&... an) const
793 /// \cond SHOW_SERVICE_MEMBERS
794 -> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
795 /// \endcond
796 {
797 return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
798 }
799
800 /*! @copydoc rx-finally.hpp
801 */
802 template<class... AN>
finally(AN &&...an) const803 auto finally(AN&&... an) const
804 /// \cond SHOW_SERVICE_MEMBERS
805 -> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
806 /// \endcond
807 {
808 return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
809 }
810
811 /*! @copydoc rx-on_error_resume_next.hpp
812 */
813 template<class... AN>
on_error_resume_next(AN &&...an) const814 auto on_error_resume_next(AN&&... an) const
815 /// \cond SHOW_SERVICE_MEMBERS
816 -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
817 /// \endcond
818 {
819 return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
820 }
821
822 /*! @copydoc rx-on_error_resume_next.hpp
823 */
824 template<class... AN>
switch_on_error(AN &&...an) const825 auto switch_on_error(AN&&... an) const
826 /// \cond SHOW_SERVICE_MEMBERS
827 -> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
828 /// \endcond
829 {
830 return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
831 }
832
833 /*! @copydoc rx-map.hpp
834 */
835 template<class... AN>
map(AN &&...an) const836 auto map(AN&&... an) const
837 /// \cond SHOW_SERVICE_MEMBERS
838 -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
839 /// \endcond
840 {
841 return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
842 }
843
844 /*! @copydoc rx-map.hpp
845 */
846 template<class... AN>
transform(AN &&...an) const847 auto transform(AN&&... an) const
848 /// \cond SHOW_SERVICE_MEMBERS
849 -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
850 /// \endcond
851 {
852 return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
853 }
854
855 /*! @copydoc rx-debounce.hpp
856 */
857 template<class... AN>
debounce(AN &&...an) const858 auto debounce(AN&&... an) const
859 /// \cond SHOW_SERVICE_MEMBERS
860 -> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
861 /// \endcond
862 {
863 return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...);
864 }
865
866 /*! @copydoc rx-delay.hpp
867 */
868 template<class... AN>
delay(AN &&...an) const869 auto delay(AN&&... an) const
870 /// \cond SHOW_SERVICE_MEMBERS
871 -> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
872 /// \endcond
873 {
874 return observable_member(delay_tag{}, *this, std::forward<AN>(an)...);
875 }
876
877 /*! @copydoc rx-distinct.hpp
878 */
879 template<class... AN>
distinct(AN &&...an) const880 auto distinct(AN&&... an) const
881 /// \cond SHOW_SERVICE_MEMBERS
882 -> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
883 /// \endcond
884 {
885 return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...);
886 }
887
888 /*! @copydoc rx-distinct_until_changed.hpp
889 */
890 template<class... AN>
distinct_until_changed(AN &&...an) const891 auto distinct_until_changed(AN&&... an) const
892 /// \cond SHOW_SERVICE_MEMBERS
893 -> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
894 /// \endcond
895 {
896 return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...);
897 }
898
899 /*! @copydoc rx-element_at.hpp
900 */
901 template<class... AN>
element_at(AN &&...an) const902 auto element_at(AN&&... an) const
903 /// \cond SHOW_SERVICE_MEMBERS
904 -> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
905 /// \endcond
906 {
907 return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
908 }
909
910 /*! @copydoc rx-window.hpp
911 */
912 template<class... AN>
window(AN &&...an) const913 auto window(AN&&... an) const
914 /// \cond SHOW_SERVICE_MEMBERS
915 -> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
916 /// \endcond
917 {
918 return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
919 }
920
921 /*! @copydoc rx-window_time.hpp
922 */
923 template<class... AN>
window_with_time(AN &&...an) const924 auto window_with_time(AN&&... an) const
925 /// \cond SHOW_SERVICE_MEMBERS
926 -> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
927 /// \endcond
928 {
929 return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
930 }
931
932 /*! @copydoc rx-window_time_count.hpp
933 */
934 template<class... AN>
window_with_time_or_count(AN &&...an) const935 auto window_with_time_or_count(AN&&... an) const
936 /// \cond SHOW_SERVICE_MEMBERS
937 -> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
938 /// \endcond
939 {
940 return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
941 }
942
943 /*! @copydoc rx-window_toggle.hpp
944 */
945 template<class... AN>
window_toggle(AN &&...an) const946 auto window_toggle(AN&&... an) const
947 /// \cond SHOW_SERVICE_MEMBERS
948 -> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
949 /// \endcond
950 {
951 return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...);
952 }
953
954 /*! @copydoc rx-buffer_count.hpp
955 */
956 template<class... AN>
buffer(AN &&...an) const957 auto buffer(AN&&... an) const
958 /// \cond SHOW_SERVICE_MEMBERS
959 -> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
960 /// \endcond
961 {
962 return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...);
963 }
964
965 /*! @copydoc rx-buffer_time.hpp
966 */
967 template<class... AN>
buffer_with_time(AN &&...an) const968 auto buffer_with_time(AN&&... an) const
969 /// \cond SHOW_SERVICE_MEMBERS
970 -> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
971 /// \endcond
972 {
973 return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
974 }
975
976 /*! @copydoc rx-buffer_time_count.hpp
977 */
978 template<class... AN>
buffer_with_time_or_count(AN &&...an) const979 auto buffer_with_time_or_count(AN&&... an) const
980 /// \cond SHOW_SERVICE_MEMBERS
981 -> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
982 /// \endcond
983 {
984 return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
985 }
986
987 /*! @copydoc rx-switch_on_next.hpp
988 */
989 template<class... AN>
switch_on_next(AN &&...an) const990 auto switch_on_next(AN&&... an) const
991 /// \cond SHOW_SERVICE_MEMBERS
992 -> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
993 /// \endcond
994 {
995 return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
996 }
997
998 /*! @copydoc rx-merge.hpp
999 */
1000 template<class... AN>
merge(AN...an) const1001 auto merge(AN... an) const
1002 /// \cond SHOW_SERVICE_MEMBERS
1003 -> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1004 /// \endcond
1005 {
1006 return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
1007 }
1008
1009 /*! @copydoc rx-merge_delay_error.hpp
1010 */
1011 template<class... AN>
merge_delay_error(AN...an) const1012 auto merge_delay_error(AN... an) const
1013 /// \cond SHOW_SERVICE_MEMBERS
1014 -> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1015 /// \endcond
1016 {
1017 return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...);
1018 }
1019
1020 /*! @copydoc rx-amb.hpp
1021 */
1022 template<class... AN>
amb(AN...an) const1023 auto amb(AN... an) const
1024 /// \cond SHOW_SERVICE_MEMBERS
1025 -> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1026 /// \endcond
1027 {
1028 return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
1029 }
1030
1031 /*! @copydoc rx-flat_map.hpp
1032 */
1033 template<class... AN>
flat_map(AN &&...an) const1034 auto flat_map(AN&&... an) const
1035 /// \cond SHOW_SERVICE_MEMBERS
1036 -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1037 /// \endcond
1038 {
1039 return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1040 }
1041
1042 /*! @copydoc rx-flat_map.hpp
1043 */
1044 template<class... AN>
merge_transform(AN &&...an) const1045 auto merge_transform(AN&&... an) const
1046 /// \cond SHOW_SERVICE_MEMBERS
1047 -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1048 /// \endcond
1049 {
1050 return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
1051 }
1052
1053 /*! @copydoc rx-concat.hpp
1054 */
1055 template<class... AN>
concat(AN...an) const1056 auto concat(AN... an) const
1057 /// \cond SHOW_SERVICE_MEMBERS
1058 -> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1059 /// \endcond
1060 {
1061 return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
1062 }
1063
1064 /*! @copydoc rx-concat_map.hpp
1065 */
1066 template<class... AN>
concat_map(AN &&...an) const1067 auto concat_map(AN&&... an) const
1068 /// \cond SHOW_SERVICE_MEMBERS
1069 -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1070 /// \endcond
1071 {
1072 return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1073 }
1074
1075 /*! @copydoc rx-concat_map.hpp
1076 */
1077 template<class... AN>
concat_transform(AN &&...an) const1078 auto concat_transform(AN&&... an) const
1079 /// \cond SHOW_SERVICE_MEMBERS
1080 -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1081 /// \endcond
1082 {
1083 return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
1084 }
1085
1086 /*! @copydoc rx-with_latest_from.hpp
1087 */
1088 template<class... AN>
with_latest_from(AN...an) const1089 auto with_latest_from(AN... an) const
1090 /// \cond SHOW_SERVICE_MEMBERS
1091 -> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1092 /// \endcond
1093 {
1094 return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...);
1095 }
1096
1097
1098 /*! @copydoc rx-combine_latest.hpp
1099 */
1100 template<class... AN>
combine_latest(AN...an) const1101 auto combine_latest(AN... an) const
1102 /// \cond SHOW_SERVICE_MEMBERS
1103 -> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1104 /// \endcond
1105 {
1106 return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...);
1107 }
1108
1109 /*! @copydoc rx-zip.hpp
1110 */
1111 template<class... AN>
zip(AN &&...an) const1112 auto zip(AN&&... an) const
1113 /// \cond SHOW_SERVICE_MEMBERS
1114 -> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1115 /// \endcond
1116 {
1117 return observable_member(zip_tag{}, *this, std::forward<AN>(an)...);
1118 }
1119
1120 /*! @copydoc rx-group_by.hpp
1121 */
1122 template<class... AN>
group_by(AN &&...an) const1123 inline auto group_by(AN&&... an) const
1124 /// \cond SHOW_SERVICE_MEMBERS
1125 -> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1126 /// \endcond
1127 {
1128 return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...);
1129 }
1130
1131 /*! @copydoc rx-ignore_elements.hpp
1132 */
1133 template<class... AN>
ignore_elements(AN &&...an) const1134 auto ignore_elements(AN&&... an) const
1135 /// \cond SHOW_SERVICE_MEMBERS
1136 -> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1137 /// \endcond
1138 {
1139 return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
1140 }
1141
1142 /*! @copydoc rx-muticast.hpp
1143 */
1144 template<class... AN>
multicast(AN &&...an) const1145 auto multicast(AN&&... an) const
1146 /// \cond SHOW_SERVICE_MEMBERS
1147 -> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1148 /// \endcond
1149 {
1150 return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
1151 }
1152
1153 /*! @copydoc rx-publish.hpp
1154 */
1155 template<class... AN>
publish(AN &&...an) const1156 auto publish(AN&&... an) const
1157 /// \cond SHOW_SERVICE_MEMBERS
1158 -> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1159 /// \endcond
1160 {
1161 return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
1162 }
1163
1164 /*! @copydoc rxcpp::operators::publish_synchronized
1165 */
1166 template<class... AN>
publish_synchronized(AN &&...an) const1167 auto publish_synchronized(AN&&... an) const
1168 /// \cond SHOW_SERVICE_MEMBERS
1169 -> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1170 /// \endcond
1171 {
1172 return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
1173 }
1174
1175 /*! @copydoc rx-replay.hpp
1176 */
1177 template<class... AN>
replay(AN &&...an) const1178 auto replay(AN&&... an) const
1179 /// \cond SHOW_SERVICE_MEMBERS
1180 -> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1181 /// \endcond
1182 {
1183 return observable_member(replay_tag{}, *this, std::forward<AN>(an)...);
1184 }
1185
1186 /*! @copydoc rx-subscribe_on.hpp
1187 */
1188 template<class... AN>
subscribe_on(AN &&...an) const1189 auto subscribe_on(AN&&... an) const
1190 /// \cond SHOW_SERVICE_MEMBERS
1191 -> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1192 /// \endcond
1193 {
1194 return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
1195 }
1196
1197 /*! @copydoc rx-observe_on.hpp
1198 */
1199 template<class... AN>
observe_on(AN &&...an) const1200 auto observe_on(AN&&... an) const
1201 /// \cond SHOW_SERVICE_MEMBERS
1202 -> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1203 /// \endcond
1204 {
1205 return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
1206 }
1207
1208 /*! @copydoc rx-reduce.hpp
1209 */
1210 template<class... AN>
reduce(AN &&...an) const1211 auto reduce(AN&&... an) const
1212 /// \cond SHOW_SERVICE_MEMBERS
1213 -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1214 /// \endcond
1215 {
1216 return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1217 }
1218
1219 /*! @copydoc rx-reduce.hpp
1220 */
1221 template<class... AN>
accumulate(AN &&...an) const1222 auto accumulate(AN&&... an) const
1223 /// \cond SHOW_SERVICE_MEMBERS
1224 -> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1225 /// \endcond
1226 {
1227 return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
1228 }
1229
1230 /*! @copydoc rxcpp::operators::first
1231 */
1232 template<class... AN>
first(AN ** ...) const1233 auto first(AN**...) const
1234 /// \cond SHOW_SERVICE_MEMBERS
1235 -> decltype(observable_member(delayed_type<first_tag, AN...>::value(), *(this_type*)nullptr))
1236 /// \endcond
1237 {
1238 return observable_member(delayed_type<first_tag, AN...>::value(), *this);
1239 static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
1240 }
1241
1242 /*! @copydoc rxcpp::operators::last
1243 */
1244 template<class... AN>
last(AN ** ...) const1245 auto last(AN**...) const
1246 /// \cond SHOW_SERVICE_MEMBERS
1247 -> decltype(observable_member(delayed_type<last_tag, AN...>::value(), *(this_type*)nullptr))
1248 /// \endcond
1249 {
1250 return observable_member(delayed_type<last_tag, AN...>::value(), *this);
1251 static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
1252 }
1253
1254 /*! @copydoc rxcpp::operators::count
1255 */
1256 template<class... AN>
count(AN ** ...) const1257 auto count(AN**...) const
1258 /// \cond SHOW_SERVICE_MEMBERS
1259 -> decltype(observable_member(delayed_type<reduce_tag, AN...>::value(), *(this_type*)nullptr, 0, rxu::count(), identity_for<int>()))
1260 /// \endcond
1261 {
1262 return observable_member(delayed_type<reduce_tag, AN...>::value(), *this, 0, rxu::count(), identity_for<int>());
1263 static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
1264 }
1265
1266 /*! @copydoc rxcpp::operators::sum
1267 */
1268 template<class... AN>
sum(AN ** ...) const1269 auto sum(AN**...) const
1270 /// \cond SHOW_SERVICE_MEMBERS
1271 -> decltype(observable_member(delayed_type<sum_tag, AN...>::value(), *(this_type*)nullptr))
1272 /// \endcond
1273 {
1274 return observable_member(delayed_type<sum_tag, AN...>::value(), *this);
1275 static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
1276 }
1277
1278 /*! @copydoc rxcpp::operators::average
1279 */
1280 template<class... AN>
average(AN ** ...) const1281 auto average(AN**...) const
1282 /// \cond SHOW_SERVICE_MEMBERS
1283 -> decltype(observable_member(delayed_type<average_tag, AN...>::value(), *(this_type*)nullptr))
1284 /// \endcond
1285 {
1286 return observable_member(delayed_type<average_tag, AN...>::value(), *this);
1287 static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
1288 }
1289
1290 /*! @copydoc rxcpp::operators::max
1291 */
1292 template<class... AN>
max(AN ** ...) const1293 auto max(AN**...) const
1294 /// \cond SHOW_SERVICE_MEMBERS
1295 -> decltype(observable_member(delayed_type<max_tag, AN...>::value(), *(this_type*)nullptr))
1296 /// \endcond
1297 {
1298 return observable_member(delayed_type<max_tag, AN...>::value(), *this);
1299 static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
1300 }
1301
1302 /*! @copydoc rxcpp::operators::min
1303 */
1304 template<class... AN>
min(AN ** ...) const1305 auto min(AN**...) const
1306 /// \cond SHOW_SERVICE_MEMBERS
1307 -> decltype(observable_member(delayed_type<min_tag, AN...>::value(), *(this_type*)nullptr))
1308 /// \endcond
1309 {
1310 return observable_member(delayed_type<min_tag, AN...>::value(), *this);
1311 static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
1312 }
1313
1314 /*! @copydoc rx-scan.hpp
1315 */
1316 template<class... AN>
scan(AN...an) const1317 auto scan(AN... an) const
1318 /// \cond SHOW_SERVICE_MEMBERS
1319 -> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1320 /// \endcond
1321 {
1322 return observable_member(scan_tag{}, *this, std::forward<AN>(an)...);
1323 }
1324
1325 /*! @copydoc rx-sample_time.hpp
1326 */
1327 template<class... AN>
sample_with_time(AN &&...an) const1328 auto sample_with_time(AN&&... an) const
1329 /// \cond SHOW_SERVICE_MEMBERS
1330 -> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1331 /// \endcond
1332 {
1333 return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...);
1334 }
1335
1336 /*! @copydoc rx-skip.hpp
1337 */
1338 template<class... AN>
skip(AN...an) const1339 auto skip(AN... an) const
1340 /// \cond SHOW_SERVICE_MEMBERS
1341 -> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1342 /// \endcond
1343 {
1344 return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
1345 }
1346
1347 /*! @copydoc rx-skip.hpp
1348 */
1349 template<class... AN>
skip_while(AN...an) const1350 auto skip_while(AN... an) const
1351 /// \cond SHOW_SERVICE_MEMBERS
1352 -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1353 /// \endcond
1354 {
1355 return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...);
1356 }
1357
1358 /*! @copydoc rx-skip_last.hpp
1359 */
1360 template<class... AN>
skip_last(AN...an) const1361 auto skip_last(AN... an) const
1362 /// \cond SHOW_SERVICE_MEMBERS
1363 -> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1364 /// \endcond
1365 {
1366 return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
1367 }
1368
1369 /*! @copydoc rx-skip_until.hpp
1370 */
1371 template<class... AN>
skip_until(AN...an) const1372 auto skip_until(AN... an) const
1373 /// \cond SHOW_SERVICE_MEMBERS
1374 -> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1375 /// \endcond
1376 {
1377 return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
1378 }
1379
1380 /*! @copydoc rx-take.hpp
1381 */
1382 template<class... AN>
take(AN...an) const1383 auto take(AN... an) const
1384 /// \cond SHOW_SERVICE_MEMBERS
1385 -> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1386 /// \endcond
1387 {
1388 return observable_member(take_tag{}, *this, std::forward<AN>(an)...);
1389 }
1390
1391 /*! @copydoc rx-take_last.hpp
1392 */
1393 template<class... AN>
take_last(AN &&...an) const1394 auto take_last(AN&&... an) const
1395 /// \cond SHOW_SERVICE_MEMBERS
1396 -> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1397 /// \endcond
1398 {
1399 return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...);
1400 }
1401
1402 /*! @copydoc rx-take_until.hpp
1403 */
1404 template<class... AN>
take_until(AN &&...an) const1405 auto take_until(AN&&... an) const
1406 /// \cond SHOW_SERVICE_MEMBERS
1407 -> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1408 /// \endcond
1409 {
1410 return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...);
1411 }
1412
1413 /*! @copydoc rx-take_while.hpp
1414 */
1415 template<class... AN>
take_while(AN &&...an) const1416 auto take_while(AN&&... an) const
1417 /// \cond SHOW_SERVICE_MEMBERS
1418 -> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1419 /// \endcond
1420 {
1421 return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
1422 }
1423
1424 /*! @copydoc rx-repeat.hpp
1425 */
1426 template<class... AN>
repeat(AN...an) const1427 auto repeat(AN... an) const
1428 /// \cond SHOW_SERVICE_MEMBERS
1429 -> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1430 /// \endcond
1431 {
1432 return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...);
1433 }
1434
1435 /*! @copydoc rx-retry.hpp
1436 */
1437 template<class... AN>
retry(AN...an) const1438 auto retry(AN... an) const
1439 /// \cond SHOW_SERVICE_MEMBERS
1440 -> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1441 /// \endcond
1442 {
1443 return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...);
1444 }
1445
1446 /*! @copydoc rx-start_with.hpp
1447 */
1448 template<class... AN>
start_with(AN...an) const1449 auto start_with(AN... an) const
1450 /// \cond SHOW_SERVICE_MEMBERS
1451 -> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1452 /// \endcond
1453 {
1454 return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
1455 }
1456
1457 /*! @copydoc rx-pairwise.hpp
1458 */
1459 template<class... AN>
pairwise(AN...an) const1460 auto pairwise(AN... an) const
1461 /// \cond SHOW_SERVICE_MEMBERS
1462 -> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
1463 /// \endcond
1464 {
1465 return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...);
1466 }
1467 };
1468
1469 template<class T, class SourceOperator>
operator ==(const observable<T,SourceOperator> & lhs,const observable<T,SourceOperator> & rhs)1470 inline bool operator==(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1471 return lhs.source_operator == rhs.source_operator;
1472 }
1473 template<class T, class SourceOperator>
operator !=(const observable<T,SourceOperator> & lhs,const observable<T,SourceOperator> & rhs)1474 inline bool operator!=(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
1475 return !(lhs == rhs);
1476 }
1477
1478 /*!
1479 \defgroup group-core Basics
1480
1481 \brief These are the core classes that combine to represent a set of values emitted over time that can be cancelled.
1482
1483 \class rxcpp::observable<void, void>
1484
1485 \brief typed as ```rxcpp::observable<>```, this is a collection of factory methods that return an observable.
1486
1487 \ingroup group-core
1488
1489 \par Create a new type of observable
1490
1491 \sample
1492 \snippet create.cpp Create sample
1493 \snippet output.txt Create sample
1494
1495 \par Create an observable that emits a range of values
1496
1497 \sample
1498 \snippet range.cpp range sample
1499 \snippet output.txt range sample
1500
1501 \par Create an observable that emits nothing / generates an error / immediately completes
1502
1503 \sample
1504 \snippet never.cpp never sample
1505 \snippet output.txt never sample
1506 \snippet error.cpp error sample
1507 \snippet output.txt error sample
1508 \snippet empty.cpp empty sample
1509 \snippet output.txt empty sample
1510
1511 \par Create an observable that generates new observable for each subscriber
1512
1513 \sample
1514 \snippet defer.cpp defer sample
1515 \snippet output.txt defer sample
1516
1517 \par Create an observable that emits items every specified interval of time
1518
1519 \sample
1520 \snippet interval.cpp interval sample
1521 \snippet output.txt interval sample
1522
1523 \par Create an observable that emits items in the specified interval of time
1524
1525 \sample
1526 \snippet timer.cpp duration timer sample
1527 \snippet output.txt duration timer sample
1528
1529 \par Create an observable that emits all items from a collection
1530
1531 \sample
1532 \snippet iterate.cpp iterate sample
1533 \snippet output.txt iterate sample
1534
1535 \par Create an observable that emits a set of specified items
1536
1537 \sample
1538 \snippet from.cpp from sample
1539 \snippet output.txt from sample
1540
1541 \par Create an observable that emits a single item
1542
1543 \sample
1544 \snippet just.cpp just sample
1545 \snippet output.txt just sample
1546
1547 \par Create an observable that emits a set of items and then subscribes to another observable
1548
1549 \sample
1550 \snippet start_with.cpp full start_with sample
1551 \snippet output.txt full start_with sample
1552
1553 \par Create an observable that generates a new observable based on a generated resource for each subscriber
1554
1555 \sample
1556 \snippet scope.cpp scope sample
1557 \snippet output.txt scope sample
1558
1559 */
1560 template<>
1561 class observable<void, void>
1562 {
1563 ~observable();
1564 public:
1565 /*! @copydoc rx-create.hpp
1566 */
1567 template<class T, class OnSubscribe>
create(OnSubscribe os)1568 static auto create(OnSubscribe os)
1569 -> decltype(rxs::create<T>(std::move(os))) {
1570 return rxs::create<T>(std::move(os));
1571 }
1572
1573 /*! @copydoc rx-range.hpp
1574 */
1575 template<class T>
range(T first=0,T last=std::numeric_limits<T>::max (),std::ptrdiff_t step=1)1576 static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
1577 -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
1578 return rxs::range<T>(first, last, step, identity_current_thread());
1579 }
1580 /*! @copydoc rx-range.hpp
1581 */
1582 template<class T, class Coordination>
range(T first,T last,std::ptrdiff_t step,Coordination cn)1583 static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
1584 -> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
1585 return rxs::range<T>(first, last, step, std::move(cn));
1586 }
1587 /*! @copydoc rx-range.hpp
1588 */
1589 template<class T, class Coordination>
range(T first,T last,Coordination cn)1590 static auto range(T first, T last, Coordination cn)
1591 -> decltype(rxs::range<T>(first, last, std::move(cn))) {
1592 return rxs::range<T>(first, last, std::move(cn));
1593 }
1594 /*! @copydoc rx-range.hpp
1595 */
1596 template<class T, class Coordination>
range(T first,Coordination cn)1597 static auto range(T first, Coordination cn)
1598 -> decltype(rxs::range<T>(first, std::move(cn))) {
1599 return rxs::range<T>(first, std::move(cn));
1600 }
1601
1602 /*! @copydoc rx-never.hpp
1603 */
1604 template<class T>
never()1605 static auto never()
1606 -> decltype(rxs::never<T>()) {
1607 return rxs::never<T>();
1608 }
1609
1610 /*! @copydoc rx-defer.hpp
1611 */
1612 template<class ObservableFactory>
defer(ObservableFactory of)1613 static auto defer(ObservableFactory of)
1614 -> decltype(rxs::defer(std::move(of))) {
1615 return rxs::defer(std::move(of));
1616 }
1617
1618 /*! @copydoc rx-interval.hpp
1619 */
1620 template<class... AN>
interval(rxsc::scheduler::clock_type::duration period,AN ** ...)1621 static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
1622 -> decltype(rxs::interval(period)) {
1623 return rxs::interval(period);
1624 static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
1625 }
1626 /*! @copydoc rx-interval.hpp
1627 */
1628 template<class Coordination>
interval(rxsc::scheduler::clock_type::duration period,Coordination cn)1629 static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
1630 -> decltype(rxs::interval(period, std::move(cn))) {
1631 return rxs::interval(period, std::move(cn));
1632 }
1633 /*! @copydoc rx-interval.hpp
1634 */
1635 template<class... AN>
interval(rxsc::scheduler::clock_type::time_point initial,rxsc::scheduler::clock_type::duration period,AN ** ...)1636 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
1637 -> decltype(rxs::interval(initial, period)) {
1638 return rxs::interval(initial, period);
1639 static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
1640 }
1641 /*! @copydoc rx-interval.hpp
1642 */
1643 template<class Coordination>
interval(rxsc::scheduler::clock_type::time_point initial,rxsc::scheduler::clock_type::duration period,Coordination cn)1644 static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
1645 -> decltype(rxs::interval(initial, period, std::move(cn))) {
1646 return rxs::interval(initial, period, std::move(cn));
1647 }
1648
1649 /*! @copydoc rx-timer.hpp
1650 */
1651 template<class... AN>
timer(rxsc::scheduler::clock_type::time_point at,AN ** ...)1652 static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
1653 -> decltype(rxs::timer(at)) {
1654 return rxs::timer(at);
1655 static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
1656 }
1657 /*! @copydoc rx-timer.hpp
1658 */
1659 template<class... AN>
timer(rxsc::scheduler::clock_type::duration after,AN ** ...)1660 static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
1661 -> decltype(rxs::timer(after)) {
1662 return rxs::timer(after);
1663 static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
1664 }
1665 /*! @copydoc rx-timer.hpp
1666 */
1667 template<class Coordination>
timer(rxsc::scheduler::clock_type::time_point when,Coordination cn)1668 static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
1669 -> decltype(rxs::timer(when, std::move(cn))) {
1670 return rxs::timer(when, std::move(cn));
1671 }
1672 /*! @copydoc rx-timer.hpp
1673 */
1674 template<class Coordination>
timer(rxsc::scheduler::clock_type::duration when,Coordination cn)1675 static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
1676 -> decltype(rxs::timer(when, std::move(cn))) {
1677 return rxs::timer(when, std::move(cn));
1678 }
1679
1680 /*! @copydoc rx-iterate.hpp
1681 */
1682 template<class Collection>
iterate(Collection c)1683 static auto iterate(Collection c)
1684 -> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1685 return rxs::iterate(std::move(c), identity_current_thread());
1686 }
1687 /*! @copydoc rx-iterate.hpp
1688 */
1689 template<class Collection, class Coordination>
iterate(Collection c,Coordination cn)1690 static auto iterate(Collection c, Coordination cn)
1691 -> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1692 return rxs::iterate(std::move(c), std::move(cn));
1693 }
1694
1695 /*! @copydoc rxcpp::sources::from()
1696 */
1697 template<class T>
from()1698 static auto from()
1699 -> decltype( rxs::from<T>()) {
1700 return rxs::from<T>();
1701 }
1702 /*! @copydoc rxcpp::sources::from(Coordination cn)
1703 */
1704 template<class T, class Coordination>
from(Coordination cn)1705 static auto from(Coordination cn)
1706 -> typename std::enable_if<is_coordination<Coordination>::value,
1707 decltype( rxs::from<T>(std::move(cn)))>::type {
1708 return rxs::from<T>(std::move(cn));
1709 }
1710 /*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn)
1711 */
1712 template<class Value0, class... ValueN>
from(Value0 v0,ValueN...vn)1713 static auto from(Value0 v0, ValueN... vn)
1714 -> typename std::enable_if<!is_coordination<Value0>::value,
1715 decltype( rxs::from(v0, vn...))>::type {
1716 return rxs::from(v0, vn...);
1717 }
1718 /*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn)
1719 */
1720 template<class Coordination, class Value0, class... ValueN>
from(Coordination cn,Value0 v0,ValueN...vn)1721 static auto from(Coordination cn, Value0 v0, ValueN... vn)
1722 -> typename std::enable_if<is_coordination<Coordination>::value,
1723 decltype( rxs::from(std::move(cn), v0, vn...))>::type {
1724 return rxs::from(std::move(cn), v0, vn...);
1725 }
1726
1727 /*! @copydoc rxcpp::sources::just(Value0 v0)
1728 */
1729 template<class T>
just(T v)1730 static auto just(T v)
1731 -> decltype(rxs::just(std::move(v))) {
1732 return rxs::just(std::move(v));
1733 }
1734 /*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn)
1735 */
1736 template<class T, class Coordination>
just(T v,Coordination cn)1737 static auto just(T v, Coordination cn)
1738 -> decltype(rxs::just(std::move(v), std::move(cn))) {
1739 return rxs::just(std::move(v), std::move(cn));
1740 }
1741
1742 /*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn)
1743 */
1744 template<class Observable, class Value0, class... ValueN>
start_with(Observable o,Value0 v0,ValueN...vn)1745 static auto start_with(Observable o, Value0 v0, ValueN... vn)
1746 -> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1747 return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1748 }
1749
1750 /*! @copydoc rx-empty.hpp
1751 */
1752 template<class T>
empty()1753 static auto empty()
1754 -> decltype(from<T>()) {
1755 return from<T>();
1756 }
1757 /*! @copydoc rx-empty.hpp
1758 */
1759 template<class T, class Coordination>
empty(Coordination cn)1760 static auto empty(Coordination cn)
1761 -> decltype(from<T>(std::move(cn))) {
1762 return from<T>(std::move(cn));
1763 }
1764
1765 /*! @copydoc rx-error.hpp
1766 */
1767 template<class T, class Exception>
error(Exception && e)1768 static auto error(Exception&& e)
1769 -> decltype(rxs::error<T>(std::forward<Exception>(e))) {
1770 return rxs::error<T>(std::forward<Exception>(e));
1771 }
1772 /*! @copydoc rx-error.hpp
1773 */
1774 template<class T, class Exception, class Coordination>
error(Exception && e,Coordination cn)1775 static auto error(Exception&& e, Coordination cn)
1776 -> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
1777 return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
1778 }
1779
1780 /*! @copydoc rx-scope.hpp
1781 */
1782 template<class ResourceFactory, class ObservableFactory>
scope(ResourceFactory rf,ObservableFactory of)1783 static auto scope(ResourceFactory rf, ObservableFactory of)
1784 -> decltype(rxs::scope(std::move(rf), std::move(of))) {
1785 return rxs::scope(std::move(rf), std::move(of));
1786 }
1787 };
1788
1789 }
1790
1791 //
1792 // support range() >> filter() >> subscribe() syntax
1793 // '>>' is spelled 'stream'
1794 //
1795 template<class T, class SourceOperator, class OperatorFactory>
operator >>(const rxcpp::observable<T,SourceOperator> & source,OperatorFactory && of)1796 auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1797 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1798 return source.op(std::forward<OperatorFactory>(of));
1799 }
1800
1801 //
1802 // support range() | filter() | subscribe() syntax
1803 // '|' is spelled 'pipe'
1804 //
1805 template<class T, class SourceOperator, class OperatorFactory>
operator |(const rxcpp::observable<T,SourceOperator> & source,OperatorFactory && of)1806 auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
1807 -> decltype(source.op(std::forward<OperatorFactory>(of))) {
1808 return source.op(std::forward<OperatorFactory>(of));
1809 }
1810
1811 #endif
1812