1 | #undef G_DISABLE_ASSERT |
2 | #undef G_LOG_DOMAIN |
3 | |
4 | #include <errno.h> |
5 | #include <glib.h> |
6 | #ifdef G_OS_UNIX |
7 | #include <unistd.h> |
8 | #endif |
9 | #include <stdio.h> |
10 | #include <stdlib.h> |
11 | |
12 | #ifdef G_OS_WIN32 |
13 | #include <fcntl.h> /* For _O_BINARY used by pipe() macro */ |
14 | #include <io.h> /* for _pipe() */ |
15 | #define pipe(fds) _pipe(fds, 4096, _O_BINARY) |
16 | #endif |
17 | |
18 | #define ITERS 10000 |
19 | #define INCREMENT 10 |
20 | #define NTHREADS 4 |
21 | #define NCRAWLERS 4 |
22 | #define CRAWLER_TIMEOUT_RANGE 40 |
23 | #define RECURSER_TIMEOUT 50 |
24 | |
25 | /* The partial ordering between the context array mutex and |
26 | * crawler array mutex is that the crawler array mutex cannot |
27 | * be locked while the context array mutex is locked |
28 | */ |
29 | GPtrArray *context_array; |
30 | GMutex context_array_mutex; |
31 | GCond context_array_cond; |
32 | |
33 | GMainLoop *main_loop; |
34 | |
35 | G_LOCK_DEFINE_STATIC (crawler_array_lock); |
36 | GPtrArray *crawler_array; |
37 | |
38 | typedef struct _AddrData AddrData; |
39 | typedef struct _TestData TestData; |
40 | |
41 | struct _AddrData |
42 | { |
43 | GMainLoop *loop; |
44 | GIOChannel *dest; |
45 | gint count; |
46 | }; |
47 | |
48 | struct _TestData |
49 | { |
50 | gint current_val; |
51 | gint iters; |
52 | GIOChannel *in; |
53 | }; |
54 | |
55 | static void cleanup_crawlers (GMainContext *context); |
56 | |
57 | static gboolean |
58 | read_all (GIOChannel *channel, char *buf, gsize len) |
59 | { |
60 | gsize bytes_read = 0; |
61 | gsize count; |
62 | GIOError err; |
63 | |
64 | while (bytes_read < len) |
65 | { |
66 | err = g_io_channel_read (channel, buf: buf + bytes_read, count: len - bytes_read, bytes_read: &count); |
67 | if (err) |
68 | { |
69 | if (err != G_IO_ERROR_AGAIN) |
70 | return FALSE; |
71 | } |
72 | else if (count == 0) |
73 | return FALSE; |
74 | |
75 | bytes_read += count; |
76 | } |
77 | |
78 | return TRUE; |
79 | } |
80 | |
81 | static gboolean |
82 | write_all (GIOChannel *channel, char *buf, gsize len) |
83 | { |
84 | gsize bytes_written = 0; |
85 | gsize count; |
86 | GIOError err; |
87 | |
88 | while (bytes_written < len) |
89 | { |
90 | err = g_io_channel_write (channel, buf: buf + bytes_written, count: len - bytes_written, bytes_written: &count); |
91 | if (err && err != G_IO_ERROR_AGAIN) |
92 | return FALSE; |
93 | |
94 | bytes_written += count; |
95 | } |
96 | |
97 | return TRUE; |
98 | } |
99 | |
100 | static gboolean |
101 | adder_callback (GIOChannel *source, |
102 | GIOCondition condition, |
103 | gpointer data) |
104 | { |
105 | char buf1[32]; |
106 | char buf2[32]; |
107 | |
108 | char result[32] = { 0, }; |
109 | |
110 | AddrData *addr_data = data; |
111 | |
112 | if (!read_all (channel: source, buf: buf1, len: 32) || |
113 | !read_all (channel: source, buf: buf2, len: 32)) |
114 | { |
115 | g_main_loop_quit (loop: addr_data->loop); |
116 | return FALSE; |
117 | } |
118 | |
119 | sprintf (s: result, format: "%d" , atoi(nptr: buf1) + atoi(nptr: buf2)); |
120 | write_all (channel: addr_data->dest, buf: result, len: 32); |
121 | |
122 | return TRUE; |
123 | } |
124 | |
125 | static gboolean |
126 | timeout_callback (gpointer data) |
127 | { |
128 | AddrData *addr_data = data; |
129 | |
130 | addr_data->count++; |
131 | |
132 | return TRUE; |
133 | } |
134 | |
135 | static gpointer |
136 | adder_thread (gpointer data) |
137 | { |
138 | GMainContext *context; |
139 | GSource *adder_source; |
140 | GSource *timeout_source; |
141 | |
142 | GIOChannel **channels = data; |
143 | AddrData addr_data; |
144 | |
145 | context = g_main_context_new (); |
146 | |
147 | g_mutex_lock (mutex: &context_array_mutex); |
148 | |
149 | g_ptr_array_add (array: context_array, data: context); |
150 | |
151 | if (context_array->len == NTHREADS) |
152 | g_cond_broadcast (cond: &context_array_cond); |
153 | |
154 | g_mutex_unlock (mutex: &context_array_mutex); |
155 | |
156 | addr_data.dest = channels[1]; |
157 | addr_data.loop = g_main_loop_new (context, FALSE); |
158 | addr_data.count = 0; |
159 | |
160 | adder_source = g_io_create_watch (channel: channels[0], condition: G_IO_IN | G_IO_HUP); |
161 | g_source_set_name (source: adder_source, name: "Adder I/O" ); |
162 | g_source_set_callback (source: adder_source, func: (GSourceFunc)adder_callback, data: &addr_data, NULL); |
163 | g_source_attach (source: adder_source, context); |
164 | g_source_unref (source: adder_source); |
165 | |
166 | timeout_source = g_timeout_source_new (interval: 10); |
167 | g_source_set_name (source: timeout_source, name: "Adder timeout" ); |
168 | g_source_set_callback (source: timeout_source, func: (GSourceFunc)timeout_callback, data: &addr_data, NULL); |
169 | g_source_set_priority (source: timeout_source, G_PRIORITY_HIGH); |
170 | g_source_attach (source: timeout_source, context); |
171 | g_source_unref (source: timeout_source); |
172 | |
173 | g_main_loop_run (loop: addr_data.loop); |
174 | |
175 | g_io_channel_unref (channel: channels[0]); |
176 | g_io_channel_unref (channel: channels[1]); |
177 | |
178 | g_free (mem: channels); |
179 | |
180 | g_main_loop_unref (loop: addr_data.loop); |
181 | |
182 | #ifdef VERBOSE |
183 | g_print ("Timeout run %d times\n" , addr_data.count); |
184 | #endif |
185 | |
186 | g_mutex_lock (mutex: &context_array_mutex); |
187 | g_ptr_array_remove (array: context_array, data: context); |
188 | if (context_array->len == 0) |
189 | g_main_loop_quit (loop: main_loop); |
190 | g_mutex_unlock (mutex: &context_array_mutex); |
191 | |
192 | cleanup_crawlers (context); |
193 | g_main_context_unref (context); |
194 | |
195 | return NULL; |
196 | } |
197 | |
198 | static void |
199 | io_pipe (GIOChannel **channels) |
200 | { |
201 | gint fds[2]; |
202 | |
203 | if (pipe(pipedes: fds) < 0) |
204 | { |
205 | int errsv = errno; |
206 | g_warning ("Cannot create pipe %s" , g_strerror (errsv)); |
207 | exit (status: 1); |
208 | } |
209 | |
210 | channels[0] = g_io_channel_unix_new (fd: fds[0]); |
211 | channels[1] = g_io_channel_unix_new (fd: fds[1]); |
212 | |
213 | g_io_channel_set_close_on_unref (channel: channels[0], TRUE); |
214 | g_io_channel_set_close_on_unref (channel: channels[1], TRUE); |
215 | } |
216 | |
217 | static void |
218 | do_add (GIOChannel *in, gint a, gint b) |
219 | { |
220 | char buf1[32] = { 0, }; |
221 | char buf2[32] = { 0, }; |
222 | |
223 | sprintf (s: buf1, format: "%d" , a); |
224 | sprintf (s: buf2, format: "%d" , b); |
225 | |
226 | write_all (channel: in, buf: buf1, len: 32); |
227 | write_all (channel: in, buf: buf2, len: 32); |
228 | } |
229 | |
230 | static gboolean |
231 | adder_response (GIOChannel *source, |
232 | GIOCondition condition, |
233 | gpointer data) |
234 | { |
235 | char result[32]; |
236 | TestData *test_data = data; |
237 | |
238 | if (!read_all (channel: source, buf: result, len: 32)) |
239 | return FALSE; |
240 | |
241 | test_data->current_val = atoi (nptr: result); |
242 | test_data->iters--; |
243 | |
244 | if (test_data->iters == 0) |
245 | { |
246 | if (test_data->current_val != ITERS * INCREMENT) |
247 | { |
248 | g_print (format: "Addition failed: %d != %d\n" , |
249 | test_data->current_val, ITERS * INCREMENT); |
250 | exit (status: 1); |
251 | } |
252 | |
253 | g_io_channel_unref (channel: source); |
254 | g_io_channel_unref (channel: test_data->in); |
255 | |
256 | g_free (mem: test_data); |
257 | |
258 | return FALSE; |
259 | } |
260 | |
261 | do_add (in: test_data->in, a: test_data->current_val, INCREMENT); |
262 | |
263 | return TRUE; |
264 | } |
265 | |
266 | static GThread * |
267 | create_adder_thread (void) |
268 | { |
269 | GThread *thread; |
270 | TestData *test_data; |
271 | |
272 | GIOChannel *in_channels[2]; |
273 | GIOChannel *out_channels[2]; |
274 | |
275 | GIOChannel **sub_channels; |
276 | |
277 | sub_channels = g_new (GIOChannel *, 2); |
278 | |
279 | io_pipe (channels: in_channels); |
280 | io_pipe (channels: out_channels); |
281 | |
282 | sub_channels[0] = in_channels[0]; |
283 | sub_channels[1] = out_channels[1]; |
284 | |
285 | thread = g_thread_new (name: "adder" , func: adder_thread, data: sub_channels); |
286 | |
287 | test_data = g_new (TestData, 1); |
288 | test_data->in = in_channels[1]; |
289 | test_data->current_val = 0; |
290 | test_data->iters = ITERS; |
291 | |
292 | g_io_add_watch (channel: out_channels[0], condition: G_IO_IN | G_IO_HUP, |
293 | func: adder_response, user_data: test_data); |
294 | |
295 | do_add (in: test_data->in, a: test_data->current_val, INCREMENT); |
296 | |
297 | return thread; |
298 | } |
299 | |
300 | static void create_crawler (void); |
301 | |
302 | static void |
303 | remove_crawler (void) |
304 | { |
305 | GSource *other_source; |
306 | |
307 | if (crawler_array->len > 0) |
308 | { |
309 | other_source = crawler_array->pdata[g_random_int_range (begin: 0, end: crawler_array->len)]; |
310 | g_source_destroy (source: other_source); |
311 | g_assert (g_ptr_array_remove_fast (crawler_array, other_source)); |
312 | } |
313 | } |
314 | |
315 | static gint |
316 | crawler_callback (gpointer data) |
317 | { |
318 | GSource *source = data; |
319 | |
320 | G_LOCK (crawler_array_lock); |
321 | |
322 | if (!g_ptr_array_remove_fast (array: crawler_array, data: source)) |
323 | remove_crawler(); |
324 | |
325 | remove_crawler(); |
326 | G_UNLOCK (crawler_array_lock); |
327 | |
328 | create_crawler(); |
329 | create_crawler(); |
330 | |
331 | return FALSE; |
332 | } |
333 | |
334 | static void |
335 | create_crawler (void) |
336 | { |
337 | GSource *source = g_timeout_source_new (interval: g_random_int_range (begin: 0, CRAWLER_TIMEOUT_RANGE)); |
338 | g_source_set_name (source, name: "Crawler timeout" ); |
339 | g_source_set_callback (source, func: (GSourceFunc)crawler_callback, data: source, NULL); |
340 | |
341 | G_LOCK (crawler_array_lock); |
342 | g_ptr_array_add (array: crawler_array, data: source); |
343 | |
344 | g_mutex_lock (mutex: &context_array_mutex); |
345 | g_source_attach (source, context: context_array->pdata[g_random_int_range (begin: 0, end: context_array->len)]); |
346 | g_source_unref (source); |
347 | g_mutex_unlock (mutex: &context_array_mutex); |
348 | |
349 | G_UNLOCK (crawler_array_lock); |
350 | } |
351 | |
352 | static void |
353 | cleanup_crawlers (GMainContext *context) |
354 | { |
355 | gint i; |
356 | |
357 | G_LOCK (crawler_array_lock); |
358 | for (i=0; i < crawler_array->len; i++) |
359 | { |
360 | if (g_source_get_context (source: crawler_array->pdata[i]) == context) |
361 | { |
362 | g_source_destroy (source: g_ptr_array_remove_index (array: crawler_array, index_: i)); |
363 | i--; |
364 | } |
365 | } |
366 | G_UNLOCK (crawler_array_lock); |
367 | } |
368 | |
369 | static gboolean |
370 | recurser_idle (gpointer data) |
371 | { |
372 | GMainContext *context = data; |
373 | gint i; |
374 | |
375 | for (i = 0; i < 10; i++) |
376 | g_main_context_iteration (context, FALSE); |
377 | |
378 | return FALSE; |
379 | } |
380 | |
381 | static gboolean |
382 | recurser_start (gpointer data) |
383 | { |
384 | GMainContext *context; |
385 | GSource *source; |
386 | |
387 | g_mutex_lock (mutex: &context_array_mutex); |
388 | if (context_array->len > 0) |
389 | { |
390 | context = context_array->pdata[g_random_int_range (begin: 0, end: context_array->len)]; |
391 | source = g_idle_source_new (); |
392 | g_source_set_name (source, name: "Recursing idle source" ); |
393 | g_source_set_callback (source, func: recurser_idle, data: context, NULL); |
394 | g_source_attach (source, context); |
395 | g_source_unref (source); |
396 | } |
397 | g_mutex_unlock (mutex: &context_array_mutex); |
398 | |
399 | return TRUE; |
400 | } |
401 | |
402 | int |
403 | main (int argc, |
404 | char *argv[]) |
405 | { |
406 | gint i; |
407 | GThread *threads[NTHREADS]; |
408 | |
409 | context_array = g_ptr_array_new (); |
410 | |
411 | crawler_array = g_ptr_array_new (); |
412 | |
413 | main_loop = g_main_loop_new (NULL, FALSE); |
414 | |
415 | for (i = 0; i < NTHREADS; i++) |
416 | threads[i] = create_adder_thread (); |
417 | |
418 | /* Wait for all threads to start |
419 | */ |
420 | g_mutex_lock (mutex: &context_array_mutex); |
421 | |
422 | while (context_array->len < NTHREADS) |
423 | g_cond_wait (cond: &context_array_cond, mutex: &context_array_mutex); |
424 | |
425 | g_mutex_unlock (mutex: &context_array_mutex); |
426 | |
427 | for (i = 0; i < NCRAWLERS; i++) |
428 | create_crawler (); |
429 | |
430 | g_timeout_add (RECURSER_TIMEOUT, function: recurser_start, NULL); |
431 | |
432 | g_main_loop_run (loop: main_loop); |
433 | g_main_loop_unref (loop: main_loop); |
434 | |
435 | for (i = 0; i < NTHREADS; i++) |
436 | g_thread_join (thread: threads[i]); |
437 | |
438 | g_ptr_array_unref (array: crawler_array); |
439 | g_ptr_array_unref (array: context_array); |
440 | |
441 | return 0; |
442 | } |
443 | |