1 // (C) Copyright 2012 Howard Hinnant
2 // (C) Copyright 2012 Vicente Botet
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6
7 // adapted from the example given by Howard Hinnant in
8
9 #include <boost/config.hpp>
10
11 #define BOOST_THREAD_VERSION 4
12 #define BOOST_THREAD_QUEUE_DEPRECATE_OLD
13 #if ! defined BOOST_NO_CXX11_DECLTYPE
14 #define BOOST_RESULT_OF_USE_DECLTYPE
15 #endif
16 #include <iostream>
17 #include <boost/thread/scoped_thread.hpp>
18 #ifdef XXXX
19 #include <boost/thread/externally_locked_stream.hpp>
20 typedef boost::externally_locked_stream<std::ostream> the_ostream;
21 #else
22 typedef std::ostream the_ostream;
23 typedef std::istream the_istream;
24 #endif
25 #include <boost/thread/concurrent_queues/sync_queue.hpp>
26
producer(the_ostream &,boost::sync_queue<int> & sbq)27 void producer(the_ostream & /*mos*/, boost::sync_queue<int> & sbq)
28 {
29 using namespace boost;
30 try {
31 for(int i=0; ;++i)
32 {
33 sbq.push(i);
34 //sbq << i;
35 //mos << "push(" << i << ") "<< sbq.size()<<"\n";
36 this_thread::sleep_for(chrono::milliseconds(200));
37 }
38 }
39 catch(sync_queue_is_closed&)
40 {
41 //mos << "closed !!!\n";
42 }
43 catch(...)
44 {
45 //mos << "exception !!!\n";
46 }
47 }
48
consumer(the_ostream &,boost::sync_queue<int> & sbq)49 void consumer(
50 the_ostream & /*mos*/,
51 boost::sync_queue<int> & sbq)
52 {
53 using namespace boost;
54 try {
55 for(int i=0; ;++i)
56 {
57 int r;
58 sbq.pull(r);
59 //sbq >> r;
60 //mos << i << " pull(" << r << ") "<< sbq.size()<<"\n";
61
62 this_thread::sleep_for(chrono::milliseconds(250));
63 }
64 }
65 catch(sync_queue_is_closed&)
66 {
67 //mos << "closed !!!\n";
68 }
69 catch(...)
70 {
71 //mos << "exception !!!\n";
72 }
73 }
consumer2(the_ostream &,boost::sync_queue<int> & sbq)74 void consumer2(the_ostream &/*mos*/, boost::sync_queue<int> & sbq)
75 {
76 using namespace boost;
77 try {
78 for(int i=0; ;++i)
79 {
80 int r;
81 queue_op_status st = sbq.try_pull(r);
82 if (queue_op_status::closed == st) break;
83 if (queue_op_status::success == st) {
84 //mos << i << " pull(" << r << ")\n";
85 }
86 this_thread::sleep_for(chrono::milliseconds(250));
87 }
88 }
89 catch(...)
90 {
91 //mos << "exception !!!\n";
92 }
93 }
consumer3(the_ostream &,boost::sync_queue<int> & sbq)94 void consumer3(the_ostream &/*mos*/, boost::sync_queue<int> & sbq)
95 {
96 using namespace boost;
97 try {
98 for(int i=0; ;++i)
99 {
100 int r;
101 queue_op_status res = sbq.wait_pull(r);
102 if (res==queue_op_status::closed) break;
103 //mos << i << " wait_pull(" << r << ")\n";
104 this_thread::sleep_for(chrono::milliseconds(250));
105 }
106 }
107 catch(...)
108 {
109 //mos << "exception !!!\n";
110 }
111 }
112
main()113 int main()
114 {
115 using namespace boost;
116
117 #ifdef XXXX
118 recursive_mutex terminal_mutex;
119
120 externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
121 externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
122 externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
123 #else
124 the_ostream &mcerr = std::cout;
125 the_ostream &mcout = std::cerr;
126 //the_istream &mcin = std::cin;
127 #endif
128
129 sync_queue<int> sbq;
130
131 {
132 mcout << "begin of main" << std::endl;
133 scoped_thread<> t11(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
134 scoped_thread<> t12(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
135 scoped_thread<> t2(boost::thread(consumer, boost::ref(mcout), boost::ref(sbq)));
136
137 this_thread::sleep_for(chrono::seconds(1));
138
139 mcout << "closed()" << std::endl;
140 sbq.close();
141 mcout << "closed()" << std::endl;
142
143 } // all threads joined here.
144 mcout << "end of main" << std::endl;
145 return 0;
146 }
147
148