1 | /* GLib testing framework examples and tests |
2 | * Copyright (C) 2008 Red Hat, Inc |
3 | * |
4 | * This work is provided "as is"; redistribution and modification |
5 | * in whole or in part, in any medium, physical or electronic is |
6 | * permitted without restriction. |
7 | * |
8 | * This work is distributed in the hope that it will be useful, |
9 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
10 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
11 | * |
12 | * In no event shall the authors or contributors be liable for any |
13 | * direct, indirect, incidental, special, exemplary, or consequential |
14 | * damages (including, but not limited to, procurement of substitute |
15 | * goods or services; loss of use, data, or profits; or business |
16 | * interruption) however caused and on any theory of liability, whether |
17 | * in contract, strict liability, or tort (including negligence or |
18 | * otherwise) arising in any way out of the use of this software, even |
19 | * if advised of the possibility of such damage. |
20 | */ |
21 | |
22 | #include <gio/gio.h> |
23 | #include <gio/gunixinputstream.h> |
24 | #include <gio/gunixoutputstream.h> |
25 | #include <glib.h> |
26 | #include <glib/glib-unix.h> |
27 | #include <signal.h> |
28 | #include <stdlib.h> |
29 | #include <string.h> |
30 | #include <unistd.h> |
31 | #include <fcntl.h> |
32 | |
33 | /* sizeof(DATA) will give the number of bytes in the array, plus the terminating nul */ |
34 | static const gchar DATA[] = "abcdefghijklmnopqrstuvwxyz" ; |
35 | |
36 | int writer_pipe[2], reader_pipe[2]; |
37 | GCancellable *writer_cancel, *reader_cancel, *main_cancel; |
38 | GMainLoop *loop; |
39 | |
40 | |
41 | static gpointer |
42 | writer_thread (gpointer user_data) |
43 | { |
44 | GOutputStream *out; |
45 | gssize nwrote, offset; |
46 | GError *err = NULL; |
47 | |
48 | out = g_unix_output_stream_new (fd: writer_pipe[1], TRUE); |
49 | |
50 | do |
51 | { |
52 | g_usleep (microseconds: 10); |
53 | |
54 | offset = 0; |
55 | while (offset < (gssize) sizeof (DATA)) |
56 | { |
57 | nwrote = g_output_stream_write (stream: out, buffer: DATA + offset, |
58 | count: sizeof (DATA) - offset, |
59 | cancellable: writer_cancel, error: &err); |
60 | if (nwrote <= 0 || err != NULL) |
61 | break; |
62 | offset += nwrote; |
63 | } |
64 | |
65 | g_assert (nwrote > 0 || err != NULL); |
66 | } |
67 | while (err == NULL); |
68 | |
69 | if (g_cancellable_is_cancelled (cancellable: writer_cancel)) |
70 | { |
71 | g_clear_error (err: &err); |
72 | g_cancellable_cancel (cancellable: main_cancel); |
73 | g_object_unref (object: out); |
74 | return NULL; |
75 | } |
76 | |
77 | g_warning ("writer: %s" , err->message); |
78 | g_assert_not_reached (); |
79 | } |
80 | |
81 | static gpointer |
82 | reader_thread (gpointer user_data) |
83 | { |
84 | GInputStream *in; |
85 | gssize nread = 0, total; |
86 | GError *err = NULL; |
87 | char buf[sizeof (DATA)]; |
88 | |
89 | in = g_unix_input_stream_new (fd: reader_pipe[0], TRUE); |
90 | |
91 | do |
92 | { |
93 | total = 0; |
94 | while (total < (gssize) sizeof (DATA)) |
95 | { |
96 | nread = g_input_stream_read (stream: in, buffer: buf + total, count: sizeof (buf) - total, |
97 | cancellable: reader_cancel, error: &err); |
98 | if (nread <= 0 || err != NULL) |
99 | break; |
100 | total += nread; |
101 | } |
102 | |
103 | if (err) |
104 | break; |
105 | |
106 | if (nread == 0) |
107 | { |
108 | g_assert (err == NULL); |
109 | /* pipe closed */ |
110 | g_object_unref (object: in); |
111 | return NULL; |
112 | } |
113 | |
114 | g_assert_cmpstr (buf, ==, DATA); |
115 | g_assert (!g_cancellable_is_cancelled (reader_cancel)); |
116 | } |
117 | while (err == NULL); |
118 | |
119 | g_warning ("reader: %s" , err->message); |
120 | g_assert_not_reached (); |
121 | } |
122 | |
123 | static char main_buf[sizeof (DATA)]; |
124 | static gssize main_len, main_offset; |
125 | |
126 | static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data); |
127 | static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data); |
128 | static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data); |
129 | |
130 | static void |
131 | do_main_cancel (GOutputStream *out) |
132 | { |
133 | g_output_stream_close (stream: out, NULL, NULL); |
134 | g_main_loop_quit (loop); |
135 | } |
136 | |
137 | static void |
138 | main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data) |
139 | { |
140 | GInputStream *in = G_INPUT_STREAM (source); |
141 | GOutputStream *out = user_data; |
142 | GError *err = NULL; |
143 | gssize nskipped; |
144 | |
145 | nskipped = g_input_stream_skip_finish (stream: in, result: res, error: &err); |
146 | |
147 | if (g_cancellable_is_cancelled (cancellable: main_cancel)) |
148 | { |
149 | do_main_cancel (out); |
150 | return; |
151 | } |
152 | |
153 | g_assert_no_error (err); |
154 | |
155 | main_offset += nskipped; |
156 | if (main_offset == main_len) |
157 | { |
158 | main_offset = 0; |
159 | g_output_stream_write_async (stream: out, buffer: main_buf, count: main_len, |
160 | G_PRIORITY_DEFAULT, cancellable: main_cancel, |
161 | callback: main_thread_wrote, user_data: in); |
162 | } |
163 | else |
164 | { |
165 | g_input_stream_skip_async (stream: in, count: main_len - main_offset, |
166 | G_PRIORITY_DEFAULT, cancellable: main_cancel, |
167 | callback: main_thread_skipped, user_data: out); |
168 | } |
169 | } |
170 | |
171 | static void |
172 | main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data) |
173 | { |
174 | GInputStream *in = G_INPUT_STREAM (source); |
175 | GOutputStream *out = user_data; |
176 | GError *err = NULL; |
177 | gssize nread; |
178 | |
179 | nread = g_input_stream_read_finish (stream: in, result: res, error: &err); |
180 | |
181 | if (g_cancellable_is_cancelled (cancellable: main_cancel)) |
182 | { |
183 | do_main_cancel (out); |
184 | g_clear_error (err: &err); |
185 | return; |
186 | } |
187 | |
188 | g_assert_no_error (err); |
189 | |
190 | main_offset += nread; |
191 | if (main_offset == sizeof (DATA)) |
192 | { |
193 | main_len = main_offset; |
194 | main_offset = 0; |
195 | /* Now skip the same amount */ |
196 | g_input_stream_skip_async (stream: in, count: main_len, |
197 | G_PRIORITY_DEFAULT, cancellable: main_cancel, |
198 | callback: main_thread_skipped, user_data: out); |
199 | } |
200 | else |
201 | { |
202 | g_input_stream_read_async (stream: in, buffer: main_buf, count: sizeof (main_buf), |
203 | G_PRIORITY_DEFAULT, cancellable: main_cancel, |
204 | callback: main_thread_read, user_data: out); |
205 | } |
206 | } |
207 | |
208 | static void |
209 | main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data) |
210 | { |
211 | GOutputStream *out = G_OUTPUT_STREAM (source); |
212 | GInputStream *in = user_data; |
213 | GError *err = NULL; |
214 | gssize nwrote; |
215 | |
216 | nwrote = g_output_stream_write_finish (stream: out, result: res, error: &err); |
217 | |
218 | if (g_cancellable_is_cancelled (cancellable: main_cancel)) |
219 | { |
220 | do_main_cancel (out); |
221 | g_clear_error (err: &err); |
222 | return; |
223 | } |
224 | |
225 | g_assert_no_error (err); |
226 | g_assert_cmpint (nwrote, <=, main_len - main_offset); |
227 | |
228 | main_offset += nwrote; |
229 | if (main_offset == main_len) |
230 | { |
231 | main_offset = 0; |
232 | g_input_stream_read_async (stream: in, buffer: main_buf, count: sizeof (main_buf), |
233 | G_PRIORITY_DEFAULT, cancellable: main_cancel, |
234 | callback: main_thread_read, user_data: out); |
235 | } |
236 | else |
237 | { |
238 | g_output_stream_write_async (stream: out, buffer: main_buf + main_offset, |
239 | count: main_len - main_offset, |
240 | G_PRIORITY_DEFAULT, cancellable: main_cancel, |
241 | callback: main_thread_wrote, user_data: in); |
242 | } |
243 | } |
244 | |
245 | static gboolean |
246 | timeout (gpointer cancellable) |
247 | { |
248 | g_cancellable_cancel (cancellable); |
249 | return FALSE; |
250 | } |
251 | |
252 | static void |
253 | test_pipe_io (gconstpointer nonblocking) |
254 | { |
255 | GThread *writer, *reader; |
256 | GInputStream *in; |
257 | GOutputStream *out; |
258 | |
259 | /* Split off two (additional) threads, a reader and a writer. From |
260 | * the writer thread, write data synchronously in small chunks, |
261 | * which gets alternately read and skipped asynchronously by the |
262 | * main thread and then (if not skipped) written asynchronously to |
263 | * the reader thread, which reads it synchronously. Eventually a |
264 | * timeout in the main thread will cause it to cancel the writer |
265 | * thread, which will in turn cancel the read op in the main thread, |
266 | * which will then close the pipe to the reader thread, causing the |
267 | * read op to fail. |
268 | */ |
269 | |
270 | g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0); |
271 | |
272 | if (nonblocking) |
273 | { |
274 | GError *error = NULL; |
275 | |
276 | g_unix_set_fd_nonblocking (fd: writer_pipe[0], TRUE, error: &error); |
277 | g_assert_no_error (error); |
278 | g_unix_set_fd_nonblocking (fd: writer_pipe[1], TRUE, error: &error); |
279 | g_assert_no_error (error); |
280 | g_unix_set_fd_nonblocking (fd: reader_pipe[0], TRUE, error: &error); |
281 | g_assert_no_error (error); |
282 | g_unix_set_fd_nonblocking (fd: reader_pipe[1], TRUE, error: &error); |
283 | g_assert_no_error (error); |
284 | } |
285 | |
286 | writer_cancel = g_cancellable_new (); |
287 | reader_cancel = g_cancellable_new (); |
288 | main_cancel = g_cancellable_new (); |
289 | |
290 | writer = g_thread_new (name: "writer" , func: writer_thread, NULL); |
291 | reader = g_thread_new (name: "reader" , func: reader_thread, NULL); |
292 | |
293 | in = g_unix_input_stream_new (fd: writer_pipe[0], TRUE); |
294 | out = g_unix_output_stream_new (fd: reader_pipe[1], TRUE); |
295 | |
296 | g_input_stream_read_async (stream: in, buffer: main_buf, count: sizeof (main_buf), |
297 | G_PRIORITY_DEFAULT, cancellable: main_cancel, |
298 | callback: main_thread_read, user_data: out); |
299 | |
300 | g_timeout_add (interval: 500, function: timeout, data: writer_cancel); |
301 | |
302 | loop = g_main_loop_new (NULL, TRUE); |
303 | g_main_loop_run (loop); |
304 | g_main_loop_unref (loop); |
305 | |
306 | g_thread_join (thread: reader); |
307 | g_thread_join (thread: writer); |
308 | |
309 | g_object_unref (object: main_cancel); |
310 | g_object_unref (object: reader_cancel); |
311 | g_object_unref (object: writer_cancel); |
312 | g_object_unref (object: in); |
313 | g_object_unref (object: out); |
314 | } |
315 | |
316 | static void |
317 | test_basic (void) |
318 | { |
319 | GUnixInputStream *is; |
320 | GUnixOutputStream *os; |
321 | gint fd; |
322 | gboolean close_fd; |
323 | |
324 | is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (0, TRUE)); |
325 | g_object_get (object: is, |
326 | first_property_name: "fd" , &fd, |
327 | "close-fd" , &close_fd, |
328 | NULL); |
329 | g_assert_cmpint (fd, ==, 0); |
330 | g_assert (close_fd); |
331 | |
332 | g_unix_input_stream_set_close_fd (stream: is, FALSE); |
333 | g_assert (!g_unix_input_stream_get_close_fd (is)); |
334 | g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0); |
335 | |
336 | g_assert (!g_input_stream_has_pending (G_INPUT_STREAM (is))); |
337 | |
338 | g_object_unref (object: is); |
339 | |
340 | os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE)); |
341 | g_object_get (object: os, |
342 | first_property_name: "fd" , &fd, |
343 | "close-fd" , &close_fd, |
344 | NULL); |
345 | g_assert_cmpint (fd, ==, 1); |
346 | g_assert (close_fd); |
347 | |
348 | g_unix_output_stream_set_close_fd (stream: os, FALSE); |
349 | g_assert (!g_unix_output_stream_get_close_fd (os)); |
350 | g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1); |
351 | |
352 | g_assert (!g_output_stream_has_pending (G_OUTPUT_STREAM (os))); |
353 | |
354 | g_object_unref (object: os); |
355 | } |
356 | |
357 | typedef struct { |
358 | GInputStream *is; |
359 | GOutputStream *os; |
360 | const guint8 *write_data; |
361 | guint8 *read_data; |
362 | } TestReadWriteData; |
363 | |
364 | static gpointer |
365 | test_read_write_write_thread (gpointer user_data) |
366 | { |
367 | TestReadWriteData *data = user_data; |
368 | gsize bytes_written; |
369 | GError *error = NULL; |
370 | gboolean res; |
371 | |
372 | res = g_output_stream_write_all (stream: data->os, buffer: data->write_data, count: 1024, bytes_written: &bytes_written, NULL, error: &error); |
373 | g_assert_true (res); |
374 | g_assert_no_error (error); |
375 | g_assert_cmpuint (bytes_written, ==, 1024); |
376 | |
377 | return NULL; |
378 | } |
379 | |
380 | static gpointer |
381 | test_read_write_read_thread (gpointer user_data) |
382 | { |
383 | TestReadWriteData *data = user_data; |
384 | gsize bytes_read; |
385 | GError *error = NULL; |
386 | gboolean res; |
387 | |
388 | res = g_input_stream_read_all (stream: data->is, buffer: data->read_data, count: 1024, bytes_read: &bytes_read, NULL, error: &error); |
389 | g_assert_true (res); |
390 | g_assert_no_error (error); |
391 | g_assert_cmpuint (bytes_read, ==, 1024); |
392 | |
393 | return NULL; |
394 | } |
395 | |
396 | static gpointer |
397 | test_read_write_writev_thread (gpointer user_data) |
398 | { |
399 | TestReadWriteData *data = user_data; |
400 | gsize bytes_written; |
401 | GError *error = NULL; |
402 | gboolean res; |
403 | GOutputVector vectors[3]; |
404 | |
405 | vectors[0].buffer = data->write_data; |
406 | vectors[0].size = 256; |
407 | vectors[1].buffer = data->write_data + 256; |
408 | vectors[1].size = 256; |
409 | vectors[2].buffer = data->write_data + 512; |
410 | vectors[2].size = 512; |
411 | |
412 | res = g_output_stream_writev_all (stream: data->os, vectors, G_N_ELEMENTS (vectors), bytes_written: &bytes_written, NULL, error: &error); |
413 | g_assert_true (res); |
414 | g_assert_no_error (error); |
415 | g_assert_cmpuint (bytes_written, ==, 1024); |
416 | |
417 | return NULL; |
418 | } |
419 | |
420 | /* test if normal writing/reading from a pipe works */ |
421 | static void |
422 | test_read_write (gconstpointer user_data) |
423 | { |
424 | gboolean writev = GPOINTER_TO_INT (user_data); |
425 | GUnixInputStream *is; |
426 | GUnixOutputStream *os; |
427 | gint fd[2]; |
428 | guint8 data_write[1024], data_read[1024]; |
429 | guint i; |
430 | GThread *write_thread, *read_thread; |
431 | TestReadWriteData data; |
432 | |
433 | for (i = 0; i < sizeof (data_write); i++) |
434 | data_write[i] = i; |
435 | |
436 | g_assert_cmpint (pipe (fd), ==, 0); |
437 | |
438 | is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE)); |
439 | os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE)); |
440 | |
441 | data.is = G_INPUT_STREAM (is); |
442 | data.os = G_OUTPUT_STREAM (os); |
443 | data.read_data = data_read; |
444 | data.write_data = data_write; |
445 | |
446 | if (writev) |
447 | write_thread = g_thread_new (name: "writer" , func: test_read_write_writev_thread, data: &data); |
448 | else |
449 | write_thread = g_thread_new (name: "writer" , func: test_read_write_write_thread, data: &data); |
450 | read_thread = g_thread_new (name: "reader" , func: test_read_write_read_thread, data: &data); |
451 | |
452 | g_thread_join (thread: write_thread); |
453 | g_thread_join (thread: read_thread); |
454 | |
455 | g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read); |
456 | |
457 | g_object_unref (object: os); |
458 | g_object_unref (object: is); |
459 | } |
460 | |
461 | /* test if g_pollable_output_stream_write_nonblocking() and |
462 | * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK |
463 | * and correctly reset their status afterwards again, and all data that is |
464 | * written can also be read again. |
465 | */ |
466 | static void |
467 | test_write_wouldblock (void) |
468 | { |
469 | #ifndef F_GETPIPE_SZ |
470 | g_test_skip ("F_GETPIPE_SZ not defined" ); |
471 | #else /* if F_GETPIPE_SZ */ |
472 | GUnixInputStream *is; |
473 | GUnixOutputStream *os; |
474 | gint fd[2]; |
475 | GError *err = NULL; |
476 | guint8 data_write[1024], data_read[1024]; |
477 | guint i; |
478 | gint pipe_capacity; |
479 | |
480 | for (i = 0; i < sizeof (data_write); i++) |
481 | data_write[i] = i; |
482 | |
483 | g_assert_cmpint (pipe (fd), ==, 0); |
484 | |
485 | g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0); |
486 | pipe_capacity = fcntl (fd: fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL); |
487 | g_assert_cmpint (pipe_capacity, >=, 4096); |
488 | g_assert_cmpint (pipe_capacity % 1024, >=, 0); |
489 | |
490 | is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE)); |
491 | os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE)); |
492 | |
493 | /* Run the whole thing three times to make sure that the streams |
494 | * reset the writability/readability state again */ |
495 | for (i = 0; i < 3; i++) { |
496 | gssize written = 0, written_complete = 0; |
497 | gssize read = 0, read_complete = 0; |
498 | |
499 | do |
500 | { |
501 | written_complete += written; |
502 | written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os), |
503 | buffer: data_write, |
504 | count: sizeof (data_write), |
505 | NULL, |
506 | error: &err); |
507 | } |
508 | while (written > 0); |
509 | |
510 | g_assert_cmpuint (written_complete, >, 0); |
511 | g_assert_nonnull (err); |
512 | g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK); |
513 | g_clear_error (err: &err); |
514 | |
515 | do |
516 | { |
517 | read_complete += read; |
518 | read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is), |
519 | buffer: data_read, |
520 | count: sizeof (data_read), |
521 | NULL, |
522 | error: &err); |
523 | if (read > 0) |
524 | g_assert_cmpmem (data_read, read, data_write, sizeof (data_write)); |
525 | } |
526 | while (read > 0); |
527 | |
528 | g_assert_cmpuint (read_complete, ==, written_complete); |
529 | g_assert_nonnull (err); |
530 | g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK); |
531 | g_clear_error (err: &err); |
532 | } |
533 | |
534 | g_object_unref (object: os); |
535 | g_object_unref (object: is); |
536 | #endif /* if F_GETPIPE_SZ */ |
537 | } |
538 | |
539 | /* test if g_pollable_output_stream_writev_nonblocking() and |
540 | * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK |
541 | * and correctly reset their status afterwards again, and all data that is |
542 | * written can also be read again. |
543 | */ |
544 | static void |
545 | test_writev_wouldblock (void) |
546 | { |
547 | #ifndef F_GETPIPE_SZ |
548 | g_test_skip ("F_GETPIPE_SZ not defined" ); |
549 | #else /* if F_GETPIPE_SZ */ |
550 | GUnixInputStream *is; |
551 | GUnixOutputStream *os; |
552 | gint fd[2]; |
553 | GError *err = NULL; |
554 | guint8 data_write[1024], data_read[1024]; |
555 | guint i; |
556 | GOutputVector vectors[4]; |
557 | GPollableReturn res; |
558 | gint pipe_capacity; |
559 | |
560 | for (i = 0; i < sizeof (data_write); i++) |
561 | data_write[i] = i; |
562 | |
563 | g_assert_cmpint (pipe (fd), ==, 0); |
564 | |
565 | g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0); |
566 | pipe_capacity = fcntl (fd: fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL); |
567 | g_assert_cmpint (pipe_capacity, >=, 4096); |
568 | g_assert_cmpint (pipe_capacity % 1024, >=, 0); |
569 | |
570 | is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE)); |
571 | os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE)); |
572 | |
573 | /* Run the whole thing three times to make sure that the streams |
574 | * reset the writability/readability state again */ |
575 | for (i = 0; i < 3; i++) { |
576 | gsize written = 0, written_complete = 0; |
577 | gssize read = 0, read_complete = 0; |
578 | |
579 | do |
580 | { |
581 | written_complete += written; |
582 | |
583 | vectors[0].buffer = data_write; |
584 | vectors[0].size = 256; |
585 | vectors[1].buffer = data_write + 256; |
586 | vectors[1].size = 256; |
587 | vectors[2].buffer = data_write + 512; |
588 | vectors[2].size = 256; |
589 | vectors[3].buffer = data_write + 768; |
590 | vectors[3].size = 256; |
591 | |
592 | res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os), |
593 | vectors, |
594 | G_N_ELEMENTS (vectors), |
595 | bytes_written: &written, |
596 | NULL, |
597 | error: &err); |
598 | } |
599 | while (res == G_POLLABLE_RETURN_OK); |
600 | |
601 | g_assert_cmpuint (written_complete, >, 0); |
602 | g_assert_null (err); |
603 | g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK); |
604 | /* writev() on UNIX streams either succeeds fully or not at all */ |
605 | g_assert_cmpuint (written, ==, 0); |
606 | |
607 | do |
608 | { |
609 | read_complete += read; |
610 | read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is), |
611 | buffer: data_read, |
612 | count: sizeof (data_read), |
613 | NULL, |
614 | error: &err); |
615 | if (read > 0) |
616 | g_assert_cmpmem (data_read, read, data_write, sizeof (data_write)); |
617 | } |
618 | while (read > 0); |
619 | |
620 | g_assert_cmpuint (read_complete, ==, written_complete); |
621 | g_assert_nonnull (err); |
622 | g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK); |
623 | g_clear_error (err: &err); |
624 | } |
625 | |
626 | g_object_unref (object: os); |
627 | g_object_unref (object: is); |
628 | #endif /* if F_GETPIPE_SZ */ |
629 | } |
630 | |
631 | #ifdef F_GETPIPE_SZ |
632 | static void |
633 | write_async_wouldblock_cb (GUnixOutputStream *os, |
634 | GAsyncResult *result, |
635 | gpointer user_data) |
636 | { |
637 | gsize *bytes_written = user_data; |
638 | GError *err = NULL; |
639 | |
640 | g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, error: &err); |
641 | g_assert_no_error (err); |
642 | } |
643 | |
644 | static void |
645 | read_async_wouldblock_cb (GUnixInputStream *is, |
646 | GAsyncResult *result, |
647 | gpointer user_data) |
648 | { |
649 | gsize *bytes_read = user_data; |
650 | GError *err = NULL; |
651 | |
652 | g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, error: &err); |
653 | g_assert_no_error (err); |
654 | } |
655 | #endif /* if F_GETPIPE_SZ */ |
656 | |
657 | /* test if the async implementation of write_all() and read_all() in G*Stream |
658 | * around the GPollable*Stream API is working correctly. |
659 | */ |
660 | static void |
661 | test_write_async_wouldblock (void) |
662 | { |
663 | #ifndef F_GETPIPE_SZ |
664 | g_test_skip ("F_GETPIPE_SZ not defined" ); |
665 | #else /* if F_GETPIPE_SZ */ |
666 | GUnixInputStream *is; |
667 | GUnixOutputStream *os; |
668 | gint fd[2]; |
669 | guint8 *data, *data_read; |
670 | guint i; |
671 | gint pipe_capacity; |
672 | gsize bytes_written = 0, bytes_read = 0; |
673 | |
674 | g_assert_cmpint (pipe (fd), ==, 0); |
675 | |
676 | /* FIXME: These should not be needed but otherwise |
677 | * g_unix_output_stream_write() will block because |
678 | * a) the fd is writable |
679 | * b) writing 4x capacity will block because writes are atomic |
680 | * c) the fd is blocking |
681 | * |
682 | * See https://gitlab.gnome.org/GNOME/glib/issues/1654 |
683 | */ |
684 | g_unix_set_fd_nonblocking (fd: fd[0], TRUE, NULL); |
685 | g_unix_set_fd_nonblocking (fd: fd[1], TRUE, NULL); |
686 | |
687 | g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0); |
688 | pipe_capacity = fcntl (fd: fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL); |
689 | g_assert_cmpint (pipe_capacity, >=, 4096); |
690 | |
691 | data = g_new (guint8, 4 * pipe_capacity); |
692 | for (i = 0; i < 4 * pipe_capacity; i++) |
693 | data[i] = i; |
694 | data_read = g_new (guint8, 4 * pipe_capacity); |
695 | |
696 | is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE)); |
697 | os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE)); |
698 | |
699 | g_output_stream_write_all_async (G_OUTPUT_STREAM (os), |
700 | buffer: data, |
701 | count: 4 * pipe_capacity, |
702 | G_PRIORITY_DEFAULT, |
703 | NULL, |
704 | callback: (GAsyncReadyCallback) write_async_wouldblock_cb, |
705 | user_data: &bytes_written); |
706 | |
707 | g_input_stream_read_all_async (G_INPUT_STREAM (is), |
708 | buffer: data_read, |
709 | count: 4 * pipe_capacity, |
710 | G_PRIORITY_DEFAULT, |
711 | NULL, |
712 | callback: (GAsyncReadyCallback) read_async_wouldblock_cb, |
713 | user_data: &bytes_read); |
714 | |
715 | while (bytes_written == 0 && bytes_read == 0) |
716 | g_main_context_iteration (NULL, TRUE); |
717 | |
718 | g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity); |
719 | g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity); |
720 | g_assert_cmpmem (data_read, bytes_read, data, bytes_written); |
721 | |
722 | g_free (mem: data); |
723 | g_free (mem: data_read); |
724 | |
725 | g_object_unref (object: os); |
726 | g_object_unref (object: is); |
727 | #endif /* if F_GETPIPE_SZ */ |
728 | } |
729 | |
730 | #ifdef F_GETPIPE_SZ |
731 | static void |
732 | writev_async_wouldblock_cb (GUnixOutputStream *os, |
733 | GAsyncResult *result, |
734 | gpointer user_data) |
735 | { |
736 | gsize *bytes_written = user_data; |
737 | GError *err = NULL; |
738 | |
739 | g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, error: &err); |
740 | g_assert_no_error (err); |
741 | } |
742 | #endif /* if F_GETPIPE_SZ */ |
743 | |
744 | /* test if the async implementation of writev_all() and read_all() in G*Stream |
745 | * around the GPollable*Stream API is working correctly. |
746 | */ |
747 | static void |
748 | test_writev_async_wouldblock (void) |
749 | { |
750 | #ifndef F_GETPIPE_SZ |
751 | g_test_skip ("F_GETPIPE_SZ not defined" ); |
752 | #else /* if F_GETPIPE_SZ */ |
753 | GUnixInputStream *is; |
754 | GUnixOutputStream *os; |
755 | gint fd[2]; |
756 | guint8 *data, *data_read; |
757 | guint i; |
758 | gint pipe_capacity; |
759 | gsize bytes_written = 0, bytes_read = 0; |
760 | GOutputVector vectors[4]; |
761 | |
762 | g_assert_cmpint (pipe (fd), ==, 0); |
763 | |
764 | /* FIXME: These should not be needed but otherwise |
765 | * g_unix_output_stream_writev() will block because |
766 | * a) the fd is writable |
767 | * b) writing 4x capacity will block because writes are atomic |
768 | * c) the fd is blocking |
769 | * |
770 | * See https://gitlab.gnome.org/GNOME/glib/issues/1654 |
771 | */ |
772 | g_unix_set_fd_nonblocking (fd: fd[0], TRUE, NULL); |
773 | g_unix_set_fd_nonblocking (fd: fd[1], TRUE, NULL); |
774 | |
775 | g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0); |
776 | pipe_capacity = fcntl (fd: fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL); |
777 | g_assert_cmpint (pipe_capacity, >=, 4096); |
778 | |
779 | data = g_new (guint8, 4 * pipe_capacity); |
780 | for (i = 0; i < 4 * pipe_capacity; i++) |
781 | data[i] = i; |
782 | data_read = g_new (guint8, 4 * pipe_capacity); |
783 | |
784 | vectors[0].buffer = data; |
785 | vectors[0].size = 1024; |
786 | vectors[1].buffer = data + 1024; |
787 | vectors[1].size = 1024; |
788 | vectors[2].buffer = data + 2048; |
789 | vectors[2].size = 1024; |
790 | vectors[3].buffer = data + 3072; |
791 | vectors[3].size = 4 * pipe_capacity - 3072; |
792 | |
793 | is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE)); |
794 | os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE)); |
795 | |
796 | g_output_stream_writev_all_async (G_OUTPUT_STREAM (os), |
797 | vectors, |
798 | G_N_ELEMENTS (vectors), |
799 | G_PRIORITY_DEFAULT, |
800 | NULL, |
801 | callback: (GAsyncReadyCallback) writev_async_wouldblock_cb, |
802 | user_data: &bytes_written); |
803 | |
804 | g_input_stream_read_all_async (G_INPUT_STREAM (is), |
805 | buffer: data_read, |
806 | count: 4 * pipe_capacity, |
807 | G_PRIORITY_DEFAULT, |
808 | NULL, |
809 | callback: (GAsyncReadyCallback) read_async_wouldblock_cb, |
810 | user_data: &bytes_read); |
811 | |
812 | while (bytes_written == 0 && bytes_read == 0) |
813 | g_main_context_iteration (NULL, TRUE); |
814 | |
815 | g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity); |
816 | g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity); |
817 | g_assert_cmpmem (data_read, bytes_read, data, bytes_written); |
818 | |
819 | g_free (mem: data); |
820 | g_free (mem: data_read); |
821 | |
822 | g_object_unref (object: os); |
823 | g_object_unref (object: is); |
824 | #endif /* F_GETPIPE_SZ */ |
825 | } |
826 | |
827 | int |
828 | main (int argc, |
829 | char *argv[]) |
830 | { |
831 | g_test_init (argc: &argc, argv: &argv, NULL); |
832 | |
833 | g_test_add_func (testpath: "/unix-streams/basic" , test_func: test_basic); |
834 | g_test_add_data_func (testpath: "/unix-streams/pipe-io-test" , |
835 | GINT_TO_POINTER (FALSE), |
836 | test_func: test_pipe_io); |
837 | g_test_add_data_func (testpath: "/unix-streams/nonblocking-io-test" , |
838 | GINT_TO_POINTER (TRUE), |
839 | test_func: test_pipe_io); |
840 | |
841 | g_test_add_data_func (testpath: "/unix-streams/read_write" , |
842 | GINT_TO_POINTER (FALSE), |
843 | test_func: test_read_write); |
844 | |
845 | g_test_add_data_func (testpath: "/unix-streams/read_writev" , |
846 | GINT_TO_POINTER (TRUE), |
847 | test_func: test_read_write); |
848 | |
849 | g_test_add_func (testpath: "/unix-streams/write-wouldblock" , |
850 | test_func: test_write_wouldblock); |
851 | g_test_add_func (testpath: "/unix-streams/writev-wouldblock" , |
852 | test_func: test_writev_wouldblock); |
853 | |
854 | g_test_add_func (testpath: "/unix-streams/write-async-wouldblock" , |
855 | test_func: test_write_async_wouldblock); |
856 | g_test_add_func (testpath: "/unix-streams/writev-async-wouldblock" , |
857 | test_func: test_writev_async_wouldblock); |
858 | |
859 | return g_test_run(); |
860 | } |
861 | |