1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_OBSERVER_HPP)
6 #define RXCPP_RX_OBSERVER_HPP
7
8 #include "rx-includes.hpp"
9
10 namespace rxcpp {
11
12
13 template<class T>
14 struct observer_base
15 {
16 typedef T value_type;
17 typedef tag_observer observer_tag;
18 };
19
20 namespace detail {
// on_next handler that accepts a value and discards it.
template<class T>
struct OnNextEmpty
{
    void operator()(const T&) const
    {
        // intentionally empty
    }
};
26 struct OnErrorEmpty
27 {
operator ()rxcpp::detail::OnErrorEmpty28 void operator()(rxu::error_ptr) const {
29 // error implicitly ignored, abort
30 std::terminate();
31 }
32 };
33 struct OnErrorIgnore
34 {
operator ()rxcpp::detail::OnErrorIgnore35 void operator()(rxu::error_ptr) const {
36 }
37 };
// on_completed handler that does nothing.
struct OnCompletedEmpty
{
    void operator()() const
    {
        // intentionally empty
    }
};
42
43 template<class T, class State, class OnNext>
44 struct OnNextForward
45 {
46 using state_t = rxu::decay_t<State>;
47 using onnext_t = rxu::decay_t<OnNext>;
OnNextForwardrxcpp::detail::OnNextForward48 OnNextForward() : onnext() {}
OnNextForwardrxcpp::detail::OnNextForward49 explicit OnNextForward(onnext_t on) : onnext(std::move(on)) {}
50 onnext_t onnext;
operator ()rxcpp::detail::OnNextForward51 void operator()(state_t& s, T& t) const {
52 onnext(s, t);
53 }
operator ()rxcpp::detail::OnNextForward54 void operator()(state_t& s, T&& t) const {
55 onnext(s, t);
56 }
57 };
58 template<class T, class State>
59 struct OnNextForward<T, State, void>
60 {
61 using state_t = rxu::decay_t<State>;
OnNextForwardrxcpp::detail::OnNextForward62 OnNextForward() {}
operator ()rxcpp::detail::OnNextForward63 void operator()(state_t& s, T& t) const {
64 s.on_next(t);
65 }
operator ()rxcpp::detail::OnNextForward66 void operator()(state_t& s, T&& t) const {
67 s.on_next(t);
68 }
69 };
70
71 template<class State, class OnError>
72 struct OnErrorForward
73 {
74 using state_t = rxu::decay_t<State>;
75 using onerror_t = rxu::decay_t<OnError>;
OnErrorForwardrxcpp::detail::OnErrorForward76 OnErrorForward() : onerror() {}
OnErrorForwardrxcpp::detail::OnErrorForward77 explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {}
78 onerror_t onerror;
operator ()rxcpp::detail::OnErrorForward79 void operator()(state_t& s, rxu::error_ptr ep) const {
80 onerror(s, ep);
81 }
82 };
83 template<class State>
84 struct OnErrorForward<State, void>
85 {
86 using state_t = rxu::decay_t<State>;
OnErrorForwardrxcpp::detail::OnErrorForward87 OnErrorForward() {}
operator ()rxcpp::detail::OnErrorForward88 void operator()(state_t& s, rxu::error_ptr ep) const {
89 s.on_error(ep);
90 }
91 };
92
93 template<class State, class OnCompleted>
94 struct OnCompletedForward
95 {
96 using state_t = rxu::decay_t<State>;
97 using oncompleted_t = rxu::decay_t<OnCompleted>;
OnCompletedForwardrxcpp::detail::OnCompletedForward98 OnCompletedForward() : oncompleted() {}
OnCompletedForwardrxcpp::detail::OnCompletedForward99 explicit OnCompletedForward(oncompleted_t oc) : oncompleted(std::move(oc)) {}
100 oncompleted_t oncompleted;
operator ()rxcpp::detail::OnCompletedForward101 void operator()(state_t& s) const {
102 oncompleted(s);
103 }
104 };
105 template<class State>
106 struct OnCompletedForward<State, void>
107 {
OnCompletedForwardrxcpp::detail::OnCompletedForward108 OnCompletedForward() {}
operator ()rxcpp::detail::OnCompletedForward109 void operator()(State& s) const {
110 s.on_completed();
111 }
112 };
113
114 template<class T, class F>
115 struct is_on_next_of
116 {
117 struct not_void {};
118 template<class CT, class CF>
119 static auto check(int) -> decltype((*(CF*)nullptr)(*(CT*)nullptr));
120 template<class CT, class CF>
121 static not_void check(...);
122
123 typedef decltype(check<T, rxu::decay_t<F>>(0)) detail_result;
124 static const bool value = std::is_same<detail_result, void>::value;
125 };
126
127 template<class F>
128 struct is_on_error
129 {
130 struct not_void {};
131 template<class CF>
132 static auto check(int) -> decltype((*(CF*)nullptr)(*(rxu::error_ptr*)nullptr));
133 template<class CF>
134 static not_void check(...);
135
136 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
137 };
138
139 template<class State, class F>
140 struct is_on_error_for
141 {
142 struct not_void {};
143 template<class CF>
144 static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(rxu::error_ptr*)nullptr));
145 template<class CF>
146 static not_void check(...);
147
148 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
149 };
150
151 template<class F>
152 struct is_on_completed
153 {
154 struct not_void {};
155 template<class CF>
156 static auto check(int) -> decltype((*(CF*)nullptr)());
157 template<class CF>
158 static not_void check(...);
159
160 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
161 };
162
163 }
164
165
166 /*!
167 \brief consumes values from an observable using `State` that may implement on_next, on_error and on_completed with optional overrides of each function.
168
169 \tparam T - the type of value in the stream
170 \tparam State - the type of the stored state
171 \tparam OnNext - the type of a function that matches `void(State&, T)`. Called 0 or more times. If `void` State::on_next will be called.
172 \tparam OnError - the type of a function that matches `void(State&, rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_error will be called.
173 \tparam OnCompleted - the type of a function that matches `void(State&)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_completed will be called.
174
175 \ingroup group-core
176
177 */
178 template<class T, class State, class OnNext, class OnError, class OnCompleted>
179 class observer : public observer_base<T>
180 {
181 public:
182 using this_type = observer<T, State, OnNext, OnError, OnCompleted>;
183 using state_t = rxu::decay_t<State>;
184 using on_next_t = typename std::conditional<
185 !std::is_same<void, OnNext>::value,
186 rxu::decay_t<OnNext>,
187 detail::OnNextForward<T, State, OnNext>>::type;
188 using on_error_t = typename std::conditional<
189 !std::is_same<void, OnError>::value,
190 rxu::decay_t<OnError>,
191 detail::OnErrorForward<State, OnError>>::type;
192 using on_completed_t = typename std::conditional<
193 !std::is_same<void, OnCompleted>::value,
194 rxu::decay_t<OnCompleted>,
195 detail::OnCompletedForward<State, OnCompleted>>::type;
196
197 private:
198 mutable state_t state;
199 on_next_t onnext;
200 on_error_t onerror;
201 on_completed_t oncompleted;
202
203 public:
204
observer(state_t s,on_next_t n=on_next_t (),on_error_t e=on_error_t (),on_completed_t c=on_completed_t ())205 explicit observer(state_t s, on_next_t n = on_next_t(), on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
206 : state(std::move(s))
207 , onnext(std::move(n))
208 , onerror(std::move(e))
209 , oncompleted(std::move(c))
210 {
211 }
observer(state_t s,on_next_t n,on_completed_t c)212 explicit observer(state_t s, on_next_t n, on_completed_t c)
213 : state(std::move(s))
214 , onnext(std::move(n))
215 , onerror(on_error_t())
216 , oncompleted(std::move(c))
217 {
218 }
observer(const this_type & o)219 observer(const this_type& o)
220 : state(o.state)
221 , onnext(o.onnext)
222 , onerror(o.onerror)
223 , oncompleted(o.oncompleted)
224 {
225 }
observer(this_type && o)226 observer(this_type&& o)
227 : state(std::move(o.state))
228 , onnext(std::move(o.onnext))
229 , onerror(std::move(o.onerror))
230 , oncompleted(std::move(o.oncompleted))
231 {
232 }
operator =(this_type o)233 this_type& operator=(this_type o) {
234 state = std::move(o.state);
235 onnext = std::move(o.onnext);
236 onerror = std::move(o.onerror);
237 oncompleted = std::move(o.oncompleted);
238 return *this;
239 }
240
on_next(T & t) const241 void on_next(T& t) const {
242 onnext(state, t);
243 }
on_next(T && t) const244 void on_next(T&& t) const {
245 onnext(state, std::move(t));
246 }
on_error(rxu::error_ptr e) const247 void on_error(rxu::error_ptr e) const {
248 onerror(state, e);
249 }
on_completed() const250 void on_completed() const {
251 oncompleted(state);
252 }
as_dynamic() const253 observer<T> as_dynamic() const {
254 return observer<T>(*this);
255 }
256 };
257
258 /*!
259 \brief consumes values from an observable using default empty method implementations with optional overrides of each function.
260
261 \tparam T - the type of value in the stream
262 \tparam OnNext - the type of a function that matches `void(T)`. Called 0 or more times. If `void` OnNextEmpty<T> is used.
263 \tparam OnError - the type of a function that matches `void(rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` OnErrorEmpty is used.
264 \tparam OnCompleted - the type of a function that matches `void()`. Called 0 or 1 times, no further calls will be made. If `void` OnCompletedEmpty is used.
265
266 \ingroup group-core
267
268 */
269 template<class T, class OnNext, class OnError, class OnCompleted>
270 class observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> : public observer_base<T>
271 {
272 public:
273 using this_type = observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>;
274 using on_next_t = typename std::conditional<
275 !std::is_same<void, OnNext>::value,
276 rxu::decay_t<OnNext>,
277 detail::OnNextEmpty<T>>::type;
278 using on_error_t = typename std::conditional<
279 !std::is_same<void, OnError>::value,
280 rxu::decay_t<OnError>,
281 detail::OnErrorEmpty>::type;
282 using on_completed_t = typename std::conditional<
283 !std::is_same<void, OnCompleted>::value,
284 rxu::decay_t<OnCompleted>,
285 detail::OnCompletedEmpty>::type;
286
287 private:
288 on_next_t onnext;
289 on_error_t onerror;
290 on_completed_t oncompleted;
291
292 public:
293 static_assert(detail::is_on_next_of<T, on_next_t>::value, "Function supplied for on_next must be a function with the signature void(T);");
294 static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(rxu::error_ptr);");
295 static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");
296
observer()297 observer()
298 : onnext(on_next_t())
299 , onerror(on_error_t())
300 , oncompleted(on_completed_t())
301 {
302 }
303
observer(on_next_t n,on_error_t e=on_error_t (),on_completed_t c=on_completed_t ())304 explicit observer(on_next_t n, on_error_t e = on_error_t(), on_completed_t c = on_completed_t())
305 : onnext(std::move(n))
306 , onerror(std::move(e))
307 , oncompleted(std::move(c))
308 {
309 }
observer(const this_type & o)310 observer(const this_type& o)
311 : onnext(o.onnext)
312 , onerror(o.onerror)
313 , oncompleted(o.oncompleted)
314 {
315 }
observer(this_type && o)316 observer(this_type&& o)
317 : onnext(std::move(o.onnext))
318 , onerror(std::move(o.onerror))
319 , oncompleted(std::move(o.oncompleted))
320 {
321 }
operator =(this_type o)322 this_type& operator=(this_type o) {
323 onnext = std::move(o.onnext);
324 onerror = std::move(o.onerror);
325 oncompleted = std::move(o.oncompleted);
326 return *this;
327 }
328
on_next(T & t) const329 void on_next(T& t) const {
330 onnext(t);
331 }
on_next(T && t) const332 void on_next(T&& t) const {
333 onnext(std::move(t));
334 }
on_error(rxu::error_ptr e) const335 void on_error(rxu::error_ptr e) const {
336 onerror(e);
337 }
on_completed() const338 void on_completed() const {
339 oncompleted();
340 }
as_dynamic() const341 observer<T> as_dynamic() const {
342 return observer<T>(*this);
343 }
344 };
345
346 namespace detail
347 {
348
349 template<class T>
350 struct virtual_observer : public std::enable_shared_from_this<virtual_observer<T>>
351 {
~virtual_observerrxcpp::detail::virtual_observer352 virtual ~virtual_observer() {}
on_nextrxcpp::detail::virtual_observer353 virtual void on_next(T&) const {};
on_nextrxcpp::detail::virtual_observer354 virtual void on_next(T&&) const {};
on_errorrxcpp::detail::virtual_observer355 virtual void on_error(rxu::error_ptr) const {};
on_completedrxcpp::detail::virtual_observer356 virtual void on_completed() const {};
357 };
358
359 template<class T, class Observer>
360 struct specific_observer : public virtual_observer<T>
361 {
specific_observerrxcpp::detail::specific_observer362 explicit specific_observer(Observer o)
363 : destination(std::move(o))
364 {
365 }
366
367 Observer destination;
on_nextrxcpp::detail::specific_observer368 virtual void on_next(T& t) const {
369 destination.on_next(t);
370 }
on_nextrxcpp::detail::specific_observer371 virtual void on_next(T&& t) const {
372 destination.on_next(std::move(t));
373 }
on_errorrxcpp::detail::specific_observer374 virtual void on_error(rxu::error_ptr e) const {
375 destination.on_error(e);
376 }
on_completedrxcpp::detail::specific_observer377 virtual void on_completed() const {
378 destination.on_completed();
379 }
380 };
381
382 }
383
384 /*!
385 \brief consumes values from an observable using type-forgetting (shared allocated state with virtual methods)
386
387 \tparam T - the type of value in the stream
388
389 \ingroup group-core
390
391 */
392 template<class T>
393 class observer<T, void, void, void, void> : public observer_base<T>
394 {
395 public:
396 typedef tag_dynamic_observer dynamic_observer_tag;
397
398 private:
399 using this_type = observer<T, void, void, void, void>;
400 using base_type = observer_base<T>;
401 using virtual_observer = detail::virtual_observer<T>;
402
403 std::shared_ptr<virtual_observer> destination;
404
405 template<class Observer>
make_destination(Observer o)406 static auto make_destination(Observer o)
407 -> std::shared_ptr<virtual_observer> {
408 return std::make_shared<detail::specific_observer<T, Observer>>(std::move(o));
409 }
410
411 public:
observer()412 observer()
413 {
414 }
observer(const this_type & o)415 observer(const this_type& o)
416 : destination(o.destination)
417 {
418 }
observer(this_type && o)419 observer(this_type&& o)
420 : destination(std::move(o.destination))
421 {
422 }
423
424 template<class Observer>
observer(Observer o)425 explicit observer(Observer o)
426 : destination(make_destination(std::move(o)))
427 {
428 }
429
operator =(this_type o)430 this_type& operator=(this_type o) {
431 destination = std::move(o.destination);
432 return *this;
433 }
434
435 // perfect forwarding delays the copy of the value.
436 template<class V>
on_next(V && v) const437 void on_next(V&& v) const {
438 if (destination) {
439 destination->on_next(std::forward<V>(v));
440 }
441 }
on_error(rxu::error_ptr e) const442 void on_error(rxu::error_ptr e) const {
443 if (destination) {
444 destination->on_error(e);
445 }
446 }
on_completed() const447 void on_completed() const {
448 if (destination) {
449 destination->on_completed();
450 }
451 }
452
as_dynamic() const453 observer<T> as_dynamic() const {
454 return *this;
455 }
456 };
457
458 template<class T, class DefaultOnError = detail::OnErrorEmpty>
make_observer()459 auto make_observer()
460 -> observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError> {
461 return observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError>();
462 }
463
464 template<class T, class DefaultOnError = detail::OnErrorEmpty, class U, class State, class OnNext, class OnError, class OnCompleted>
make_observer(observer<U,State,OnNext,OnError,OnCompleted> o)465 auto make_observer(observer<U, State, OnNext, OnError, OnCompleted> o)
466 -> observer<T, State, OnNext, OnError, OnCompleted> {
467 return observer<T, State, OnNext, OnError, OnCompleted>(std::move(o));
468 }
469 template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
make_observer(Observer ob)470 auto make_observer(Observer ob)
471 -> typename std::enable_if<
472 !detail::is_on_next_of<T, Observer>::value &&
473 !detail::is_on_error<Observer>::value &&
474 is_observer<Observer>::value,
475 Observer>::type {
476 return std::move(ob);
477 }
478 template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
make_observer(Observer ob)479 auto make_observer(Observer ob)
480 -> typename std::enable_if<
481 !detail::is_on_next_of<T, Observer>::value &&
482 !detail::is_on_error<Observer>::value &&
483 !is_observer<Observer>::value,
484 observer<T, Observer>>::type {
485 return observer<T, Observer>(std::move(ob));
486 }
487 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext>
make_observer(OnNext on)488 auto make_observer(OnNext on)
489 -> typename std::enable_if<
490 detail::is_on_next_of<T, OnNext>::value,
491 observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>>::type {
492 return observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>(
493 std::move(on));
494 }
495 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnError>
make_observer(OnError oe)496 auto make_observer(OnError oe)
497 -> typename std::enable_if<
498 !detail::is_on_next_of<T, OnError>::value &&
499 detail::is_on_error<OnError>::value,
500 observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>>::type {
501 return observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>(
502 detail::OnNextEmpty<T>(), std::move(oe));
503 }
504 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError>
make_observer(OnNext on,OnError oe)505 auto make_observer(OnNext on, OnError oe)
506 -> typename std::enable_if<
507 detail::is_on_next_of<T, OnNext>::value &&
508 detail::is_on_error<OnError>::value,
509 observer<T, detail::stateless_observer_tag, OnNext, OnError>>::type {
510 return observer<T, detail::stateless_observer_tag, OnNext, OnError>(
511 std::move(on), std::move(oe));
512 }
513 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnCompleted>
make_observer(OnNext on,OnCompleted oc)514 auto make_observer(OnNext on, OnCompleted oc)
515 -> typename std::enable_if<
516 detail::is_on_next_of<T, OnNext>::value &&
517 detail::is_on_completed<OnCompleted>::value,
518 observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>>::type {
519 return observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>(
520 std::move(on), DefaultOnError(), std::move(oc));
521 }
522 template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError, class OnCompleted>
make_observer(OnNext on,OnError oe,OnCompleted oc)523 auto make_observer(OnNext on, OnError oe, OnCompleted oc)
524 -> typename std::enable_if<
525 detail::is_on_next_of<T, OnNext>::value &&
526 detail::is_on_error<OnError>::value &&
527 detail::is_on_completed<OnCompleted>::value,
528 observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>::type {
529 return observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(
530 std::move(on), std::move(oe), std::move(oc));
531 }
532
533
534 template<class T, class State, class OnNext>
make_observer(State os,OnNext on)535 auto make_observer(State os, OnNext on)
536 -> typename std::enable_if<
537 !detail::is_on_next_of<T, State>::value &&
538 !detail::is_on_error<State>::value,
539 observer<T, State, OnNext>>::type {
540 return observer<T, State, OnNext>(
541 std::move(os), std::move(on));
542 }
543 template<class T, class State, class OnError>
make_observer(State os,OnError oe)544 auto make_observer(State os, OnError oe)
545 -> typename std::enable_if<
546 !detail::is_on_next_of<T, State>::value &&
547 !detail::is_on_error<State>::value &&
548 detail::is_on_error_for<State, OnError>::value,
549 observer<T, State, detail::OnNextEmpty<T>, OnError>>::type {
550 return observer<T, State, detail::OnNextEmpty<T>, OnError>(
551 std::move(os), detail::OnNextEmpty<T>(), std::move(oe));
552 }
553 template<class T, class State, class OnNext, class OnError>
make_observer(State os,OnNext on,OnError oe)554 auto make_observer(State os, OnNext on, OnError oe)
555 -> typename std::enable_if<
556 !detail::is_on_next_of<T, State>::value &&
557 !detail::is_on_error<State>::value &&
558 detail::is_on_error_for<State, OnError>::value,
559 observer<T, State, OnNext, OnError>>::type {
560 return observer<T, State, OnNext, OnError>(
561 std::move(os), std::move(on), std::move(oe));
562 }
563 template<class T, class State, class OnNext, class OnCompleted>
make_observer(State os,OnNext on,OnCompleted oc)564 auto make_observer(State os, OnNext on, OnCompleted oc)
565 -> typename std::enable_if<
566 !detail::is_on_next_of<T, State>::value &&
567 !detail::is_on_error<State>::value,
568 observer<T, State, OnNext, void, OnCompleted>>::type {
569 return observer<T, State, OnNext, void, OnCompleted>(
570 std::move(os), std::move(on), std::move(oc));
571 }
572 template<class T, class State, class OnNext, class OnError, class OnCompleted>
make_observer(State os,OnNext on,OnError oe,OnCompleted oc)573 auto make_observer(State os, OnNext on, OnError oe, OnCompleted oc)
574 -> typename std::enable_if<
575 !detail::is_on_next_of<T, State>::value &&
576 !detail::is_on_error<State>::value &&
577 detail::is_on_error_for<State, OnError>::value,
578 observer<T, State, OnNext, OnError, OnCompleted>>::type {
579 return observer<T, State, OnNext, OnError, OnCompleted>(
580 std::move(os), std::move(on), std::move(oe), std::move(oc));
581 }
582
583 template<class T, class Observer>
make_observer_dynamic(Observer o)584 auto make_observer_dynamic(Observer o)
585 -> typename std::enable_if<
586 !detail::is_on_next_of<T, Observer>::value,
587 observer<T>>::type {
588 return observer<T>(std::move(o));
589 }
590 template<class T, class OnNext>
make_observer_dynamic(OnNext && on)591 auto make_observer_dynamic(OnNext&& on)
592 -> typename std::enable_if<
593 detail::is_on_next_of<T, OnNext>::value,
594 observer<T>>::type {
595 return observer<T>(
596 make_observer<T>(std::forward<OnNext>(on)));
597 }
598 template<class T, class OnNext, class OnError>
make_observer_dynamic(OnNext && on,OnError && oe)599 auto make_observer_dynamic(OnNext&& on, OnError&& oe)
600 -> typename std::enable_if<
601 detail::is_on_next_of<T, OnNext>::value &&
602 detail::is_on_error<OnError>::value,
603 observer<T>>::type {
604 return observer<T>(
605 make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe)));
606 }
607 template<class T, class OnNext, class OnCompleted>
make_observer_dynamic(OnNext && on,OnCompleted && oc)608 auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc)
609 -> typename std::enable_if<
610 detail::is_on_next_of<T, OnNext>::value &&
611 detail::is_on_completed<OnCompleted>::value,
612 observer<T>>::type {
613 return observer<T>(
614 make_observer<T>(std::forward<OnNext>(on), std::forward<OnCompleted>(oc)));
615 }
616 template<class T, class OnNext, class OnError, class OnCompleted>
make_observer_dynamic(OnNext && on,OnError && oe,OnCompleted && oc)617 auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc)
618 -> typename std::enable_if<
619 detail::is_on_next_of<T, OnNext>::value &&
620 detail::is_on_error<OnError>::value &&
621 detail::is_on_completed<OnCompleted>::value,
622 observer<T>>::type {
623 return observer<T>(
624 make_observer<T>(std::forward<OnNext>(on), std::forward<OnError>(oe), std::forward<OnCompleted>(oc)));
625 }
626
627 namespace detail {
628
629 template<class F>
630 struct maybe_from_result
631 {
632 typedef decltype((*(F*)nullptr)()) decl_result_type;
633 typedef rxu::decay_t<decl_result_type> result_type;
634 typedef rxu::maybe<result_type> type;
635 };
636
637 }
638
639 template<class F, class OnError>
on_exception(const F & f,const OnError & c)640 auto on_exception(const F& f, const OnError& c)
641 -> typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type {
642 typename detail::maybe_from_result<F>::type r;
643 RXCPP_TRY {
644 r.reset(f());
645 } RXCPP_CATCH(...) {
646 c(rxu::current_exception());
647 }
648 return r;
649 }
650
651 template<class F, class Subscriber>
on_exception(const F & f,const Subscriber & s)652 auto on_exception(const F& f, const Subscriber& s)
653 -> typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type {
654 typename detail::maybe_from_result<F>::type r;
655 RXCPP_TRY {
656 r.reset(f());
657 } RXCPP_CATCH(...) {
658 s.on_error(rxu::current_exception());
659 }
660 return r;
661 }
662
663 }
664
665 #endif
666