1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SYNCHRONIZE_HPP)
6 #define RXCPP_RX_SYNCHRONIZE_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace subjects {
13
14 namespace detail {
15
16 template<class T, class Coordination>
17 class synchronize_observer : public detail::multicast_observer<T>
18 {
19 typedef synchronize_observer<T, Coordination> this_type;
20 typedef detail::multicast_observer<T> base_type;
21
22 typedef rxu::decay_t<Coordination> coordination_type;
23 typedef typename coordination_type::coordinator_type coordinator_type;
24 typedef typename coordinator_type::template get<subscriber<T>>::type output_type;
25
26 struct synchronize_observer_state : public std::enable_shared_from_this<synchronize_observer_state>
27 {
28 typedef rxn::notification<T> notification_type;
29 typedef typename notification_type::type base_notification_type;
30 typedef std::deque<base_notification_type> queue_type;
31
32 struct mode
33 {
34 enum type {
35 Invalid = 0,
36 Processing,
37 Empty,
38 Disposed
39 };
40 };
41
42 mutable std::mutex lock;
43 mutable std::condition_variable wake;
44 mutable queue_type fill_queue;
45 composite_subscription lifetime;
46 mutable typename mode::type current;
47 coordinator_type coordinator;
48 output_type destination;
49
ensure_processingrxcpp::subjects::detail::synchronize_observer::synchronize_observer_state50 void ensure_processing(std::unique_lock<std::mutex>& guard) const {
51 if (!guard.owns_lock()) {
52 std::terminate();
53 }
54 if (current == mode::Empty) {
55 current = mode::Processing;
56 auto keepAlive = this->shared_from_this();
57
58 auto drain_queue = [keepAlive, this](const rxsc::schedulable& self){
59 RXCPP_TRY {
60 std::unique_lock<std::mutex> guard(lock);
61 if (!destination.is_subscribed()) {
62 current = mode::Disposed;
63 fill_queue.clear();
64 guard.unlock();
65 lifetime.unsubscribe();
66 return;
67 }
68 if (fill_queue.empty()) {
69 current = mode::Empty;
70 return;
71 }
72 auto notification = std::move(fill_queue.front());
73 fill_queue.pop_front();
74 guard.unlock();
75 notification->accept(destination);
76 self();
77 } RXCPP_CATCH(...) {
78 destination.on_error(rxu::current_exception());
79 std::unique_lock<std::mutex> guard(lock);
80 current = mode::Empty;
81 }
82 };
83
84 auto selectedDrain = on_exception(
85 [&](){return coordinator.act(drain_queue);},
86 destination);
87 if (selectedDrain.empty()) {
88 return;
89 }
90
91 auto processor = coordinator.get_worker();
92 processor.schedule(lifetime, selectedDrain.get());
93 }
94 }
95
synchronize_observer_staterxcpp::subjects::detail::synchronize_observer::synchronize_observer_state96 synchronize_observer_state(coordinator_type coor, composite_subscription cs, output_type scbr)
97 : lifetime(std::move(cs))
98 , current(mode::Empty)
99 , coordinator(std::move(coor))
100 , destination(std::move(scbr))
101 {
102 }
103
104 template<class V>
on_nextrxcpp::subjects::detail::synchronize_observer::synchronize_observer_state105 void on_next(V v) const {
106 if (lifetime.is_subscribed()) {
107 std::unique_lock<std::mutex> guard(lock);
108 fill_queue.push_back(notification_type::on_next(std::move(v)));
109 ensure_processing(guard);
110 }
111 wake.notify_one();
112 }
on_errorrxcpp::subjects::detail::synchronize_observer::synchronize_observer_state113 void on_error(rxu::error_ptr e) const {
114 if (lifetime.is_subscribed()) {
115 std::unique_lock<std::mutex> guard(lock);
116 fill_queue.push_back(notification_type::on_error(e));
117 ensure_processing(guard);
118 }
119 wake.notify_one();
120 }
on_completedrxcpp::subjects::detail::synchronize_observer::synchronize_observer_state121 void on_completed() const {
122 if (lifetime.is_subscribed()) {
123 std::unique_lock<std::mutex> guard(lock);
124 fill_queue.push_back(notification_type::on_completed());
125 ensure_processing(guard);
126 }
127 wake.notify_one();
128 }
129 };
130
131 std::shared_ptr<synchronize_observer_state> state;
132
133 public:
synchronize_observer(coordination_type cn,composite_subscription dl,composite_subscription il)134 synchronize_observer(coordination_type cn, composite_subscription dl, composite_subscription il)
135 : base_type(dl)
136 {
137 auto o = make_subscriber<T>(dl, make_observer_dynamic<T>( *static_cast<base_type*>(this) ));
138
139 // creates a worker whose lifetime is the same as the destination subscription
140 auto coordinator = cn.create_coordinator(dl);
141
142 state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o));
143 }
144
get_subscriber() const145 subscriber<T> get_subscriber() const {
146 return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*this)).as_dynamic();
147 }
148
149 template<class V>
on_next(V v) const150 void on_next(V v) const {
151 state->on_next(std::move(v));
152 }
on_error(rxu::error_ptr e) const153 void on_error(rxu::error_ptr e) const {
154 state->on_error(e);
155 }
on_completed() const156 void on_completed() const {
157 state->on_completed();
158 }
159 };
160
161 }
162
163 template<class T, class Coordination>
164 class synchronize
165 {
166 detail::synchronize_observer<T, Coordination> s;
167
168 public:
synchronize(Coordination cn,composite_subscription cs=composite_subscription ())169 explicit synchronize(Coordination cn, composite_subscription cs = composite_subscription())
170 : s(std::move(cn), std::move(cs), composite_subscription())
171 {
172 }
173
has_observers() const174 bool has_observers() const {
175 return s.has_observers();
176 }
177
get_subscriber() const178 subscriber<T> get_subscriber() const {
179 return s.get_subscriber();
180 }
181
get_observable() const182 observable<T> get_observable() const {
183 auto keepAlive = s;
184 return make_observable_dynamic<T>([=](subscriber<T> o){
185 keepAlive.add(keepAlive.get_subscriber(), std::move(o));
186 });
187 }
188 };
189
190 }
191
192 class synchronize_in_one_worker : public coordination_base
193 {
194 rxsc::scheduler factory;
195
196 class input_type
197 {
198 rxsc::worker controller;
199 rxsc::scheduler factory;
200 identity_one_worker coordination;
201 public:
input_type(rxsc::worker w)202 explicit input_type(rxsc::worker w)
203 : controller(w)
204 , factory(rxsc::make_same_worker(w))
205 , coordination(factory)
206 {
207 }
get_worker() const208 inline rxsc::worker get_worker() const {
209 return controller;
210 }
get_scheduler() const211 inline rxsc::scheduler get_scheduler() const {
212 return factory;
213 }
now() const214 inline rxsc::scheduler::clock_type::time_point now() const {
215 return factory.now();
216 }
217 template<class Observable>
in(Observable o) const218 auto in(Observable o) const
219 -> decltype(o.publish_synchronized(coordination).ref_count()) {
220 return o.publish_synchronized(coordination).ref_count();
221 }
222 template<class Subscriber>
out(Subscriber s) const223 auto out(Subscriber s) const
224 -> Subscriber {
225 return s;
226 }
227 template<class F>
act(F f) const228 auto act(F f) const
229 -> F {
230 return f;
231 }
232 };
233
234 public:
235
synchronize_in_one_worker(rxsc::scheduler sc)236 explicit synchronize_in_one_worker(rxsc::scheduler sc) : factory(sc) {}
237
238 typedef coordinator<input_type> coordinator_type;
239
now() const240 inline rxsc::scheduler::clock_type::time_point now() const {
241 return factory.now();
242 }
243
create_coordinator(composite_subscription cs=composite_subscription ()) const244 inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
245 auto w = factory.create_worker(std::move(cs));
246 return coordinator_type(input_type(std::move(w)));
247 }
248 };
249
synchronize_event_loop()250 inline synchronize_in_one_worker synchronize_event_loop() {
251 static synchronize_in_one_worker r(rxsc::make_event_loop());
252 return r;
253 }
254
synchronize_new_thread()255 inline synchronize_in_one_worker synchronize_new_thread() {
256 static synchronize_in_one_worker r(rxsc::make_new_thread());
257 return r;
258 }
259
260 }
261
262 #endif
263