1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_SOURCES_RX_ERROR_HPP)
6 #define RXCPP_SOURCES_RX_ERROR_HPP
7
8 #include "../rx-includes.hpp"
9
10 /*! \file rx-error.hpp
11
12 \brief Returns an observable that sends no items to observer and immediately generates an error, on the specified scheduler.
13
14 \tparam T the type of (not) emitted items
15 \tparam Exception the type of the error
16 \tparam Coordination the type of the scheduler (optional)
17
18 \param e the error to be passed to observers
\param cn the scheduler to use for delivering the error notification (optional)
20
21 \return Observable that sends no items to observer and immediately generates an error.
22
23 \sample
24 \snippet error.cpp error sample
25 \snippet output.txt error sample
26
27 \sample
28 \snippet error.cpp threaded error sample
29 \snippet output.txt threaded error sample
30 */
31
32 namespace rxcpp {
33
34 namespace sources {
35
36 namespace detail {
37
38 template<class T, class Coordination>
39 struct error : public source_base<T>
40 {
41 typedef error<T, Coordination> this_type;
42
43 typedef rxu::decay_t<Coordination> coordination_type;
44
45 typedef typename coordination_type::coordinator_type coordinator_type;
46
47 struct error_initial_type
48 {
error_initial_typerxcpp::sources::detail::error::error_initial_type49 error_initial_type(rxu::error_ptr e, coordination_type cn)
50 : exception(e)
51 , coordination(std::move(cn))
52 {
53 }
54 rxu::error_ptr exception;
55 coordination_type coordination;
56 };
57 error_initial_type initial;
58
errorrxcpp::sources::detail::error59 error(rxu::error_ptr e, coordination_type cn)
60 : initial(e, std::move(cn))
61 {
62 }
63
64 template<class Subscriber>
on_subscriberxcpp::sources::detail::error65 void on_subscribe(Subscriber o) const {
66
67 // creates a worker whose lifetime is the same as this subscription
68 auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
69 auto controller = coordinator.get_worker();
70 auto exception = initial.exception;
71
72 auto producer = [=](const rxsc::schedulable&){
73 auto& dest = o;
74 if (!dest.is_subscribed()) {
75 // terminate loop
76 return;
77 }
78
79 dest.on_error(exception);
80 // o is unsubscribed
81 };
82 auto selectedProducer = on_exception(
83 [&](){return coordinator.act(producer);},
84 o);
85 if (selectedProducer.empty()) {
86 return;
87 }
88 controller.schedule(selectedProducer.get());
89 }
90 };
91
// Dispatch tags selecting a make_error overload.

/// Selects the overload that receives an already-built rxu::error_ptr.
struct throw_ptr_tag
{
};

/// Selects the overload that receives an error object to be wrapped in an rxu::error_ptr.
struct throw_instance_tag
{
};
94
95 template <class T, class Coordination>
make_error(throw_ptr_tag &&,rxu::error_ptr exception,Coordination cn)96 auto make_error(throw_ptr_tag&&, rxu::error_ptr exception, Coordination cn)
97 -> observable<T, error<T, Coordination>> {
98 return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(exception), std::move(cn)));
99 }
100
101 template <class T, class E, class Coordination>
make_error(throw_instance_tag &&,E e,Coordination cn)102 auto make_error(throw_instance_tag&&, E e, Coordination cn)
103 -> observable<T, error<T, Coordination>> {
104 rxu::error_ptr ep = rxu::make_error_ptr(e);
105 return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(ep), std::move(cn)));
106 }
107
108 }
109
110 }
111
112 namespace sources {
113
114 /*! @copydoc rx-error.hpp
115 */
116 template<class T, class E>
error(E e)117 auto error(E e)
118 -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) {
119 return detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate());
120 }
121 /*! @copydoc rx-error.hpp
122 */
123 template<class T, class E, class Coordination>
error(E e,Coordination cn)124 auto error(E e, Coordination cn)
125 -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) {
126 return detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn));
127 }
128
129 }
130
131 }
132
133 #endif
134