• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // (C) Copyright 2012 Howard Hinnant
2 // (C) Copyright 2012 Vicente Botet
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
7 // adapted from the example given by Howard Hinnant in
8 
9 #include <boost/config.hpp>
10 
11 #define BOOST_THREAD_VERSION 4
12 #define BOOST_THREAD_QUEUE_DEPRECATE_OLD
13 #if ! defined  BOOST_NO_CXX11_DECLTYPE
14 #define BOOST_RESULT_OF_USE_DECLTYPE
15 #endif
16 //#define XXXX
17 
18 #include <iostream>
19 #include <boost/thread/scoped_thread.hpp>
20 #ifdef XXXX
21 #include <boost/thread/externally_locked_stream.hpp>
22     typedef  boost::externally_locked_stream<std::ostream> the_ostream;
23 #else
24     typedef std::ostream the_ostream;
25     typedef std::istream the_istream;
26 #endif
27 
28 #include <boost/thread/sync_bounded_queue.hpp>
29 
producer(the_ostream &,boost::sync_bounded_queue<int> & sbq)30 void producer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
31 {
32   using namespace boost;
33   try {
34     for(int i=0; ;++i)
35     {
36       sbq.push_back(i);
37       //sbq << i;
38       //mos << "push_back(" << i << ") "<< sbq.size()<<"\n";
39       this_thread::sleep_for(chrono::milliseconds(200));
40     }
41   }
42   catch(sync_queue_is_closed&)
43   {
44     //mos << "closed !!!\n";
45   }
46   catch(...)
47   {
48     //mos << "exception !!!\n";
49   }
50 }
51 
consumer(the_ostream &,boost::sync_bounded_queue<int> & sbq)52 void consumer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
53 {
54   using namespace boost;
55   try {
56     for(int i=0; ;++i)
57     {
58       int r;
59       sbq.pull_front(r);
60       //sbq >> r;
61       //mos << i << " pull_front(" << r << ") "<< sbq.size()<<"\n";
62       this_thread::sleep_for(chrono::milliseconds(250));
63     }
64   }
65   catch(sync_queue_is_closed&)
66   {
67     //mos << "closed !!!\n";
68   }
69   catch(...)
70   {
71     //mos << "exception !!!\n";
72   }
73 }
consumer2(the_ostream &,boost::sync_bounded_queue<int> & sbq)74 void consumer2(the_ostream &/*mos*/,  boost::sync_bounded_queue<int> & sbq)
75 {
76   using namespace boost;
77   try {
78     for(int i=0; ;++i)
79     {
80       int r;
81       queue_op_status st = sbq.try_pull_front(r);
82       if (queue_op_status::closed == st) break;
83       if (queue_op_status::success == st) {
84         //mos << i << " pull(" << r << ")\n";
85       }
86       this_thread::sleep_for(chrono::milliseconds(250));
87     }
88   }
89   catch(...)
90   {
91     //mos << "exception !!!\n";
92   }
93 }
94 //void consumer3(the_ostream &mos, boost::sync_bounded_queue<int> & sbq)
95 //{
96 //  using namespace boost;
97 //  bool closed=false;
98 //  try {
99 //    for(int i=0; ;++i)
100 //    {
101 //      int r;
102 //      queue_op_status res = sbq.wait_and_pull(r);
103 //      if (res==queue_op_status::closed) break;
104 //      //mos << i << " wait_and_pull(" << r << ")\n";
105 //      this_thread::sleep_for(chrono::milliseconds(250));
106 //    }
107 //  }
108 //  catch(...)
109 //  {
110 //    //mos << "exception !!!\n";
111 //  }
112 //}
113 
main()114 int main()
115 {
116   using namespace boost;
117 #ifdef XXXX
118   recursive_mutex terminal_mutex;
119 
120   externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
121   externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
122   externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
123 #else
124   the_ostream &mcerr = std::cout;
125   the_ostream &mcout = std::cerr;
126   //the_istream &mcin = std::cin;
127 #endif
128 
129   sync_bounded_queue<int> sbq(10);
130 
131   {
132     mcout << "begin of main" << std::endl;
133     scoped_thread<> t11(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
134     scoped_thread<> t12(boost::thread(producer, boost::ref(mcerr), boost::ref(sbq)));
135     scoped_thread<> t2(boost::thread(consumer, boost::ref(mcout), boost::ref(sbq)));
136 
137     this_thread::sleep_for(chrono::seconds(1));
138 
139     sbq.close();
140     mcout << "closed()" << std::endl;
141 
142   } // all threads joined here.
143   mcout << "end of main" << std::endl;
144   return 0;
145 }
146 
147