1 | // Copyright (C) 2011-2013 Tim Blechmann |
2 | // |
3 | // Distributed under the Boost Software License, Version 1.0. (See |
4 | // accompanying file LICENSE_1_0.txt or copy at |
5 | // http://www.boost.org/LICENSE_1_0.txt) |
6 | |
7 | #include <boost/lockfree/spsc_queue.hpp> |
8 | #include <boost/thread.hpp> |
9 | |
10 | #define BOOST_TEST_MAIN |
11 | #ifdef BOOST_LOCKFREE_INCLUDE_TESTS |
12 | #include <boost/test/included/unit_test.hpp> |
13 | #else |
14 | #include <boost/test/unit_test.hpp> |
15 | #endif |
16 | |
17 | #include <iostream> |
18 | #include <memory> |
19 | |
20 | #include "test_helpers.hpp" |
21 | #include "test_common.hpp" |
22 | |
23 | using namespace boost; |
24 | using namespace boost::lockfree; |
25 | using namespace std; |
26 | |
27 | #ifndef BOOST_LOCKFREE_STRESS_TEST |
28 | static const boost::uint32_t nodes_per_thread = 100000; |
29 | #else |
30 | static const boost::uint32_t nodes_per_thread = 100000000; |
31 | #endif |
32 | |
33 | struct spsc_queue_tester |
34 | { |
35 | spsc_queue<int, capacity<128> > sf; |
36 | |
37 | boost::lockfree::detail::atomic<long> spsc_queue_cnt, received_nodes; |
38 | |
39 | // In VxWorks one RTP just supports 65535 objects |
40 | #ifndef __VXWORKS__ |
41 | static_hashed_set<int, 1<<16 > working_set; |
42 | #else |
43 | static_hashed_set<int, 1<<15 > working_set; |
44 | #endif |
45 | |
46 | spsc_queue_tester(void): |
47 | spsc_queue_cnt(0), received_nodes(0) |
48 | {} |
49 | |
50 | void add(void) |
51 | { |
52 | for (boost::uint32_t i = 0; i != nodes_per_thread; ++i) { |
53 | int id = generate_id<int>(); |
54 | working_set.insert(id); |
55 | |
56 | while (sf.push(t: id) == false) |
57 | {} |
58 | |
59 | ++spsc_queue_cnt; |
60 | } |
61 | running = false; |
62 | } |
63 | |
64 | bool get_element(void) |
65 | { |
66 | int data; |
67 | bool success = sf.pop(ret&: data); |
68 | |
69 | if (success) { |
70 | ++received_nodes; |
71 | --spsc_queue_cnt; |
72 | bool erased = working_set.erase(id: data); |
73 | assert(erased); |
74 | return true; |
75 | } else |
76 | return false; |
77 | } |
78 | |
79 | boost::lockfree::detail::atomic<bool> running; |
80 | |
81 | void get(void) |
82 | { |
83 | for(;;) { |
84 | bool success = get_element(); |
85 | if (!running && !success) |
86 | break; |
87 | } |
88 | |
89 | while ( get_element() ); |
90 | } |
91 | |
92 | void run(void) |
93 | { |
94 | running = true; |
95 | |
96 | BOOST_REQUIRE(sf.empty()); |
97 | |
98 | boost::thread reader(boost::bind(f: &spsc_queue_tester::get, a1: this)); |
99 | boost::thread writer(boost::bind(f: &spsc_queue_tester::add, a1: this)); |
100 | cout << "reader and writer threads created" << endl; |
101 | |
102 | writer.join(); |
103 | cout << "writer threads joined. waiting for readers to finish" << endl; |
104 | |
105 | reader.join(); |
106 | |
107 | BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread); |
108 | BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0); |
109 | BOOST_REQUIRE(sf.empty()); |
110 | BOOST_REQUIRE(working_set.count_nodes() == 0); |
111 | } |
112 | }; |
113 | |
114 | BOOST_AUTO_TEST_CASE( spsc_queue_test_caching ) |
115 | { |
116 | boost::shared_ptr<spsc_queue_tester> test1(new spsc_queue_tester); |
117 | test1->run(); |
118 | } |
119 | |
120 | struct spsc_queue_tester_buffering |
121 | { |
122 | spsc_queue<int, capacity<128> > sf; |
123 | |
124 | boost::lockfree::detail::atomic<long> spsc_queue_cnt; |
125 | |
126 | // In VxWorks one RTP just supports 65535 objects |
127 | #ifndef __VXWORKS__ |
128 | static_hashed_set<int, 1<<16 > working_set; |
129 | #else |
130 | static_hashed_set<int, 1<<15 > working_set; |
131 | #endif |
132 | |
133 | boost::lockfree::detail::atomic<size_t> received_nodes; |
134 | |
135 | spsc_queue_tester_buffering(void): |
136 | spsc_queue_cnt(0), received_nodes(0) |
137 | {} |
138 | |
139 | static const size_t buf_size = 5; |
140 | |
141 | void add(void) |
142 | { |
143 | boost::array<int, buf_size> input_buffer; |
144 | for (boost::uint32_t i = 0; i != nodes_per_thread; i+=buf_size) { |
145 | for (size_t i = 0; i != buf_size; ++i) { |
146 | int id = generate_id<int>(); |
147 | working_set.insert(id); |
148 | input_buffer[i] = id; |
149 | } |
150 | |
151 | size_t pushed = 0; |
152 | |
153 | do { |
154 | pushed += sf.push(t: input_buffer.c_array() + pushed, |
155 | size: input_buffer.size() - pushed); |
156 | } while (pushed != buf_size); |
157 | |
158 | spsc_queue_cnt+=buf_size; |
159 | } |
160 | running = false; |
161 | } |
162 | |
163 | bool get_elements(void) |
164 | { |
165 | boost::array<int, buf_size> output_buffer; |
166 | |
167 | size_t popd = sf.pop(ret: output_buffer.c_array(), size: output_buffer.size()); |
168 | |
169 | if (popd) { |
170 | received_nodes += popd; |
171 | spsc_queue_cnt -= popd; |
172 | |
173 | for (size_t i = 0; i != popd; ++i) { |
174 | bool erased = working_set.erase(id: output_buffer[i]); |
175 | assert(erased); |
176 | } |
177 | |
178 | return true; |
179 | } else |
180 | return false; |
181 | } |
182 | |
183 | boost::lockfree::detail::atomic<bool> running; |
184 | |
185 | void get(void) |
186 | { |
187 | for(;;) { |
188 | bool success = get_elements(); |
189 | if (!running && !success) |
190 | break; |
191 | } |
192 | |
193 | while ( get_elements() ); |
194 | } |
195 | |
196 | void run(void) |
197 | { |
198 | running = true; |
199 | |
200 | boost::thread reader(boost::bind(f: &spsc_queue_tester_buffering::get, a1: this)); |
201 | boost::thread writer(boost::bind(f: &spsc_queue_tester_buffering::add, a1: this)); |
202 | cout << "reader and writer threads created" << endl; |
203 | |
204 | writer.join(); |
205 | cout << "writer threads joined. waiting for readers to finish" << endl; |
206 | |
207 | reader.join(); |
208 | |
209 | BOOST_REQUIRE_EQUAL(received_nodes, nodes_per_thread); |
210 | BOOST_REQUIRE_EQUAL(spsc_queue_cnt, 0); |
211 | BOOST_REQUIRE(sf.empty()); |
212 | BOOST_REQUIRE(working_set.count_nodes() == 0); |
213 | } |
214 | }; |
215 | |
216 | |
217 | BOOST_AUTO_TEST_CASE( spsc_queue_test_buffering ) |
218 | { |
219 | boost::shared_ptr<spsc_queue_tester_buffering> test1(new spsc_queue_tester_buffering); |
220 | test1->run(); |
221 | } |
222 | |
223 | |