1 | ////////////////////////////////////////////////////////////////////////////// |
2 | // |
3 | // (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost |
4 | // Software License, Version 1.0. (See accompanying file |
5 | // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) |
6 | // |
7 | // See http://www.boost.org/libs/interprocess for documentation. |
8 | // |
9 | ////////////////////////////////////////////////////////////////////////////// |
10 | |
11 | #include <boost/interprocess/ipc/message_queue.hpp> |
12 | #include <boost/interprocess/managed_external_buffer.hpp> |
13 | #include <boost/interprocess/managed_heap_memory.hpp> |
14 | #include <boost/interprocess/containers/map.hpp> |
15 | #include <boost/interprocess/containers/set.hpp> |
16 | #include <boost/interprocess/allocators/node_allocator.hpp> |
17 | #include <boost/interprocess/detail/os_thread_functions.hpp> |
18 | // intrusive/detail |
19 | #include <boost/intrusive/detail/minimal_pair_header.hpp> |
20 | #include <boost/intrusive/detail/minimal_less_equal_header.hpp> |
21 | |
22 | #include <boost/move/unique_ptr.hpp> |
23 | |
24 | #include <cstddef> |
25 | #include <memory> |
26 | #include <iostream> |
27 | #include <vector> |
28 | #include <exception> |
29 | #include <limits> |
30 | |
31 | #include "get_process_id_name.hpp" |
32 | #include "named_creation_template.hpp" |
33 | |
34 | //////////////////////////////////////////////////////////////////////////////// |
35 | // // |
36 | // This example tests the process shared message queue. // |
37 | // // |
38 | //////////////////////////////////////////////////////////////////////////////// |
39 | |
40 | using namespace boost::interprocess; |
41 | |
42 | //This test inserts messages with different priority and marks them with a |
43 | //time-stamp to check if receiver obtains highest priority messages first and |
44 | //messages with same priority are received in fifo order |
45 | bool test_priority_order() |
46 | { |
47 | message_queue::remove(name: test::get_process_id_name()); |
48 | { |
49 | message_queue mq1 |
50 | (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)), |
51 | mq2 |
52 | (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)); |
53 | |
54 | //We test that the queue is ordered by priority and in the |
55 | //same priority, is a FIFO |
56 | message_queue::size_type recvd = 0; |
57 | unsigned int priority = 0; |
58 | std::size_t tstamp; |
59 | unsigned int priority_prev; |
60 | std::size_t tstamp_prev; |
61 | |
62 | //We will send 100 message with priority 0-9 |
63 | //The message will contain the timestamp of the message |
64 | for(std::size_t i = 0; i < 100; ++i){ |
65 | tstamp = i; |
66 | mq1.send(buffer: &tstamp, buffer_size: sizeof(tstamp), priority: (unsigned int)(i%10)); |
67 | } |
68 | |
69 | priority_prev = (std::numeric_limits<unsigned int>::max)(); |
70 | tstamp_prev = 0; |
71 | |
72 | //Receive all messages and test those are ordered |
73 | //by priority and by FIFO in the same priority |
74 | for(std::size_t i = 0; i < 100; ++i){ |
75 | mq1.receive(buffer: &tstamp, buffer_size: sizeof(tstamp), recvd_size&: recvd, priority); |
76 | if(priority > priority_prev) |
77 | return false; |
78 | if(priority == priority_prev && |
79 | tstamp <= tstamp_prev){ |
80 | return false; |
81 | } |
82 | priority_prev = priority; |
83 | tstamp_prev = tstamp; |
84 | } |
85 | |
86 | //Now retry it with different priority order |
87 | for(std::size_t i = 0; i < 100; ++i){ |
88 | tstamp = i; |
89 | mq1.send(buffer: &tstamp, buffer_size: sizeof(tstamp), priority: (unsigned int)(9 - i%10)); |
90 | } |
91 | |
92 | priority_prev = (std::numeric_limits<unsigned int>::max)(); |
93 | tstamp_prev = 0; |
94 | |
95 | //Receive all messages and test those are ordered |
96 | //by priority and by FIFO in the same priority |
97 | for(std::size_t i = 0; i < 100; ++i){ |
98 | mq1.receive(buffer: &tstamp, buffer_size: sizeof(tstamp), recvd_size&: recvd, priority); |
99 | if(priority > priority_prev) |
100 | return false; |
101 | if(priority == priority_prev && |
102 | tstamp <= tstamp_prev){ |
103 | return false; |
104 | } |
105 | priority_prev = priority; |
106 | tstamp_prev = tstamp; |
107 | } |
108 | } |
109 | message_queue::remove(name: test::get_process_id_name()); |
110 | return true; |
111 | } |
112 | |
113 | //[message_queue_test_test_serialize_db |
//This test creates an in-memory database using Interprocess machinery and
//serializes it through a message queue. It then rebuilds the database in
//another buffer and checks it against the original database.
117 | bool test_serialize_db() |
118 | { |
119 | //Typedef data to create a Interprocess map |
120 | typedef std::pair<const std::size_t, std::size_t> MyPair; |
121 | typedef std::less<std::size_t> MyLess; |
122 | typedef node_allocator<MyPair, managed_external_buffer::segment_manager> |
123 | node_allocator_t; |
124 | typedef map<std::size_t, |
125 | std::size_t, |
126 | std::less<std::size_t>, |
127 | node_allocator_t> |
128 | MyMap; |
129 | |
130 | //Some constants |
131 | const std::size_t BufferSize = 65536; |
132 | const std::size_t MaxMsgSize = 100; |
133 | |
134 | //Allocate a memory buffer to hold the destiny database using vector<char> |
135 | std::vector<char> buffer_destiny(BufferSize, 0); |
136 | |
137 | message_queue::remove(name: test::get_process_id_name()); |
138 | { |
139 | //Create the message-queues |
140 | message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize); |
141 | |
142 | //Open previously created message-queue simulating other process |
143 | message_queue mq2(open_only, test::get_process_id_name()); |
144 | |
145 | //A managed heap memory to create the origin database |
146 | managed_heap_memory db_origin(buffer_destiny.size()); |
147 | |
148 | //Construct the map in the first buffer |
149 | MyMap *map1 = db_origin.construct<MyMap>(name: "MyMap" ) |
150 | (MyLess(), |
151 | db_origin.get_segment_manager()); |
152 | if(!map1) |
153 | return false; |
154 | |
155 | //Fill map1 until is full |
156 | BOOST_TRY{ |
157 | std::size_t i = 0; |
158 | while(1){ |
159 | (*map1)[i] = i; |
160 | ++i; |
161 | } |
162 | } |
163 | BOOST_CATCH(boost::interprocess::bad_alloc &){} BOOST_CATCH_END |
164 | |
165 | //Data control data sending through the message queue |
166 | std::size_t sent = 0; |
167 | message_queue::size_type recvd = 0; |
168 | message_queue::size_type total_recvd = 0; |
169 | unsigned int priority; |
170 | |
171 | //Send whole first buffer through the mq1, read it |
172 | //through mq2 to the second buffer |
173 | while(1){ |
174 | //Send a fragment of buffer1 through mq1 |
175 | std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ? |
176 | MaxMsgSize : (db_origin.get_size() - sent); |
177 | mq1.send( buffer: &static_cast<char*>(db_origin.get_address())[sent] |
178 | , buffer_size: bytes_to_send |
179 | , priority: 0); |
180 | sent += bytes_to_send; |
181 | //Receive the fragment through mq2 to buffer_destiny |
182 | mq2.receive( buffer: &buffer_destiny[total_recvd] |
183 | , buffer_size: BufferSize - recvd |
184 | , recvd_size&: recvd |
185 | , priority); |
186 | total_recvd += recvd; |
187 | |
188 | //Check if we have received all the buffer |
189 | if(total_recvd == BufferSize){ |
190 | break; |
191 | } |
192 | } |
193 | |
194 | //The buffer will contain a copy of the original database |
195 | //so let's interpret the buffer with managed_external_buffer |
196 | managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize); |
197 | |
198 | //Let's find the map |
199 | std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>(name: "MyMap" ); |
200 | MyMap *map2 = ret.first; |
201 | |
202 | //Check if we have found it |
203 | if(!map2){ |
204 | return false; |
205 | } |
206 | |
207 | //Check if it is a single variable (not an array) |
208 | if(ret.second != 1){ |
209 | return false; |
210 | } |
211 | |
212 | //Now let's compare size |
213 | if(map1->size() != map2->size()){ |
214 | return false; |
215 | } |
216 | |
217 | //Now let's compare all db values |
218 | MyMap::size_type num_elements = map1->size(); |
219 | for(std::size_t i = 0; i < num_elements; ++i){ |
220 | if((*map1)[i] != (*map2)[i]){ |
221 | return false; |
222 | } |
223 | } |
224 | |
225 | //Destroy maps from db-s |
226 | db_origin.destroy_ptr(ptr: map1); |
227 | db_destiny.destroy_ptr(ptr: map2); |
228 | } |
229 | message_queue::remove(name: test::get_process_id_name()); |
230 | return true; |
231 | } |
232 | //] |
233 | |
//Size in bytes of every message exchanged by test_buffer_overflow/receiver
static const int MsgSize = 10;
//Number of messages sent by test_buffer_overflow and read by receiver
static const int NumMsg = 1000;
//Send and receive buffers, sized from MsgSize so that the buffers and the
//size passed to send()/receive() can never disagree
static char msgsend [MsgSize];
static char msgrecv [MsgSize];
238 | |
239 | static boost::interprocess::message_queue *pmessage_queue; |
240 | |
241 | void receiver() |
242 | { |
243 | boost::interprocess::message_queue::size_type recvd_size; |
244 | unsigned int priority; |
245 | int nummsg = NumMsg; |
246 | |
247 | while(nummsg--){ |
248 | pmessage_queue->receive(buffer: msgrecv, buffer_size: MsgSize, recvd_size, priority); |
249 | } |
250 | } |
251 | |
252 | bool test_buffer_overflow() |
253 | { |
254 | boost::interprocess::message_queue::remove(name: test::get_process_id_name()); |
255 | { |
256 | boost::movelib::unique_ptr<boost::interprocess::message_queue> |
257 | ptr(new boost::interprocess::message_queue |
258 | (create_only, test::get_process_id_name(), 10, 10)); |
259 | pmessage_queue = ptr.get(); |
260 | |
261 | //Launch the receiver thread |
262 | boost::interprocess::ipcdetail::OS_thread_t thread; |
263 | boost::interprocess::ipcdetail::thread_launch(pt&: thread, f: &receiver); |
264 | boost::interprocess::ipcdetail::thread_yield(); |
265 | |
266 | int nummsg = NumMsg; |
267 | |
268 | while(nummsg--){ |
269 | pmessage_queue->send(buffer: msgsend, buffer_size: MsgSize, priority: 0); |
270 | } |
271 | |
272 | boost::interprocess::ipcdetail::thread_join(thread); |
273 | } |
274 | boost::interprocess::message_queue::remove(name: test::get_process_id_name()); |
275 | return true; |
276 | } |
277 | |
278 | |
279 | ////////////////////////////////////////////////////////////////////////////// |
280 | // |
281 | // test_multi_sender_receiver is based on Alexander (aalutov's) |
282 | // testcase for ticket #9221. Many thanks. |
283 | // |
284 | ////////////////////////////////////////////////////////////////////////////// |
285 | |
286 | static boost::interprocess::message_queue *global_queue = 0; |
287 | //We'll send MULTI_NUM_MSG_PER_SENDER messages per sender |
288 | static const int MULTI_NUM_MSG_PER_SENDER = 10000; |
289 | //Message queue message capacity |
290 | static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1; |
291 | //We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers |
292 | static const std::size_t MULTI_THREAD_COUNT = 10; |
293 | |
294 | static void multisend() |
295 | { |
296 | char buff; |
297 | for (std::size_t i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) { |
298 | global_queue->send(buffer: &buff, buffer_size: 1, priority: 0); |
299 | } |
300 | global_queue->send(buffer: &buff, buffer_size: 0, priority: 0); |
301 | //std::cout<<"writer thread complete"<<std::endl; |
302 | } |
303 | |
304 | static void multireceive() |
305 | { |
306 | char buff; |
307 | size_t size; |
308 | int received_msgs = 0; |
309 | unsigned int priority; |
310 | do { |
311 | global_queue->receive(buffer: &buff, buffer_size: 1, recvd_size&: size, priority); |
312 | ++received_msgs; |
313 | } while (size > 0); |
314 | --received_msgs; |
315 | //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl; |
316 | } |
317 | |
318 | |
319 | bool test_multi_sender_receiver() |
320 | { |
321 | bool ret = true; |
322 | //std::cout << "Testing multi-sender / multi-receiver " << std::endl; |
323 | BOOST_TRY { |
324 | boost::interprocess::message_queue::remove(name: test::get_process_id_name()); |
325 | boost::interprocess::message_queue mq |
326 | (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1); |
327 | global_queue = &mq; |
328 | std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2); |
329 | |
330 | //Launch senders receiver thread |
331 | for (std::size_t i = 0; i < MULTI_THREAD_COUNT; i++) { |
332 | boost::interprocess::ipcdetail::thread_launch |
333 | (pt&: threads[i], f: &multisend); |
334 | } |
335 | |
336 | for (std::size_t i = 0; i < MULTI_THREAD_COUNT; i++) { |
337 | boost::interprocess::ipcdetail::thread_launch |
338 | (pt&: threads[MULTI_THREAD_COUNT+i], f: &multireceive); |
339 | } |
340 | |
341 | for (std::size_t i = 0; i < MULTI_THREAD_COUNT*2; i++) { |
342 | boost::interprocess::ipcdetail::thread_join(thread: threads[i]); |
343 | //std::cout << "Joined thread " << i << std::endl; |
344 | } |
345 | } |
346 | BOOST_CATCH(std::exception &e) { |
347 | std::cout << "error " << e.what() << std::endl; |
348 | ret = false; |
349 | } BOOST_CATCH_END |
350 | boost::interprocess::message_queue::remove(name: test::get_process_id_name()); |
351 | return ret; |
352 | } |
353 | |
354 | class msg_queue_named_test_wrapper |
355 | : public test::named_sync_deleter<message_queue>, public message_queue |
356 | { |
357 | public: |
358 | |
359 | msg_queue_named_test_wrapper(create_only_t) |
360 | : message_queue(create_only, test::get_process_id_name(), 10, 10) |
361 | {} |
362 | |
363 | msg_queue_named_test_wrapper(open_only_t) |
364 | : message_queue(open_only, test::get_process_id_name()) |
365 | {} |
366 | |
367 | msg_queue_named_test_wrapper(open_or_create_t) |
368 | : message_queue(open_or_create, test::get_process_id_name(), 10, 10) |
369 | {} |
370 | |
371 | ~msg_queue_named_test_wrapper() |
372 | {} |
373 | }; |
374 | |
375 | #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) |
376 | |
377 | class msg_queue_named_test_wrapper_w |
378 | : public test::named_sync_deleter_w<message_queue>, public message_queue |
379 | { |
380 | public: |
381 | |
382 | template <class CharT> |
383 | msg_queue_named_test_wrapper_w(create_only_t) |
384 | : message_queue(create_only, test::get_process_id_wname(), 10, 10) |
385 | {} |
386 | |
387 | msg_queue_named_test_wrapper_w(open_only_t) |
388 | : message_queue(open_only, test::get_process_id_wname()) |
389 | {} |
390 | |
391 | msg_queue_named_test_wrapper_w(open_or_create_t) |
392 | : message_queue(open_or_create, test::get_process_id_wname(), 10, 10) |
393 | {} |
394 | |
395 | ~msg_queue_named_test_wrapper_w() |
396 | {} |
397 | }; |
398 | |
399 | #endif //defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) |
400 | |
401 | |
402 | int main () |
403 | { |
404 | int ret = 0; |
405 | BOOST_TRY{ |
406 | message_queue::remove(name: test::get_process_id_name()); |
407 | test::test_named_creation<msg_queue_named_test_wrapper>(); |
408 | #if defined(BOOST_INTERPROCESS_WCHAR_NAMED_RESOURCES) |
409 | test::test_named_creation<msg_queue_named_test_wrapper>(); |
410 | #endif |
411 | |
412 | if(!test_priority_order()){ |
413 | return 1; |
414 | } |
415 | |
416 | if(!test_serialize_db()){ |
417 | return 1; |
418 | } |
419 | |
420 | if(!test_buffer_overflow()){ |
421 | return 1; |
422 | } |
423 | |
424 | if(!test_multi_sender_receiver()){ |
425 | return 1; |
426 | } |
427 | } |
428 | BOOST_CATCH(std::exception &ex) { |
429 | std::cout << ex.what() << std::endl; |
430 | ret = 1; |
431 | } BOOST_CATCH_END |
432 | |
433 | message_queue::remove(name: test::get_process_id_name()); |
434 | return ret; |
435 | } |
436 | |
437 | |