1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 #pragma once 4 5 #if !defined(RXCPP_RX_REPLAYSUBJECT_HPP) 6 #define RXCPP_RX_REPLAYSUBJECT_HPP 7 8 #include "../rx-includes.hpp" 9 10 namespace rxcpp { 11 12 namespace subjects { 13 14 namespace detail { 15 16 template<class Coordination> 17 struct replay_traits 18 { 19 typedef rxu::maybe<std::size_t> count_type; 20 typedef rxu::maybe<rxsc::scheduler::clock_type::duration> period_type; 21 typedef rxsc::scheduler::clock_type::time_point time_point_type; 22 typedef rxu::decay_t<Coordination> coordination_type; 23 typedef typename coordination_type::coordinator_type coordinator_type; 24 }; 25 26 template<class T, class Coordination> 27 class replay_observer : public detail::multicast_observer<T> 28 { 29 typedef replay_observer<T, Coordination> this_type; 30 typedef detail::multicast_observer<T> base_type; 31 32 typedef replay_traits<Coordination> traits; 33 typedef typename traits::count_type count_type; 34 typedef typename traits::period_type period_type; 35 typedef typename traits::time_point_type time_point_type; 36 typedef typename traits::coordination_type coordination_type; 37 typedef typename traits::coordinator_type coordinator_type; 38 39 class replay_observer_state : public std::enable_shared_from_this<replay_observer_state> 40 { 41 mutable std::mutex lock; 42 mutable std::list<T> values; 43 mutable std::list<time_point_type> time_points; 44 mutable count_type count; 45 mutable period_type period; 46 mutable composite_subscription replayLifetime; 47 public: 48 mutable coordination_type coordination; 49 mutable coordinator_type coordinator; 50 51 private: remove_oldest() const52 void remove_oldest() const { 53 values.pop_front(); 54 if (!period.empty()) { 55 time_points.pop_front(); 56 } 57 } 58 59 public: ~replay_observer_state()60 ~replay_observer_state(){ 61 replayLifetime.unsubscribe(); 62 } replay_observer_state(count_type _count,period_type _period,coordination_type _coordination,coordinator_type _coordinator,composite_subscription _replayLifetime)63 explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime) 64 : count(_count) 65 , period(_period) 66 , replayLifetime(_replayLifetime) 67 , coordination(std::move(_coordination)) 68 , coordinator(std::move(_coordinator)) 69 { 70 } 71 add(T v) const72 void add(T v) const { 73 std::unique_lock<std::mutex> guard(lock); 74 75 if (!count.empty()) { 76 if (values.size() == count.get()) 77 remove_oldest(); 78 } 79 80 if (!period.empty()) { 81 auto now = coordination.now(); 82 while (!time_points.empty() && (now - time_points.front() > period.get())) 83 remove_oldest(); 84 time_points.push_back(now); 85 } 86 87 values.push_back(std::move(v)); 88 } get() const89 std::list<T> get() const { 90 std::unique_lock<std::mutex> guard(lock); 91 return values; 92 } 93 }; 94 95 std::shared_ptr<replay_observer_state> state; 96 97 public: replay_observer(count_type count,period_type period,coordination_type coordination,composite_subscription replayLifetime,composite_subscription subscriberLifetime)98 replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime) 99 : base_type(subscriberLifetime) 100 { 101 replayLifetime.add(subscriberLifetime); 102 auto coordinator = coordination.create_coordinator(replayLifetime); 103 state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime)); 104 } 105 get_subscriber() const106 subscriber<T> get_subscriber() const { 107 return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::replay_observer<T, Coordination>>(*this)).as_dynamic(); 108 } 109 get_values() const110 std::list<T> get_values() const { 111 return state->get(); 112 } 113 get_coordinator() const114 coordinator_type& get_coordinator() const { 115 return state->coordinator; 116 } 117 118 template<class V> on_next(V v) const119 void on_next(V v) const { 120 state->add(v); 121 base_type::on_next(std::move(v)); 122 } 123 }; 124 125 } 126 127 template<class T, class Coordination> 128 class replay 129 { 130 typedef detail::replay_traits<Coordination> traits; 131 typedef typename traits::count_type count_type; 132 typedef typename traits::period_type period_type; 133 typedef typename traits::time_point_type time_point_type; 134 135 detail::replay_observer<T, Coordination> s; 136 137 public: replay(Coordination cn,composite_subscription cs=composite_subscription ())138 explicit replay(Coordination cn, composite_subscription cs = composite_subscription()) 139 : s(count_type(), period_type(), cn, cs, composite_subscription{}) 140 { 141 } 142 replay(std::size_t count,Coordination cn,composite_subscription cs=composite_subscription ())143 replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription()) 144 : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{}) 145 { 146 } 147 replay(rxsc::scheduler::clock_type::duration period,Coordination cn,composite_subscription cs=composite_subscription ())148 replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) 149 : s(count_type(), period_type(period), cn, cs, composite_subscription{}) 150 { 151 } 152 replay(std::size_t count,rxsc::scheduler::clock_type::duration period,Coordination cn,composite_subscription cs=composite_subscription ())153 replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) 154 : s(count_type(count), period_type(period), cn, cs, composite_subscription{}) 155 { 156 } 157 has_observers() const158 bool has_observers() const { 159 return s.has_observers(); 160 } 161 get_values() const162 std::list<T> get_values() const { 163 return s.get_values(); 164 } 165 get_subscriber() const166 subscriber<T> get_subscriber() const { 167 return s.get_subscriber(); 168 } 169 get_observable() const170 observable<T> get_observable() const { 171 auto keepAlive = s; 172 auto observable = make_observable_dynamic<T>([=](subscriber<T> o){ 173 for (auto&& value: get_values()) { 174 o.on_next(value); 175 } 176 keepAlive.add(keepAlive.get_subscriber(), std::move(o)); 177 }); 178 return s.get_coordinator().in(observable); 179 } 180 }; 181 182 } 183 184 } 185 186 #endif 187