1 // Copyright (C) 2001-2003 2 // William E. Kempf 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 7 #include <iostream> 8 #include <vector> 9 #include <boost/utility.hpp> 10 #include <boost/thread/condition_variable.hpp> 11 #include <boost/thread/thread_only.hpp> 12 #include "../test/remove_error_code_unused_warning.hpp" 13 14 class bounded_buffer : private boost::noncopyable 15 { 16 public: 17 typedef boost::unique_lock<boost::mutex> lock; 18 bounded_buffer(int n)19 bounded_buffer(int n) : boost::noncopyable(), begin(0), end(0), buffered(0), circular_buf(n) { } 20 send(int m)21 void send (int m) { 22 lock lk(monitor); 23 while (buffered == circular_buf.size()) 24 buffer_not_full.wait(lk); 25 circular_buf[end] = m; 26 end = (end+1) % circular_buf.size(); 27 ++buffered; 28 buffer_not_empty.notify_one(); 29 } receive()30 int receive() { 31 lock lk(monitor); 32 while (buffered == 0) 33 buffer_not_empty.wait(lk); 34 int i = circular_buf[begin]; 35 begin = (begin+1) % circular_buf.size(); 36 --buffered; 37 buffer_not_full.notify_one(); 38 return i; 39 } 40 41 private: 42 int begin, end; 43 std::vector<int>::size_type buffered; 44 std::vector<int> circular_buf; 45 boost::condition_variable_any buffer_not_full, buffer_not_empty; 46 boost::mutex monitor; 47 }; 48 49 bounded_buffer buf(2); 50 51 boost::mutex io_mutex; 52 sender()53void sender() { 54 int n = 0; 55 while (n < 1000000) { 56 buf.send(n); 57 if(!(n%10000)) 58 { 59 boost::unique_lock<boost::mutex> io_lock(io_mutex); 60 std::cout << "sent: " << n << std::endl; 61 } 62 ++n; 63 } 64 buf.send(-1); 65 } 66 receiver()67void receiver() { 68 int n; 69 do { 70 n = buf.receive(); 71 if(!(n%10000)) 72 { 73 boost::unique_lock<boost::mutex> io_lock(io_mutex); 74 std::cout << "received: " << n << std::endl; 75 } 76 } while (n != -1); // -1 indicates end of buffer 77 buf.send(-1); 78 } 79 main(int,char * [])80int main(int, char*[]) 81 { 82 boost::thread thrd1(&sender); 83 boost::thread thrd2(&receiver); 84 boost::thread thrd3(&receiver); 85 boost::thread thrd4(&receiver); 86 thrd1.join(); 87 thrd2.join(); 88 thrd3.join(); 89 thrd4.join(); 90 return 0; 91 } 92