1 // (C) Copyright 2012 Howard Hinnant
2 // (C) Copyright 2012 Vicente Botet
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6
7 // adapted from the example given by Howard Hinnant in
8
9 #include <boost/config.hpp>
10
11 #define BOOST_THREAD_VERSION 4
12 #define BOOST_THREAD_QUEUE_DEPRECATE_OLD
13 #if ! defined BOOST_NO_CXX11_DECLTYPE
14 #define BOOST_RESULT_OF_USE_DECLTYPE
15 #endif
16 //#define XXXX
17
18 #include <iostream>
19 #include <boost/thread/scoped_thread.hpp>
20 #ifdef XXXX
21 #include <boost/thread/externally_locked_stream.hpp>
22 typedef boost::externally_locked_stream<std::ostream> the_ostream;
23 #else
24 typedef std::ostream the_ostream;
25 typedef std::istream the_istream;
26 #endif
27
28 #include <boost/thread/sync_bounded_queue.hpp>
29
producer(the_ostream &,boost::sync_bounded_queue<int> & sbq)30 void producer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
31 {
32 using namespace boost;
33 try {
34 for(int i=0; ;++i)
35 {
36 sbq.push_back(i);
37 //sbq << i;
38 //mos << "push_back(" << i << ") "<< sbq.size()<<"\n";
39 this_thread::sleep_for(chrono::milliseconds(200));
40 }
41 }
42 catch(sync_queue_is_closed&)
43 {
44 //mos << "closed !!!\n";
45 }
46 catch(...)
47 {
48 //mos << "exception !!!\n";
49 }
50 }
51
consumer(the_ostream &,boost::sync_bounded_queue<int> & sbq)52 void consumer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
53 {
54 using namespace boost;
55 try {
56 for(int i=0; ;++i)
57 {
58 int r;
59 sbq.pull_front(r);
60 //sbq >> r;
61 //mos << i << " pull_front(" << r << ") "<< sbq.size()<<"\n";
62 this_thread::sleep_for(chrono::milliseconds(250));
63 }
64 }
65 catch(sync_queue_is_closed&)
66 {
67 //mos << "closed !!!\n";
68 }
69 catch(...)
70 {
71 //mos << "exception !!!\n";
72 }
73 }
consumer2(the_ostream &,boost::sync_bounded_queue<int> & sbq)74 void consumer2(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
75 {
76 using namespace boost;
77 try {
78 for(int i=0; ;++i)
79 {
80 int r;
81 queue_op_status st = sbq.try_pull_front(r);
82 if (queue_op_status::closed == st) break;
83 if (queue_op_status::success == st) {
84 //mos << i << " pull(" << r << ")\n";
85 }
86 this_thread::sleep_for(chrono::milliseconds(250));
87 }
88 }
89 catch(...)
90 {
91 //mos << "exception !!!\n";
92 }
93 }
94 //void consumer3(the_ostream &mos, boost::sync_bounded_queue<int> & sbq)
95 //{
96 // using namespace boost;
97 // bool closed=false;
98 // try {
99 // for(int i=0; ;++i)
100 // {
101 // int r;
102 // queue_op_status res = sbq.wait_and_pull(r);
103 // if (res==queue_op_status::closed) break;
104 // //mos << i << " wait_and_pull(" << r << ")\n";
105 // this_thread::sleep_for(chrono::milliseconds(250));
106 // }
107 // }
108 // catch(...)
109 // {
110 // //mos << "exception !!!\n";
111 // }
112 //}
113
main()114 int main()
115 {
116 using namespace boost;
117 #ifdef XXXX
118 recursive_mutex terminal_mutex;
119
120 externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
121 externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
122 externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
123 #else
124 the_ostream &mcerr = std::cout;
125 the_ostream &mcout = std::cerr;
126 //the_istream &mcin = std::cin;
127 #endif
128
129 sync_bounded_queue<int> sbq(10);
130
131 {
132 mcout << "begin of main" << std::endl;
133 scoped_thread<> t11(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
134 scoped_thread<> t12(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
135 scoped_thread<> t2(boost::thread(consumer, boost::ref(mcout), boost::ref(sbq)));
136
137 this_thread::sleep_for(chrono::seconds(1));
138
139 sbq.close();
140 mcout << "closed()" << std::endl;
141
142 } // all threads joined here.
143 mcout << "end of main" << std::endl;
144 return 0;
145 }
146
147