1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SUBSCRIPTION_HPP)
6 #define RXCPP_RX_SUBSCRIPTION_HPP
7
8 #include "rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace detail {
13
14 template<class F>
15 struct is_unsubscribe_function
16 {
17 struct not_void {};
18 template<class CF>
19 static auto check(int) -> decltype((*(CF*)nullptr)());
20 template<class CF>
21 static not_void check(...);
22
23 static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
24 };
25
26 }
27
28 struct tag_subscription {};
29 struct subscription_base {typedef tag_subscription subscription_tag;};
30 template<class T>
31 class is_subscription
32 {
33 template<class C>
34 static typename C::subscription_tag* check(int);
35 template<class C>
36 static void check(...);
37 public:
38 static const bool value = std::is_convertible<decltype(check<rxu::decay_t<T>>(0)), tag_subscription*>::value;
39 };
40
41 template<class Unsubscribe>
42 class static_subscription
43 {
44 typedef rxu::decay_t<Unsubscribe> unsubscribe_call_type;
45 unsubscribe_call_type unsubscribe_call;
static_subscription()46 static_subscription()
47 {
48 }
49 public:
static_subscription(const static_subscription & o)50 static_subscription(const static_subscription& o)
51 : unsubscribe_call(o.unsubscribe_call)
52 {
53 }
static_subscription(static_subscription && o)54 static_subscription(static_subscription&& o)
55 : unsubscribe_call(std::move(o.unsubscribe_call))
56 {
57 }
static_subscription(unsubscribe_call_type s)58 static_subscription(unsubscribe_call_type s)
59 : unsubscribe_call(std::move(s))
60 {
61 }
unsubscribe() const62 void unsubscribe() const {
63 unsubscribe_call();
64 }
65 };
66
67 class subscription : public subscription_base
68 {
69 class base_subscription_state : public std::enable_shared_from_this<base_subscription_state>
70 {
71 base_subscription_state();
72 public:
73
base_subscription_state(bool initial)74 explicit base_subscription_state(bool initial)
75 : issubscribed(initial)
76 {
77 }
~base_subscription_state()78 virtual ~base_subscription_state() {}
unsubscribe()79 virtual void unsubscribe() {
80 }
81 std::atomic<bool> issubscribed;
82 };
83 public:
84 typedef std::weak_ptr<base_subscription_state> weak_state_type;
85
86 private:
87 template<class I>
88 struct subscription_state : public base_subscription_state
89 {
90 typedef rxu::decay_t<I> inner_t;
subscription_staterxcpp::subscription::subscription_state91 subscription_state(inner_t i)
92 : base_subscription_state(true)
93 , inner(std::move(i))
94 {
95 }
unsubscriberxcpp::subscription::subscription_state96 virtual void unsubscribe() {
97 if (issubscribed.exchange(false)) {
98 trace_activity().unsubscribe_enter(*this);
99 inner.unsubscribe();
100 trace_activity().unsubscribe_return(*this);
101 }
102 }
103 inner_t inner;
104 };
105
106 protected:
107 std::shared_ptr<base_subscription_state> state;
108
109 friend bool operator<(const subscription&, const subscription&);
110 friend bool operator==(const subscription&, const subscription&);
111
112 private:
subscription(weak_state_type w)113 subscription(weak_state_type w)
114 : state(w.lock())
115 {
116 if (!state) {
117 std::terminate();
118 }
119 }
120
subscription(std::shared_ptr<base_subscription_state> s)121 explicit subscription(std::shared_ptr<base_subscription_state> s)
122 : state(std::move(s))
123 {
124 if (!state) {
125 std::terminate();
126 }
127 }
128 public:
129
subscription()130 subscription()
131 : state(std::make_shared<base_subscription_state>(false))
132 {
133 if (!state) {
134 std::terminate();
135 }
136 }
137 template<class U>
subscription(U u,typename std::enable_if<!is_subscription<U>::value,void ** >::type=nullptr)138 explicit subscription(U u, typename std::enable_if<!is_subscription<U>::value, void**>::type = nullptr)
139 : state(std::make_shared<subscription_state<U>>(std::move(u)))
140 {
141 if (!state) {
142 std::terminate();
143 }
144 }
145 template<class U>
subscription(U u,typename std::enable_if<!std::is_same<subscription,U>::value && is_subscription<U>::value,void ** >::type=nullptr)146 explicit subscription(U u, typename std::enable_if<!std::is_same<subscription, U>::value && is_subscription<U>::value, void**>::type = nullptr)
147 // intentionally slice
148 : state(std::move((*static_cast<subscription*>(&u)).state))
149 {
150 if (!state) {
151 std::terminate();
152 }
153 }
subscription(const subscription & o)154 subscription(const subscription& o)
155 : state(o.state)
156 {
157 if (!state) {
158 std::terminate();
159 }
160 }
subscription(subscription && o)161 subscription(subscription&& o)
162 : state(std::move(o.state))
163 {
164 if (!state) {
165 std::terminate();
166 }
167 }
operator =(subscription o)168 subscription& operator=(subscription o) {
169 state = std::move(o.state);
170 return *this;
171 }
is_subscribed() const172 bool is_subscribed() const {
173 if (!state) {
174 std::terminate();
175 }
176 return state->issubscribed;
177 }
unsubscribe() const178 void unsubscribe() const {
179 if (!state) {
180 std::terminate();
181 }
182 auto keepAlive = state;
183 state->unsubscribe();
184 }
185
get_weak()186 weak_state_type get_weak() {
187 return state;
188 }
189
190 // Atomically promote weak subscription to strong.
191 // Calls std::terminate if w has already expired.
lock(weak_state_type w)192 static subscription lock(weak_state_type w) {
193 return subscription(w);
194 }
195
196 // Atomically try to promote weak subscription to strong.
197 // Returns an empty maybe<> if w has already expired.
maybe_lock(weak_state_type w)198 static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
199 auto strong_subscription = w.lock();
200 if (!strong_subscription) {
201 return rxu::detail::maybe<subscription>{};
202 } else {
203 return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
204 }
205 }
206 };
207
operator <(const subscription & lhs,const subscription & rhs)208 inline bool operator<(const subscription& lhs, const subscription& rhs) {
209 return lhs.state < rhs.state;
210 }
operator ==(const subscription & lhs,const subscription & rhs)211 inline bool operator==(const subscription& lhs, const subscription& rhs) {
212 return lhs.state == rhs.state;
213 }
operator !=(const subscription & lhs,const subscription & rhs)214 inline bool operator!=(const subscription& lhs, const subscription& rhs) {
215 return !(lhs == rhs);
216 }
217
218
make_subscription()219 inline auto make_subscription()
220 -> subscription {
221 return subscription();
222 }
223 template<class I>
make_subscription(I && i)224 auto make_subscription(I&& i)
225 -> typename std::enable_if<!is_subscription<I>::value && !detail::is_unsubscribe_function<I>::value,
226 subscription>::type {
227 return subscription(std::forward<I>(i));
228 }
229 template<class Unsubscribe>
make_subscription(Unsubscribe && u)230 auto make_subscription(Unsubscribe&& u)
231 -> typename std::enable_if<detail::is_unsubscribe_function<Unsubscribe>::value,
232 subscription>::type {
233 return subscription(static_subscription<Unsubscribe>(std::forward<Unsubscribe>(u)));
234 }
235
236 class composite_subscription;
237
238 namespace detail {
239
240 struct tag_composite_subscription_empty {};
241
242 class composite_subscription_inner
243 {
244 private:
245 typedef subscription::weak_state_type weak_subscription;
246 struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
247 {
248 // invariant: cannot access this data without the lock held.
249 std::set<subscription> subscriptions;
250 // double checked locking:
251 // issubscribed must be loaded again after each lock acquisition.
252 // invariant:
253 // never call subscription::unsubscribe with lock held.
254 std::mutex lock;
255 // invariant: transitions from 'true' to 'false' exactly once, at any time.
256 std::atomic<bool> issubscribed;
257
~composite_subscription_staterxcpp::detail::composite_subscription_inner::composite_subscription_state258 ~composite_subscription_state()
259 {
260 std::unique_lock<decltype(lock)> guard(lock);
261 subscriptions.clear();
262 }
263
composite_subscription_staterxcpp::detail::composite_subscription_inner::composite_subscription_state264 composite_subscription_state()
265 : issubscribed(true)
266 {
267 }
composite_subscription_staterxcpp::detail::composite_subscription_inner::composite_subscription_state268 composite_subscription_state(tag_composite_subscription_empty)
269 : issubscribed(false)
270 {
271 }
272
273 // Atomically add 's' to the set of subscriptions.
274 //
275 // If unsubscribe() has already occurred, this immediately
276 // calls s.unsubscribe().
277 //
278 // cs.unsubscribe() [must] happens-before s.unsubscribe()
279 //
280 // Due to the un-atomic nature of calling 's.unsubscribe()',
281 // it is possible to observe the unintuitive
282 // add(s)=>s.unsubscribe() prior
283 // to any of the unsubscribe()=>sN.unsubscribe().
addrxcpp::detail::composite_subscription_inner::composite_subscription_state284 inline weak_subscription add(subscription s) {
285 if (!issubscribed) { // load.acq [seq_cst]
286 s.unsubscribe();
287 } else if (s.is_subscribed()) {
288 std::unique_lock<decltype(lock)> guard(lock);
289 if (!issubscribed) { // load.acq [seq_cst]
290 // unsubscribe was called concurrently.
291 guard.unlock();
292 // invariant: do not call unsubscribe with lock held.
293 s.unsubscribe();
294 } else {
295 subscriptions.insert(s);
296 }
297 }
298 return s.get_weak();
299 }
300
301 // Atomically remove 'w' from the set of subscriptions.
302 //
303 // This does nothing if 'w' was already previously removed,
304 // or refers to an expired value.
removerxcpp::detail::composite_subscription_inner::composite_subscription_state305 inline void remove(weak_subscription w) {
306 if (issubscribed) { // load.acq [seq_cst]
307 rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);
308
309 if (maybe_subscription.empty()) {
310 // Do nothing if the subscription has already expired.
311 return;
312 }
313
314 std::unique_lock<decltype(lock)> guard(lock);
315 // invariant: subscriptions must be accessed under the lock.
316
317 if (issubscribed) { // load.acq [seq_cst]
318 subscription& s = maybe_subscription.get();
319 subscriptions.erase(std::move(s));
320 } // else unsubscribe() was called concurrently; this becomes a no-op.
321 }
322 }
323
324 // Atomically clear all subscriptions that were observably added
325 // (and not subsequently observably removed).
326 //
327 // Un-atomically call unsubscribe on those subscriptions.
328 //
329 // forall subscriptions in {add(s1),add(s2),...}
330 // - {remove(s3), remove(s4), ...}:
331 // cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
332 //
333 // cs.unsubscribe() observed-before cs.clear ==> do nothing.
clearrxcpp::detail::composite_subscription_inner::composite_subscription_state334 inline void clear() {
335 if (issubscribed) { // load.acq [seq_cst]
336 std::unique_lock<decltype(lock)> guard(lock);
337
338 if (!issubscribed) { // load.acq [seq_cst]
339 // unsubscribe was called concurrently.
340 return;
341 }
342
343 std::set<subscription> v(std::move(subscriptions));
344 // invariant: do not call unsubscribe with lock held.
345 guard.unlock();
346 std::for_each(v.begin(), v.end(),
347 [](const subscription& s) {
348 s.unsubscribe(); });
349 }
350 }
351
352 // Atomically clear all subscriptions that were observably added
353 // (and not subsequently observably removed).
354 //
355 // Un-atomically call unsubscribe on those subscriptions.
356 //
357 // Switches to an 'unsubscribed' state, all subsequent
358 // adds are immediately unsubscribed.
359 //
360 // cs.unsubscribe() [must] happens-before
361 // cs.add(s) ==> s.unsubscribe()
362 //
363 // forall subscriptions in {add(s1),add(s2),...}
364 // - {remove(s3), remove(s4), ...}:
365 // cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
unsubscriberxcpp::detail::composite_subscription_inner::composite_subscription_state366 inline void unsubscribe() {
367 if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst]
368 std::unique_lock<decltype(lock)> guard(lock);
369
370 // is_subscribed can only transition to 'false' once,
371 // does not need an extra atomic access here.
372
373 std::set<subscription> v(std::move(subscriptions));
374 // invariant: do not call unsubscribe with lock held.
375 guard.unlock();
376 std::for_each(v.begin(), v.end(),
377 [](const subscription& s) {
378 s.unsubscribe(); });
379 }
380 }
381 };
382
383 public:
384 typedef std::shared_ptr<composite_subscription_state> shared_state_type;
385
386 protected:
387 mutable shared_state_type state;
388
389 public:
composite_subscription_inner()390 composite_subscription_inner()
391 : state(std::make_shared<composite_subscription_state>())
392 {
393 }
composite_subscription_inner(tag_composite_subscription_empty et)394 composite_subscription_inner(tag_composite_subscription_empty et)
395 : state(std::make_shared<composite_subscription_state>(et))
396 {
397 }
398
composite_subscription_inner(const composite_subscription_inner & o)399 composite_subscription_inner(const composite_subscription_inner& o)
400 : state(o.state)
401 {
402 if (!state) {
403 std::terminate();
404 }
405 }
composite_subscription_inner(composite_subscription_inner && o)406 composite_subscription_inner(composite_subscription_inner&& o)
407 : state(std::move(o.state))
408 {
409 if (!state) {
410 std::terminate();
411 }
412 }
413
operator =(composite_subscription_inner o)414 composite_subscription_inner& operator=(composite_subscription_inner o)
415 {
416 state = std::move(o.state);
417 if (!state) {
418 std::terminate();
419 }
420 return *this;
421 }
422
add(subscription s) const423 inline weak_subscription add(subscription s) const {
424 if (!state) {
425 std::terminate();
426 }
427 return state->add(std::move(s));
428 }
remove(weak_subscription w) const429 inline void remove(weak_subscription w) const {
430 if (!state) {
431 std::terminate();
432 }
433 state->remove(std::move(w));
434 }
clear() const435 inline void clear() const {
436 if (!state) {
437 std::terminate();
438 }
439 state->clear();
440 }
unsubscribe()441 inline void unsubscribe() {
442 if (!state) {
443 std::terminate();
444 }
445 state->unsubscribe();
446 }
447 };
448
449 inline composite_subscription shared_empty();
450
451 }
452
453 /*!
454 \brief controls lifetime for scheduler::schedule and observable<T, SourceOperator>::subscribe.
455
456 \ingroup group-core
457
458 */
459 class composite_subscription
460 : protected detail::composite_subscription_inner
461 , public subscription
462 {
463 typedef detail::composite_subscription_inner inner_type;
464 public:
465 typedef subscription::weak_state_type weak_subscription;
466
composite_subscription(detail::tag_composite_subscription_empty et)467 composite_subscription(detail::tag_composite_subscription_empty et)
468 : inner_type(et)
469 , subscription() // use empty base
470 {
471 }
472
473 public:
474
composite_subscription()475 composite_subscription()
476 : inner_type()
477 , subscription(*static_cast<const inner_type*>(this))
478 {
479 }
480
composite_subscription(const composite_subscription & o)481 composite_subscription(const composite_subscription& o)
482 : inner_type(o)
483 , subscription(static_cast<const subscription&>(o))
484 {
485 }
composite_subscription(composite_subscription && o)486 composite_subscription(composite_subscription&& o)
487 : inner_type(std::move(o))
488 , subscription(std::move(static_cast<subscription&>(o)))
489 {
490 }
491
operator =(composite_subscription o)492 composite_subscription& operator=(composite_subscription o)
493 {
494 inner_type::operator=(std::move(o));
495 subscription::operator=(std::move(*static_cast<subscription*>(&o)));
496 return *this;
497 }
498
empty()499 static inline composite_subscription empty() {
500 return detail::shared_empty();
501 }
502
503 using subscription::is_subscribed;
504 using subscription::unsubscribe;
505
506 using inner_type::clear;
507
add(subscription s) const508 inline weak_subscription add(subscription s) const {
509 if (s == static_cast<const subscription&>(*this)) {
510 // do not nest the same subscription
511 std::terminate();
512 //return s.get_weak();
513 }
514 auto that = this->subscription::state.get();
515 trace_activity().subscription_add_enter(*that, s);
516 auto w = inner_type::add(std::move(s));
517 trace_activity().subscription_add_return(*that);
518 return w;
519 }
520
521 template<class F>
add(F f) const522 auto add(F f) const
523 -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
524 return add(make_subscription(std::move(f)));
525 }
526
remove(weak_subscription w) const527 inline void remove(weak_subscription w) const {
528 auto that = this->subscription::state.get();
529 trace_activity().subscription_remove_enter(*that, w);
530 inner_type::remove(w);
531 trace_activity().subscription_remove_return(*that);
532 }
533 };
534
operator <(const composite_subscription & lhs,const composite_subscription & rhs)535 inline bool operator<(const composite_subscription& lhs, const composite_subscription& rhs) {
536 return static_cast<const subscription&>(lhs) < static_cast<const subscription&>(rhs);
537 }
operator ==(const composite_subscription & lhs,const composite_subscription & rhs)538 inline bool operator==(const composite_subscription& lhs, const composite_subscription& rhs) {
539 return static_cast<const subscription&>(lhs) == static_cast<const subscription&>(rhs);
540 }
operator !=(const composite_subscription & lhs,const composite_subscription & rhs)541 inline bool operator!=(const composite_subscription& lhs, const composite_subscription& rhs) {
542 return !(lhs == rhs);
543 }
544
545 namespace detail {
546
shared_empty()547 inline composite_subscription shared_empty() {
548 static composite_subscription shared_empty = composite_subscription(tag_composite_subscription_empty());
549 return shared_empty;
550 }
551
552 }
553
554 template<class T>
555 class resource : public subscription_base
556 {
557 public:
558 typedef typename composite_subscription::weak_subscription weak_subscription;
559
resource()560 resource()
561 : lifetime(composite_subscription())
562 , value(std::make_shared<rxu::detail::maybe<T>>())
563 {
564 }
565
resource(T t,composite_subscription cs=composite_subscription ())566 explicit resource(T t, composite_subscription cs = composite_subscription())
567 : lifetime(std::move(cs))
568 , value(std::make_shared<rxu::detail::maybe<T>>(rxu::detail::maybe<T>(std::move(t))))
569 {
570 auto localValue = value;
571 lifetime.add(
572 [localValue](){
573 localValue->reset();
574 }
575 );
576 }
577
get()578 T& get() {
579 return value.get()->get();
580 }
get_subscription()581 composite_subscription& get_subscription() {
582 return lifetime;
583 }
584
is_subscribed() const585 bool is_subscribed() const {
586 return lifetime.is_subscribed();
587 }
add(subscription s) const588 weak_subscription add(subscription s) const {
589 return lifetime.add(std::move(s));
590 }
591 template<class F>
add(F f) const592 auto add(F f) const
593 -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
594 return lifetime.add(make_subscription(std::move(f)));
595 }
remove(weak_subscription w) const596 void remove(weak_subscription w) const {
597 return lifetime.remove(std::move(w));
598 }
clear() const599 void clear() const {
600 return lifetime.clear();
601 }
unsubscribe() const602 void unsubscribe() const {
603 return lifetime.unsubscribe();
604 }
605
606 protected:
607 composite_subscription lifetime;
608 std::shared_ptr<rxu::detail::maybe<T>> value;
609 };
610
611 }
612
613 #endif
614