• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2001-2003
2 // William E. Kempf
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
#include <iostream>
#include <stdexcept>
#include <vector>
#include <boost/utility.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread_only.hpp>
#include "../test/remove_error_code_unused_warning.hpp"
13 
14 class bounded_buffer : private boost::noncopyable
15 {
16 public:
17     typedef boost::unique_lock<boost::mutex> lock;
18 
bounded_buffer(int n)19     bounded_buffer(int n) : boost::noncopyable(), begin(0), end(0), buffered(0), circular_buf(n) { }
20 
send(int m)21     void send (int m) {
22         lock lk(monitor);
23         while (buffered == circular_buf.size())
24             buffer_not_full.wait(lk);
25         circular_buf[end] = m;
26         end = (end+1) % circular_buf.size();
27         ++buffered;
28         buffer_not_empty.notify_one();
29     }
receive()30     int receive() {
31         lock lk(monitor);
32         while (buffered == 0)
33             buffer_not_empty.wait(lk);
34         int i = circular_buf[begin];
35         begin = (begin+1) % circular_buf.size();
36         --buffered;
37         buffer_not_full.notify_one();
38         return i;
39     }
40 
41 private:
42     int begin, end;
43     std::vector<int>::size_type buffered;
44     std::vector<int> circular_buf;
45     boost::condition_variable_any buffer_not_full, buffer_not_empty;
46     boost::mutex monitor;
47 };
48 
49 bounded_buffer buf(2);
50 
51 boost::mutex io_mutex;
52 
sender()53 void sender() {
54     int n = 0;
55     while (n < 1000000) {
56         buf.send(n);
57         if(!(n%10000))
58         {
59             boost::unique_lock<boost::mutex> io_lock(io_mutex);
60             std::cout << "sent: " << n << std::endl;
61         }
62         ++n;
63     }
64     buf.send(-1);
65 }
66 
receiver()67 void receiver() {
68     int n;
69     do {
70         n = buf.receive();
71         if(!(n%10000))
72         {
73             boost::unique_lock<boost::mutex> io_lock(io_mutex);
74             std::cout << "received: " << n << std::endl;
75         }
76     } while (n != -1); // -1 indicates end of buffer
77     buf.send(-1);
78 }
79 
main(int,char * [])80 int main(int, char*[])
81 {
82     boost::thread thrd1(&sender);
83     boost::thread thrd2(&receiver);
84     boost::thread thrd3(&receiver);
85     boost::thread thrd4(&receiver);
86     thrd1.join();
87     thrd2.join();
88     thrd3.join();
89     thrd4.join();
90     return 0;
91 }
92