1//////////////////////////////////////////////////////////////////////////////
2//
3// (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
4// Software License, Version 1.0. (See accompanying file
5// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// See http://www.boost.org/libs/interprocess for documentation.
8//
9//////////////////////////////////////////////////////////////////////////////
10
11#include <boost/interprocess/ipc/message_queue.hpp>
12#include <boost/interprocess/managed_external_buffer.hpp>
13#include <boost/interprocess/managed_heap_memory.hpp>
14#include <boost/interprocess/containers/map.hpp>
15#include <boost/interprocess/containers/set.hpp>
16#include <boost/interprocess/allocators/node_allocator.hpp>
17#include <boost/interprocess/detail/os_thread_functions.hpp>
18// intrusive/detail
19#include <boost/intrusive/detail/minimal_pair_header.hpp>
20#include <boost/intrusive/detail/minimal_less_equal_header.hpp>
21
22#include <boost/move/unique_ptr.hpp>
23
24#include <cstddef>
25#include <memory>
26#include <iostream>
27#include <vector>
28#include <exception>
29#include <limits>
30
31#include "get_process_id_name.hpp"
32#include "named_creation_template.hpp"
33
34////////////////////////////////////////////////////////////////////////////////
35// //
36// This example tests the process shared message queue. //
37// //
38////////////////////////////////////////////////////////////////////////////////
39
40using namespace boost::interprocess;
41
42//This test inserts messages with different priority and marks them with a
43//time-stamp to check if receiver obtains highest priority messages first and
44//messages with same priority are received in fifo order
45bool test_priority_order()
46{
47 message_queue::remove(name: test::get_process_id_name());
48 {
49 message_queue mq1
50 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
51 mq2
52 (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
53
54 //We test that the queue is ordered by priority and in the
55 //same priority, is a FIFO
56 message_queue::size_type recvd = 0;
57 unsigned int priority = 0;
58 std::size_t tstamp;
59 unsigned int priority_prev;
60 std::size_t tstamp_prev;
61
62 //We will send 100 message with priority 0-9
63 //The message will contain the timestamp of the message
64 for(std::size_t i = 0; i < 100; ++i){
65 tstamp = i;
66 mq1.send(buffer: &tstamp, buffer_size: sizeof(tstamp), priority: (unsigned int)(i%10));
67 }
68
69 priority_prev = (std::numeric_limits<unsigned int>::max)();
70 tstamp_prev = 0;
71
72 //Receive all messages and test those are ordered
73 //by priority and by FIFO in the same priority
74 for(std::size_t i = 0; i < 100; ++i){
75 mq1.receive(buffer: &tstamp, buffer_size: sizeof(tstamp), recvd_size&: recvd, priority);
76 if(priority > priority_prev)
77 return false;
78 if(priority == priority_prev &&
79 tstamp <= tstamp_prev){
80 return false;
81 }
82 priority_prev = priority;
83 tstamp_prev = tstamp;
84 }
85
86 //Now retry it with different priority order
87 for(std::size_t i = 0; i < 100; ++i){
88 tstamp = i;
89 mq1.send(buffer: &tstamp, buffer_size: sizeof(tstamp), priority: (unsigned int)(9 - i%10));
90 }
91
92 priority_prev = (std::numeric_limits<unsigned int>::max)();
93 tstamp_prev = 0;
94
95 //Receive all messages and test those are ordered
96 //by priority and by FIFO in the same priority
97 for(std::size_t i = 0; i < 100; ++i){
98 mq1.receive(buffer: &tstamp, buffer_size: sizeof(tstamp), recvd_size&: recvd, priority);
99 if(priority > priority_prev)
100 return false;
101 if(priority == priority_prev &&
102 tstamp <= tstamp_prev){
103 return false;
104 }
105 priority_prev = priority;
106 tstamp_prev = tstamp;
107 }
108 }
109 message_queue::remove(name: test::get_process_id_name());
110 return true;
111}
112
113//[message_queue_test_test_serialize_db
//This test creates an in-memory database using Interprocess machinery and
//serializes it through a message queue. Then it rebuilds the database in
//another buffer and checks it against the original database
117bool test_serialize_db()
118{
119 //Typedef data to create a Interprocess map
120 typedef std::pair<const std::size_t, std::size_t> MyPair;
121 typedef std::less<std::size_t> MyLess;
122 typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
123 node_allocator_t;
124 typedef map<std::size_t,
125 std::size_t,
126 std::less<std::size_t>,
127 node_allocator_t>
128 MyMap;
129
130 //Some constants
131 const std::size_t BufferSize = 65536;
132 const std::size_t MaxMsgSize = 100;
133
134 //Allocate a memory buffer to hold the destiny database using vector<char>
135 std::vector<char> buffer_destiny(BufferSize, 0);
136
137 message_queue::remove(name: test::get_process_id_name());
138 {
139 //Create the message-queues
140 message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
141
142 //Open previously created message-queue simulating other process
143 message_queue mq2(open_only, test::get_process_id_name());
144
145 //A managed heap memory to create the origin database
146 managed_heap_memory db_origin(buffer_destiny.size());
147
148 //Construct the map in the first buffer
149 MyMap *map1 = db_origin.construct<MyMap>(name: "MyMap")
150 (MyLess(),
151 db_origin.get_segment_manager());
152 if(!map1)
153 return false;
154
155 //Fill map1 until is full
156 BOOST_TRY{
157 std::size_t i = 0;
158 while(1){
159 (*map1)[i] = i;
160 ++i;
161 }
162 }
163 BOOST_CATCH(boost::interprocess::bad_alloc &){} BOOST_CATCH_END
164
165 //Data control data sending through the message queue
166 std::size_t sent = 0;
167 message_queue::size_type recvd = 0;
168 message_queue::size_type total_recvd = 0;
169 unsigned int priority;
170
171 //Send whole first buffer through the mq1, read it
172 //through mq2 to the second buffer
173 while(1){
174 //Send a fragment of buffer1 through mq1
175 std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
176 MaxMsgSize : (db_origin.get_size() - sent);
177 mq1.send( buffer: &static_cast<char*>(db_origin.get_address())[sent]
178 , buffer_size: bytes_to_send
179 , priority: 0);
180 sent += bytes_to_send;
181 //Receive the fragment through mq2 to buffer_destiny
182 mq2.receive( buffer: &buffer_destiny[total_recvd]
183 , buffer_size: BufferSize - recvd
184 , recvd_size&: recvd
185 , priority);
186 total_recvd += recvd;
187
188 //Check if we have received all the buffer
189 if(total_recvd == BufferSize){
190 break;
191 }
192 }
193
194 //The buffer will contain a copy of the original database
195 //so let's interpret the buffer with managed_external_buffer
196 managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
197
198 //Let's find the map
199 std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>(name: "MyMap");
200 MyMap *map2 = ret.first;
201
202 //Check if we have found it
203 if(!map2){
204 return false;
205 }
206
207 //Check if it is a single variable (not an array)
208 if(ret.second != 1){
209 return false;
210 }
211
212 //Now let's compare size
213 if(map1->size() != map2->size()){
214 return false;
215 }
216
217 //Now let's compare all db values
218 MyMap::size_type num_elements = map1->size();
219 for(std::size_t i = 0; i < num_elements; ++i){
220 if((*map1)[i] != (*map2)[i]){
221 return false;
222 }
223 }
224
225 //Destroy maps from db-s
226 db_origin.destroy_ptr(ptr: map1);
227 db_destiny.destroy_ptr(ptr: map2);
228 }
229 message_queue::remove(name: test::get_process_id_name());
230 return true;
231}
232//]
233
234static const int MsgSize = 10;
235static const int NumMsg = 1000;
236static char msgsend [10];
237static char msgrecv [10];
238
239static boost::interprocess::message_queue *pmessage_queue;
240
241void receiver()
242{
243 boost::interprocess::message_queue::size_type recvd_size;
244 unsigned int priority;
245 int nummsg = NumMsg;
246
247 while(nummsg--){
248 pmessage_queue->receive(buffer: msgrecv, buffer_size: MsgSize, recvd_size, priority);
249 }
250}
251
252bool test_buffer_overflow()
253{
254 boost::interprocess::message_queue::remove(name: test::get_process_id_name());
255 {
256 boost::movelib::unique_ptr<boost::interprocess::message_queue>
257 ptr(new boost::interprocess::message_queue
258 (create_only, test::get_process_id_name(), 10, 10));
259 pmessage_queue = ptr.get();
260
261 //Launch the receiver thread
262 boost::interprocess::ipcdetail::OS_thread_t thread;
263 boost::interprocess::ipcdetail::thread_launch(pt&: thread, f: &receiver);
264 boost::interprocess::ipcdetail::thread_yield();
265
266 int nummsg = NumMsg;
267
268 while(nummsg--){
269 pmessage_queue->send(buffer: msgsend, buffer_size: MsgSize, priority: 0);
270 }
271
272 boost::interprocess::ipcdetail::thread_join(thread);
273 }
274 boost::interprocess::message_queue::remove(name: test::get_process_id_name());
275 return true;
276}
277
278
279//////////////////////////////////////////////////////////////////////////////
280//
281// test_multi_sender_receiver is based on Alexander (aalutov's)
282// testcase for ticket #9221. Many thanks.
283//
284//////////////////////////////////////////////////////////////////////////////
285
286static boost::interprocess::message_queue *global_queue = 0;
287//We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
288static const int MULTI_NUM_MSG_PER_SENDER = 10000;
289//Message queue message capacity
290static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
291//We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
292static const std::size_t MULTI_THREAD_COUNT = 10;
293
294static void multisend()
295{
296 char buff;
297 for (std::size_t i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
298 global_queue->send(buffer: &buff, buffer_size: 1, priority: 0);
299 }
300 global_queue->send(buffer: &buff, buffer_size: 0, priority: 0);
301 //std::cout<<"writer thread complete"<<std::endl;
302}
303
304static void multireceive()
305{
306 char buff;
307 size_t size;
308 int received_msgs = 0;
309 unsigned int priority;
310 do {
311 global_queue->receive(buffer: &buff, buffer_size: 1, recvd_size&: size, priority);
312 ++received_msgs;
313 } while (size > 0);
314 --received_msgs;
315 //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
316}
317
318
319bool test_multi_sender_receiver()
320{
321 bool ret = true;
322 //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
323 BOOST_TRY {
324 boost::interprocess::message_queue::remove(name: test::get_process_id_name());
325 boost::interprocess::message_queue mq
326 (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
327 global_queue = &mq;
328 std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
329
330 //Launch senders receiver thread
331 for (std::size_t i = 0; i < MULTI_THREAD_COUNT; i++) {
332 boost::interprocess::ipcdetail::thread_launch
333 (pt&: threads[i], f: &multisend);
334 }
335
336 for (std::size_t i = 0; i < MULTI_THREAD_COUNT; i++) {
337 boost::interprocess::ipcdetail::thread_launch
338 (pt&: threads[MULTI_THREAD_COUNT+i], f: &multireceive);
339 }
340
341 for (std::size_t i = 0; i < MULTI_THREAD_COUNT*2; i++) {
342 boost::interprocess::ipcdetail::thread_join(thread: threads[i]);
343 //std::cout << "Joined thread " << i << std::endl;
344 }
345 }
346 BOOST_CATCH(std::exception &e) {
347 std::cout << "error " << e.what() << std::endl;
348 ret = false;
349 } BOOST_CATCH_END
350 boost::interprocess::message_queue::remove(name: test::get_process_id_name());
351 return ret;
352}
353
354class msg_queue_named_test_wrapper
355 : public test::named_sync_deleter<message_queue>, public message_queue
356{
357 public:
358
359 msg_queue_named_test_wrapper(create_only_t)
360 : message_queue(create_only, test::get_process_id_name(), 10, 10)
361 {}
362
363 msg_queue_named_test_wrapper(open_only_t)
364 : message_queue(open_only, test::get_process_id_name())
365 {}
366
367 msg_queue_named_test_wrapper(open_or_create_t)
368 : message_queue(open_or_create, test::get_process_id_name(), 10, 10)
369 {}
370
371 ~msg_queue_named_test_wrapper()
372 {}
373};
374
375#if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES)
376
377class msg_queue_named_test_wrapper_w
378 : public test::named_sync_deleter_w<message_queue>, public message_queue
379{
380 public:
381
382 template <class CharT>
383 msg_queue_named_test_wrapper_w(create_only_t)
384 : message_queue(create_only, test::get_process_id_wname(), 10, 10)
385 {}
386
387 msg_queue_named_test_wrapper_w(open_only_t)
388 : message_queue(open_only, test::get_process_id_wname())
389 {}
390
391 msg_queue_named_test_wrapper_w(open_or_create_t)
392 : message_queue(open_or_create, test::get_process_id_wname(), 10, 10)
393 {}
394
395 ~msg_queue_named_test_wrapper_w()
396 {}
397};
398
399#endif //defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES)
400
401
402int main ()
403{
404 int ret = 0;
405 BOOST_TRY{
406 message_queue::remove(name: test::get_process_id_name());
407 test::test_named_creation<msg_queue_named_test_wrapper>();
408 #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES)
409 test::test_named_creation<msg_queue_named_test_wrapper>();
410 #endif
411
412 if(!test_priority_order()){
413 return 1;
414 }
415
416 if(!test_serialize_db()){
417 return 1;
418 }
419
420 if(!test_buffer_overflow()){
421 return 1;
422 }
423
424 if(!test_multi_sender_receiver()){
425 return 1;
426 }
427 }
428 BOOST_CATCH(std::exception &ex) {
429 std::cout << ex.what() << std::endl;
430 ret = 1;
431 } BOOST_CATCH_END
432
433 message_queue::remove(name: test::get_process_id_name());
434 return ret;
435}
436
437

// Source: boost/libs/interprocess/test/message_queue_test.cpp