1 // (C) Copyright 2008-10 Anthony Williams 2 // (C) Copyright 2011-2015 Vicente J. Botet Escriba 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See 5 // accompanying file LICENSE_1_0.txt or copy at 6 // http://www.boost.org/LICENSE_1_0.txt) 7 8 #ifndef BOOST_THREAD_FUTURES_WAIT_FOR_ANY_HPP 9 #define BOOST_THREAD_FUTURES_WAIT_FOR_ANY_HPP 10 11 #include <boost/thread/detail/config.hpp> 12 13 #include <boost/thread/detail/move.hpp> 14 #include <boost/thread/futures/is_future_type.hpp> 15 #include <boost/thread/lock_algorithms.hpp> 16 #include <boost/thread/mutex.hpp> 17 #include <boost/thread/condition_variable.hpp> 18 19 #include <boost/core/enable_if.hpp> 20 #include <boost/next_prior.hpp> 21 #include <boost/scoped_array.hpp> 22 23 #include <iterator> 24 #include <vector> 25 26 namespace boost 27 { 28 namespace detail 29 { 30 template <class Future> 31 class waiter_for_any_in_seq 32 { 33 struct registered_waiter; 34 typedef std::vector<int>::size_type count_type; 35 36 struct registered_waiter 37 { 38 typedef Future future_type; 39 future_type* future_; 40 typedef typename Future::notify_when_ready_handle notify_when_ready_handle; 41 notify_when_ready_handle handle; 42 count_type index; 43 registered_waiterboost::detail::waiter_for_any_in_seq::registered_waiter44 registered_waiter(future_type & a_future, 45 notify_when_ready_handle handle_, count_type index_) : 46 future_(&a_future), handle(handle_), index(index_) 47 { 48 } 49 }; 50 51 struct all_futures_lock 52 { 53 #ifdef _MANAGED 54 typedef std::ptrdiff_t count_type_portable; 55 #else 56 typedef count_type count_type_portable; 57 #endif 58 count_type_portable count; 59 boost::scoped_array<boost::unique_lock<boost::mutex> > locks; 60 all_futures_lockboost::detail::waiter_for_any_in_seq::all_futures_lock61 all_futures_lock(std::vector<registered_waiter>& waiters) : 62 count(waiters.size()), locks(new boost::unique_lock<boost::mutex>[count]) 63 { 64 for (count_type_portable i = 0; i < count; ++i) 65 { 66 locks[i] = BOOST_THREAD_MAKE_RV_REF(boost::unique_lock<boost::mutex>(waiters[i].future_->mutex())); 67 } 68 } 69 lockboost::detail::waiter_for_any_in_seq::all_futures_lock70 void lock() 71 { 72 boost::lock(locks.get(), locks.get() + count); 73 } 74 unlockboost::detail::waiter_for_any_in_seq::all_futures_lock75 void unlock() 76 { 77 for (count_type_portable i = 0; i < count; ++i) 78 { 79 locks[i].unlock(); 80 } 81 } 82 }; 83 84 boost::condition_variable_any cv; 85 std::vector<registered_waiter> waiters_; 86 count_type future_count; 87 88 public: waiter_for_any_in_seq()89 waiter_for_any_in_seq() : 90 future_count(0) 91 { 92 } 93 94 template <typename F> add(F & f)95 void add(F& f) 96 { 97 if (f.valid()) 98 { 99 registered_waiter waiter(f, f.notify_when_ready(cv), future_count); 100 try 101 { 102 waiters_.push_back(waiter); 103 } 104 catch (...) 105 { 106 f.future_->unnotify_when_ready(waiter.handle); 107 throw; 108 } 109 ++future_count; 110 } 111 } 112 113 #ifndef BOOST_NO_CXX11_VARIADIC_TEMPLATES 114 template <typename F1, typename ... Fs> add(F1 & f1,Fs &...fs)115 void add(F1& f1, Fs&... fs) 116 { 117 add(f1); 118 add(fs...); 119 } 120 #endif 121 wait()122 count_type wait() 123 { 124 all_futures_lock lk(waiters_); 125 for (;;) 126 { 127 for (count_type i = 0; i < waiters_.size(); ++i) 128 { 129 if (waiters_[i].future_->is_ready(lk.locks[i])) 130 { 131 return waiters_[i].index; 132 } 133 } 134 cv.wait(lk); 135 } 136 } 137 ~waiter_for_any_in_seq()138 ~waiter_for_any_in_seq() 139 { 140 for (count_type i = 0; i < waiters_.size(); ++i) 141 { 142 waiters_[i].future_->unnotify_when_ready(waiters_[i].handle); 143 } 144 } 145 }; 146 } 147 148 template <typename Iterator> wait_for_any(Iterator begin,Iterator end)149 typename boost::disable_if<is_future_type<Iterator> , Iterator>::type wait_for_any(Iterator begin, Iterator end) 150 { 151 if (begin == end) return end; 152 153 detail::waiter_for_any_in_seq<typename std::iterator_traits<Iterator>::value_type> waiter; 154 for (Iterator current = begin; current != end; ++current) 155 { 156 waiter.add(*current); 157 } 158 return boost::next(begin, waiter.wait()); 159 } 160 } 161 162 #endif // header 163