1// (C) Copyright 2014 Vicente Botet
2//
3// Distributed under the Boost Software License, Version 1.0. (See accompanying
4// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6// adapted from the example given by Howard Hinnant in
7
8#include <boost/config.hpp>
9
10#define BOOST_THREAD_VERSION 4
11//#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
12#if ! defined BOOST_NO_CXX11_DECLTYPE
13#define BOOST_RESULT_OF_USE_DECLTYPE
14#endif
15#include <iostream>
16#include <boost/thread/scoped_thread.hpp>
17#ifdef XXXX
18#include <boost/thread/externally_locked_stream.hpp>
19 typedef boost::externally_locked_stream<std::ostream> the_ostream;
20#else
21 typedef std::ostream the_ostream;
22 typedef std::istream the_istream;
23#endif
24#include <boost/thread/concurrent_queues/sync_queue.hpp>
25#include <boost/thread/concurrent_queues/queue_adaptor.hpp>
26#include <boost/thread/concurrent_queues/queue_views.hpp>
27#include <boost/static_assert.hpp>
28#include <boost/type_traits.hpp>
29
30void producer(the_ostream &/*mos*/, boost::queue_back<int> sbq)
31{
32 using namespace boost;
33 try {
34 for(int i=0; ;++i)
35 {
36 sbq.push(x: i);
37 //sbq << i;
38 //mos << "push(" << i << ") " << sbq.size() <<"\n";
39 this_thread::sleep_for(d: chrono::milliseconds(200));
40 }
41 }
42 catch(sync_queue_is_closed&)
43 {
44 //mos << "closed !!!\n";
45 }
46 catch(...)
47 {
48 //mos << "exception !!!\n";
49 }
50}
51
52void consumer(
53 the_ostream &/*mos*/,
54 boost::queue_front<int> sbq)
55{
56 using namespace boost;
57 try {
58 for(int i=0; ;++i)
59 {
60 int r;
61 sbq.pull(x&: r);
62 //sbq >> r;
63 //mos << i << " pull(" << r << ") " << sbq.size() <<"\n";
64
65 this_thread::sleep_for(d: chrono::milliseconds(250));
66 }
67 }
68 catch(sync_queue_is_closed&)
69 {
70 //mos << "closed !!!\n";
71 }
72 catch(...)
73 {
74 //mos << "exception !!!\n";
75 }
76}
77void consumer2(the_ostream &/*mos*/, boost::queue_front<int> sbq)
78{
79 using namespace boost;
80 try {
81 for(int i=0; ;++i)
82 {
83 int r;
84 queue_op_status st = sbq.try_pull(x&: r);
85 if (queue_op_status::closed == st) break;
86 if (queue_op_status::success == st) {
87 //mos << i << " try_pull(" << r << ")\n";
88 }
89 this_thread::sleep_for(d: chrono::milliseconds(250));
90 }
91 }
92 catch(...)
93 {
94 //mos << "exception !!!\n";
95 }
96}
97void consumer3(the_ostream &/*mos*/, boost::queue_front<int> sbq)
98{
99 using namespace boost;
100 try {
101 for(int i=0; ;++i)
102 {
103 int r;
104 queue_op_status res = sbq.wait_pull(x&: r);
105 if (res==queue_op_status::closed) break;
106 //mos << i << " wait_pull(" << r << ")\n";
107 this_thread::sleep_for(d: chrono::milliseconds(250));
108 }
109 }
110 catch(...)
111 {
112 //mos << "exception !!!\n";
113 }
114}
115
116int main()
117{
118 using namespace boost;
119
120#ifdef XXXX
121 recursive_mutex terminal_mutex;
122
123 externally_locked_stream<std::ostream> mcerr(std::cerr, terminal_mutex);
124 externally_locked_stream<std::ostream> mcout(std::cout, terminal_mutex);
125 externally_locked_stream<std::istream> mcin(std::cin, terminal_mutex);
126#else
127 the_ostream &mcerr = std::cout;
128 the_ostream &mcout = std::cerr;
129 //the_istream &mcin = std::cin;
130#endif
131
132 queue_adaptor<sync_queue<int> > sbq;
133
134 {
135 mcout << "begin of main" << std::endl;
136 scoped_thread<> t11(boost::thread(producer, boost::ref(t&: mcerr), concurrent::queue_back<int>(sbq)));
137 scoped_thread<> t12(boost::thread(producer, boost::ref(t&: mcerr), concurrent::queue_back<int>(sbq)));
138 scoped_thread<> t2(boost::thread(consumer, boost::ref(t&: mcout), concurrent::queue_front<int>(sbq)));
139
140 this_thread::sleep_for(d: chrono::seconds(1));
141
142 mcout << "closed()" << std::endl;
143 sbq.close();
144 mcout << "closed()" << std::endl;
145
146 } // all threads joined here.
147 mcout << "end of main" << std::endl;
148 return 0;
149}
150
151

source code of boost/libs/thread/example/producer_consumer2.cpp