• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // (C) Copyright 2014 Vicente Botet
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 // adapted from the example given by Howard Hinnant in
7 
8 #include <boost/config.hpp>
9 
10 #define BOOST_THREAD_VERSION 4
11 //#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
12 #if ! defined  BOOST_NO_CXX11_DECLTYPE
13 #define BOOST_RESULT_OF_USE_DECLTYPE
14 #endif
15 #include <iostream>
16 #include <boost/thread/scoped_thread.hpp>
17 #ifdef XXXX
18 #include <boost/thread/externally_locked_stream.hpp>
19     typedef  boost::externally_locked_stream<std::ostream> the_ostream;
20 #else
21     typedef std::ostream the_ostream;
22     typedef std::istream the_istream;
23 #endif
24 #include <boost/thread/concurrent_queues/sync_queue.hpp>
25 #include <boost/thread/concurrent_queues/queue_adaptor.hpp>
26 #include <boost/thread/concurrent_queues/queue_views.hpp>
27 #include <boost/static_assert.hpp>
28 #include <boost/type_traits.hpp>
29 
producer(the_ostream &,boost::queue_back<int> sbq)30 void producer(the_ostream &/*mos*/, boost::queue_back<int> sbq)
31 {
32   using namespace boost;
33   try {
34     for(int i=0; ;++i)
35     {
36       sbq.push(i);
37       //sbq << i;
38       //mos << "push(" << i << ") " << sbq.size() <<"\n";
39       this_thread::sleep_for(chrono::milliseconds(200));
40     }
41   }
42   catch(sync_queue_is_closed&)
43   {
44     //mos << "closed !!!\n";
45   }
46   catch(...)
47   {
48     //mos << "exception !!!\n";
49   }
50 }
51 
consumer(the_ostream &,boost::queue_front<int> sbq)52 void consumer(
53     the_ostream &/*mos*/,
54     boost::queue_front<int> sbq)
55 {
56   using namespace boost;
57   try {
58     for(int i=0; ;++i)
59     {
60       int r;
61       sbq.pull(r);
62       //sbq >> r;
63       //mos << i << " pull(" << r << ") " << sbq.size()  <<"\n";
64 
65       this_thread::sleep_for(chrono::milliseconds(250));
66     }
67   }
68   catch(sync_queue_is_closed&)
69   {
70     //mos << "closed !!!\n";
71   }
72   catch(...)
73   {
74     //mos << "exception !!!\n";
75   }
76 }
consumer2(the_ostream &,boost::queue_front<int> sbq)77 void consumer2(the_ostream &/*mos*/, boost::queue_front<int> sbq)
78 {
79   using namespace boost;
80   try {
81     for(int i=0; ;++i)
82     {
83       int r;
84       queue_op_status st = sbq.try_pull(r);
85       if (queue_op_status::closed == st) break;
86       if (queue_op_status::success == st) {
87         //mos << i << " try_pull(" << r << ")\n";
88       }
89       this_thread::sleep_for(chrono::milliseconds(250));
90     }
91   }
92   catch(...)
93   {
94     //mos << "exception !!!\n";
95   }
96 }
consumer3(the_ostream &,boost::queue_front<int> sbq)97 void consumer3(the_ostream &/*mos*/, boost::queue_front<int> sbq)
98 {
99   using namespace boost;
100   try {
101     for(int i=0; ;++i)
102     {
103       int r;
104       queue_op_status res = sbq.wait_pull(r);
105       if (res==queue_op_status::closed) break;
106       //mos << i << " wait_pull(" << r << ")\n";
107       this_thread::sleep_for(chrono::milliseconds(250));
108     }
109   }
110   catch(...)
111   {
112     //mos << "exception !!!\n";
113   }
114 }
115 
main()116 int main()
117 {
118   using namespace boost;
119 
120 #ifdef XXXX
121   recursive_mutex terminal_mutex;
122 
123   externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
124   externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
125   externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
126 #else
127   the_ostream &mcerr = std::cout;
128   the_ostream &mcout = std::cerr;
129   //the_istream &mcin = std::cin;
130 #endif
131 
132   queue_adaptor<sync_queue<int> > sbq;
133 
134   {
135     mcout << "begin of main" << std::endl;
136     scoped_thread<> t11(boost::thread(producer, boost::ref(mcerr), concurrent::queue_back<int>(sbq)));
137     scoped_thread<> t12(boost::thread(producer, boost::ref(mcerr), concurrent::queue_back<int>(sbq)));
138     scoped_thread<> t2(boost::thread(consumer, boost::ref(mcout), concurrent::queue_front<int>(sbq)));
139 
140     this_thread::sleep_for(chrono::seconds(1));
141 
142     mcout << "closed()" << std::endl;
143     sbq.close();
144     mcout << "closed()" << std::endl;
145 
146   } // all threads joined here.
147   mcout << "end of main" << std::endl;
148   return 0;
149 }
150 
151