1// (C) Copyright 2012 Howard Hinnant
2// (C) Copyright 2012 Vicente Botet
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6
7// adapted from the example given by Howard Hinnant in
8
9#include <boost/config.hpp>
10
11#define BOOST_THREAD_VERSION 4
12#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
13#if ! defined BOOST_NO_CXX11_DECLTYPE
14#define BOOST_RESULT_OF_USE_DECLTYPE
15#endif
16//#define XXXX
17
18#include <iostream>
19#include <boost/thread/scoped_thread.hpp>
20#ifdef XXXX
21#include <boost/thread/externally_locked_stream.hpp>
22 typedef boost::externally_locked_stream<std::ostream> the_ostream;
23#else
24 typedef std::ostream the_ostream;
25 typedef std::istream the_istream;
26#endif
27
28#include <boost/thread/sync_bounded_queue.hpp>
29
30void producer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
31{
32 using namespace boost;
33 try {
34 for(int i=0; ;++i)
35 {
36 sbq.push_back(elem: i);
37 //sbq << i;
38 //mos << "push_back(" << i << ") "<< sbq.size()<<"\n";
39 this_thread::sleep_for(d: chrono::milliseconds(200));
40 }
41 }
42 catch(sync_queue_is_closed&)
43 {
44 //mos << "closed !!!\n";
45 }
46 catch(...)
47 {
48 //mos << "exception !!!\n";
49 }
50}
51
52void consumer(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
53{
54 using namespace boost;
55 try {
56 for(int i=0; ;++i)
57 {
58 int r;
59 sbq.pull_front(elem&: r);
60 //sbq >> r;
61 //mos << i << " pull_front(" << r << ") "<< sbq.size()<<"\n";
62 this_thread::sleep_for(d: chrono::milliseconds(250));
63 }
64 }
65 catch(sync_queue_is_closed&)
66 {
67 //mos << "closed !!!\n";
68 }
69 catch(...)
70 {
71 //mos << "exception !!!\n";
72 }
73}
74void consumer2(the_ostream &/*mos*/, boost::sync_bounded_queue<int> & sbq)
75{
76 using namespace boost;
77 try {
78 for(int i=0; ;++i)
79 {
80 int r;
81 queue_op_status st = sbq.try_pull_front(elem&: r);
82 if (queue_op_status::closed == st) break;
83 if (queue_op_status::success == st) {
84 //mos << i << " pull(" << r << ")\n";
85 }
86 this_thread::sleep_for(d: chrono::milliseconds(250));
87 }
88 }
89 catch(...)
90 {
91 //mos << "exception !!!\n";
92 }
93}
94//void consumer3(the_ostream &mos, boost::sync_bounded_queue<int> & sbq)
95//{
96// using namespace boost;
97// bool closed=false;
98// try {
99// for(int i=0; ;++i)
100// {
101// int r;
102// queue_op_status res = sbq.wait_and_pull(r);
103// if (res==queue_op_status::closed) break;
104// //mos << i << " wait_and_pull(" << r << ")\n";
105// this_thread::sleep_for(chrono::milliseconds(250));
106// }
107// }
108// catch(...)
109// {
110// //mos << "exception !!!\n";
111// }
112//}
113
114int main()
115{
116 using namespace boost;
117#ifdef XXXX
118 recursive_mutex terminal_mutex;
119
120 externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
121 externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
122 externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
123#else
124 the_ostream &mcerr = std::cout;
125 the_ostream &mcout = std::cerr;
126 //the_istream &mcin = std::cin;
127#endif
128
129 sync_bounded_queue<int> sbq(10);
130
131 {
132 mcout << "begin of main" << std::endl;
133 scoped_thread<> t11(boost::thread(producer, boost::ref(t&: mcerr), boost::ref(t&: sbq)));
134 scoped_thread<> t12(boost::thread(producer, boost::ref(t&: mcerr), boost::ref(t&: sbq)));
135 scoped_thread<> t2(boost::thread(consumer, boost::ref(t&: mcout), boost::ref(t&: sbq)));
136
137 this_thread::sleep_for(d: chrono::seconds(1));
138
139 sbq.close();
140 mcout << "closed()" << std::endl;
141
142 } // all threads joined here.
143 mcout << "end of main" << std::endl;
144 return 0;
145}
146
147

source code of boost/libs/thread/example/producer_consumer_bounded.cpp