1/* GLib testing framework examples and tests
2 * Copyright (C) 2008 Red Hat, Inc
3 *
4 * This work is provided "as is"; redistribution and modification
5 * in whole or in part, in any medium, physical or electronic is
6 * permitted without restriction.
7 *
8 * This work is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11 *
12 * In no event shall the authors or contributors be liable for any
13 * direct, indirect, incidental, special, exemplary, or consequential
14 * damages (including, but not limited to, procurement of substitute
15 * goods or services; loss of use, data, or profits; or business
16 * interruption) however caused and on any theory of liability, whether
17 * in contract, strict liability, or tort (including negligence or
18 * otherwise) arising in any way out of the use of this software, even
19 * if advised of the possibility of such damage.
20 */
21
22#include <gio/gio.h>
23#include <gio/gunixinputstream.h>
24#include <gio/gunixoutputstream.h>
25#include <glib.h>
26#include <glib/glib-unix.h>
27#include <signal.h>
28#include <stdlib.h>
29#include <string.h>
30#include <unistd.h>
31#include <fcntl.h>
32
33/* sizeof(DATA) will give the number of bytes in the array, plus the terminating nul */
34static const gchar DATA[] = "abcdefghijklmnopqrstuvwxyz";
35
36int writer_pipe[2], reader_pipe[2];
37GCancellable *writer_cancel, *reader_cancel, *main_cancel;
38GMainLoop *loop;
39
40
41static gpointer
42writer_thread (gpointer user_data)
43{
44 GOutputStream *out;
45 gssize nwrote, offset;
46 GError *err = NULL;
47
48 out = g_unix_output_stream_new (fd: writer_pipe[1], TRUE);
49
50 do
51 {
52 g_usleep (microseconds: 10);
53
54 offset = 0;
55 while (offset < (gssize) sizeof (DATA))
56 {
57 nwrote = g_output_stream_write (stream: out, buffer: DATA + offset,
58 count: sizeof (DATA) - offset,
59 cancellable: writer_cancel, error: &err);
60 if (nwrote <= 0 || err != NULL)
61 break;
62 offset += nwrote;
63 }
64
65 g_assert (nwrote > 0 || err != NULL);
66 }
67 while (err == NULL);
68
69 if (g_cancellable_is_cancelled (cancellable: writer_cancel))
70 {
71 g_clear_error (err: &err);
72 g_cancellable_cancel (cancellable: main_cancel);
73 g_object_unref (object: out);
74 return NULL;
75 }
76
77 g_warning ("writer: %s", err->message);
78 g_assert_not_reached ();
79}
80
81static gpointer
82reader_thread (gpointer user_data)
83{
84 GInputStream *in;
85 gssize nread = 0, total;
86 GError *err = NULL;
87 char buf[sizeof (DATA)];
88
89 in = g_unix_input_stream_new (fd: reader_pipe[0], TRUE);
90
91 do
92 {
93 total = 0;
94 while (total < (gssize) sizeof (DATA))
95 {
96 nread = g_input_stream_read (stream: in, buffer: buf + total, count: sizeof (buf) - total,
97 cancellable: reader_cancel, error: &err);
98 if (nread <= 0 || err != NULL)
99 break;
100 total += nread;
101 }
102
103 if (err)
104 break;
105
106 if (nread == 0)
107 {
108 g_assert (err == NULL);
109 /* pipe closed */
110 g_object_unref (object: in);
111 return NULL;
112 }
113
114 g_assert_cmpstr (buf, ==, DATA);
115 g_assert (!g_cancellable_is_cancelled (reader_cancel));
116 }
117 while (err == NULL);
118
119 g_warning ("reader: %s", err->message);
120 g_assert_not_reached ();
121}
122
123static char main_buf[sizeof (DATA)];
124static gssize main_len, main_offset;
125
126static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
127static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
128static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
129
130static void
131do_main_cancel (GOutputStream *out)
132{
133 g_output_stream_close (stream: out, NULL, NULL);
134 g_main_loop_quit (loop);
135}
136
137static void
138main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
139{
140 GInputStream *in = G_INPUT_STREAM (source);
141 GOutputStream *out = user_data;
142 GError *err = NULL;
143 gssize nskipped;
144
145 nskipped = g_input_stream_skip_finish (stream: in, result: res, error: &err);
146
147 if (g_cancellable_is_cancelled (cancellable: main_cancel))
148 {
149 do_main_cancel (out);
150 return;
151 }
152
153 g_assert_no_error (err);
154
155 main_offset += nskipped;
156 if (main_offset == main_len)
157 {
158 main_offset = 0;
159 g_output_stream_write_async (stream: out, buffer: main_buf, count: main_len,
160 G_PRIORITY_DEFAULT, cancellable: main_cancel,
161 callback: main_thread_wrote, user_data: in);
162 }
163 else
164 {
165 g_input_stream_skip_async (stream: in, count: main_len - main_offset,
166 G_PRIORITY_DEFAULT, cancellable: main_cancel,
167 callback: main_thread_skipped, user_data: out);
168 }
169}
170
171static void
172main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
173{
174 GInputStream *in = G_INPUT_STREAM (source);
175 GOutputStream *out = user_data;
176 GError *err = NULL;
177 gssize nread;
178
179 nread = g_input_stream_read_finish (stream: in, result: res, error: &err);
180
181 if (g_cancellable_is_cancelled (cancellable: main_cancel))
182 {
183 do_main_cancel (out);
184 g_clear_error (err: &err);
185 return;
186 }
187
188 g_assert_no_error (err);
189
190 main_offset += nread;
191 if (main_offset == sizeof (DATA))
192 {
193 main_len = main_offset;
194 main_offset = 0;
195 /* Now skip the same amount */
196 g_input_stream_skip_async (stream: in, count: main_len,
197 G_PRIORITY_DEFAULT, cancellable: main_cancel,
198 callback: main_thread_skipped, user_data: out);
199 }
200 else
201 {
202 g_input_stream_read_async (stream: in, buffer: main_buf, count: sizeof (main_buf),
203 G_PRIORITY_DEFAULT, cancellable: main_cancel,
204 callback: main_thread_read, user_data: out);
205 }
206}
207
208static void
209main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
210{
211 GOutputStream *out = G_OUTPUT_STREAM (source);
212 GInputStream *in = user_data;
213 GError *err = NULL;
214 gssize nwrote;
215
216 nwrote = g_output_stream_write_finish (stream: out, result: res, error: &err);
217
218 if (g_cancellable_is_cancelled (cancellable: main_cancel))
219 {
220 do_main_cancel (out);
221 g_clear_error (err: &err);
222 return;
223 }
224
225 g_assert_no_error (err);
226 g_assert_cmpint (nwrote, <=, main_len - main_offset);
227
228 main_offset += nwrote;
229 if (main_offset == main_len)
230 {
231 main_offset = 0;
232 g_input_stream_read_async (stream: in, buffer: main_buf, count: sizeof (main_buf),
233 G_PRIORITY_DEFAULT, cancellable: main_cancel,
234 callback: main_thread_read, user_data: out);
235 }
236 else
237 {
238 g_output_stream_write_async (stream: out, buffer: main_buf + main_offset,
239 count: main_len - main_offset,
240 G_PRIORITY_DEFAULT, cancellable: main_cancel,
241 callback: main_thread_wrote, user_data: in);
242 }
243}
244
245static gboolean
246timeout (gpointer cancellable)
247{
248 g_cancellable_cancel (cancellable);
249 return FALSE;
250}
251
252static void
253test_pipe_io (gconstpointer nonblocking)
254{
255 GThread *writer, *reader;
256 GInputStream *in;
257 GOutputStream *out;
258
259 /* Split off two (additional) threads, a reader and a writer. From
260 * the writer thread, write data synchronously in small chunks,
261 * which gets alternately read and skipped asynchronously by the
262 * main thread and then (if not skipped) written asynchronously to
263 * the reader thread, which reads it synchronously. Eventually a
264 * timeout in the main thread will cause it to cancel the writer
265 * thread, which will in turn cancel the read op in the main thread,
266 * which will then close the pipe to the reader thread, causing the
267 * read op to fail.
268 */
269
270 g_assert (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
271
272 if (nonblocking)
273 {
274 GError *error = NULL;
275
276 g_unix_set_fd_nonblocking (fd: writer_pipe[0], TRUE, error: &error);
277 g_assert_no_error (error);
278 g_unix_set_fd_nonblocking (fd: writer_pipe[1], TRUE, error: &error);
279 g_assert_no_error (error);
280 g_unix_set_fd_nonblocking (fd: reader_pipe[0], TRUE, error: &error);
281 g_assert_no_error (error);
282 g_unix_set_fd_nonblocking (fd: reader_pipe[1], TRUE, error: &error);
283 g_assert_no_error (error);
284 }
285
286 writer_cancel = g_cancellable_new ();
287 reader_cancel = g_cancellable_new ();
288 main_cancel = g_cancellable_new ();
289
290 writer = g_thread_new (name: "writer", func: writer_thread, NULL);
291 reader = g_thread_new (name: "reader", func: reader_thread, NULL);
292
293 in = g_unix_input_stream_new (fd: writer_pipe[0], TRUE);
294 out = g_unix_output_stream_new (fd: reader_pipe[1], TRUE);
295
296 g_input_stream_read_async (stream: in, buffer: main_buf, count: sizeof (main_buf),
297 G_PRIORITY_DEFAULT, cancellable: main_cancel,
298 callback: main_thread_read, user_data: out);
299
300 g_timeout_add (interval: 500, function: timeout, data: writer_cancel);
301
302 loop = g_main_loop_new (NULL, TRUE);
303 g_main_loop_run (loop);
304 g_main_loop_unref (loop);
305
306 g_thread_join (thread: reader);
307 g_thread_join (thread: writer);
308
309 g_object_unref (object: main_cancel);
310 g_object_unref (object: reader_cancel);
311 g_object_unref (object: writer_cancel);
312 g_object_unref (object: in);
313 g_object_unref (object: out);
314}
315
316static void
317test_basic (void)
318{
319 GUnixInputStream *is;
320 GUnixOutputStream *os;
321 gint fd;
322 gboolean close_fd;
323
324 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (0, TRUE));
325 g_object_get (object: is,
326 first_property_name: "fd", &fd,
327 "close-fd", &close_fd,
328 NULL);
329 g_assert_cmpint (fd, ==, 0);
330 g_assert (close_fd);
331
332 g_unix_input_stream_set_close_fd (stream: is, FALSE);
333 g_assert (!g_unix_input_stream_get_close_fd (is));
334 g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0);
335
336 g_assert (!g_input_stream_has_pending (G_INPUT_STREAM (is)));
337
338 g_object_unref (object: is);
339
340 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE));
341 g_object_get (object: os,
342 first_property_name: "fd", &fd,
343 "close-fd", &close_fd,
344 NULL);
345 g_assert_cmpint (fd, ==, 1);
346 g_assert (close_fd);
347
348 g_unix_output_stream_set_close_fd (stream: os, FALSE);
349 g_assert (!g_unix_output_stream_get_close_fd (os));
350 g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1);
351
352 g_assert (!g_output_stream_has_pending (G_OUTPUT_STREAM (os)));
353
354 g_object_unref (object: os);
355}
356
357typedef struct {
358 GInputStream *is;
359 GOutputStream *os;
360 const guint8 *write_data;
361 guint8 *read_data;
362} TestReadWriteData;
363
364static gpointer
365test_read_write_write_thread (gpointer user_data)
366{
367 TestReadWriteData *data = user_data;
368 gsize bytes_written;
369 GError *error = NULL;
370 gboolean res;
371
372 res = g_output_stream_write_all (stream: data->os, buffer: data->write_data, count: 1024, bytes_written: &bytes_written, NULL, error: &error);
373 g_assert_true (res);
374 g_assert_no_error (error);
375 g_assert_cmpuint (bytes_written, ==, 1024);
376
377 return NULL;
378}
379
380static gpointer
381test_read_write_read_thread (gpointer user_data)
382{
383 TestReadWriteData *data = user_data;
384 gsize bytes_read;
385 GError *error = NULL;
386 gboolean res;
387
388 res = g_input_stream_read_all (stream: data->is, buffer: data->read_data, count: 1024, bytes_read: &bytes_read, NULL, error: &error);
389 g_assert_true (res);
390 g_assert_no_error (error);
391 g_assert_cmpuint (bytes_read, ==, 1024);
392
393 return NULL;
394}
395
396static gpointer
397test_read_write_writev_thread (gpointer user_data)
398{
399 TestReadWriteData *data = user_data;
400 gsize bytes_written;
401 GError *error = NULL;
402 gboolean res;
403 GOutputVector vectors[3];
404
405 vectors[0].buffer = data->write_data;
406 vectors[0].size = 256;
407 vectors[1].buffer = data->write_data + 256;
408 vectors[1].size = 256;
409 vectors[2].buffer = data->write_data + 512;
410 vectors[2].size = 512;
411
412 res = g_output_stream_writev_all (stream: data->os, vectors, G_N_ELEMENTS (vectors), bytes_written: &bytes_written, NULL, error: &error);
413 g_assert_true (res);
414 g_assert_no_error (error);
415 g_assert_cmpuint (bytes_written, ==, 1024);
416
417 return NULL;
418}
419
420/* test if normal writing/reading from a pipe works */
421static void
422test_read_write (gconstpointer user_data)
423{
424 gboolean writev = GPOINTER_TO_INT (user_data);
425 GUnixInputStream *is;
426 GUnixOutputStream *os;
427 gint fd[2];
428 guint8 data_write[1024], data_read[1024];
429 guint i;
430 GThread *write_thread, *read_thread;
431 TestReadWriteData data;
432
433 for (i = 0; i < sizeof (data_write); i++)
434 data_write[i] = i;
435
436 g_assert_cmpint (pipe (fd), ==, 0);
437
438 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
439 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
440
441 data.is = G_INPUT_STREAM (is);
442 data.os = G_OUTPUT_STREAM (os);
443 data.read_data = data_read;
444 data.write_data = data_write;
445
446 if (writev)
447 write_thread = g_thread_new (name: "writer", func: test_read_write_writev_thread, data: &data);
448 else
449 write_thread = g_thread_new (name: "writer", func: test_read_write_write_thread, data: &data);
450 read_thread = g_thread_new (name: "reader", func: test_read_write_read_thread, data: &data);
451
452 g_thread_join (thread: write_thread);
453 g_thread_join (thread: read_thread);
454
455 g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
456
457 g_object_unref (object: os);
458 g_object_unref (object: is);
459}
460
461/* test if g_pollable_output_stream_write_nonblocking() and
462 * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
463 * and correctly reset their status afterwards again, and all data that is
464 * written can also be read again.
465 */
466static void
467test_write_wouldblock (void)
468{
469#ifndef F_GETPIPE_SZ
470 g_test_skip ("F_GETPIPE_SZ not defined");
471#else /* if F_GETPIPE_SZ */
472 GUnixInputStream *is;
473 GUnixOutputStream *os;
474 gint fd[2];
475 GError *err = NULL;
476 guint8 data_write[1024], data_read[1024];
477 guint i;
478 gint pipe_capacity;
479
480 for (i = 0; i < sizeof (data_write); i++)
481 data_write[i] = i;
482
483 g_assert_cmpint (pipe (fd), ==, 0);
484
485 g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
486 pipe_capacity = fcntl (fd: fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
487 g_assert_cmpint (pipe_capacity, >=, 4096);
488 g_assert_cmpint (pipe_capacity % 1024, >=, 0);
489
490 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
491 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
492
493 /* Run the whole thing three times to make sure that the streams
494 * reset the writability/readability state again */
495 for (i = 0; i < 3; i++) {
496 gssize written = 0, written_complete = 0;
497 gssize read = 0, read_complete = 0;
498
499 do
500 {
501 written_complete += written;
502 written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
503 buffer: data_write,
504 count: sizeof (data_write),
505 NULL,
506 error: &err);
507 }
508 while (written > 0);
509
510 g_assert_cmpuint (written_complete, >, 0);
511 g_assert_nonnull (err);
512 g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
513 g_clear_error (err: &err);
514
515 do
516 {
517 read_complete += read;
518 read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
519 buffer: data_read,
520 count: sizeof (data_read),
521 NULL,
522 error: &err);
523 if (read > 0)
524 g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
525 }
526 while (read > 0);
527
528 g_assert_cmpuint (read_complete, ==, written_complete);
529 g_assert_nonnull (err);
530 g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
531 g_clear_error (err: &err);
532 }
533
534 g_object_unref (object: os);
535 g_object_unref (object: is);
536#endif /* if F_GETPIPE_SZ */
537}
538
539/* test if g_pollable_output_stream_writev_nonblocking() and
540 * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
541 * and correctly reset their status afterwards again, and all data that is
542 * written can also be read again.
543 */
544static void
545test_writev_wouldblock (void)
546{
547#ifndef F_GETPIPE_SZ
548 g_test_skip ("F_GETPIPE_SZ not defined");
549#else /* if F_GETPIPE_SZ */
550 GUnixInputStream *is;
551 GUnixOutputStream *os;
552 gint fd[2];
553 GError *err = NULL;
554 guint8 data_write[1024], data_read[1024];
555 guint i;
556 GOutputVector vectors[4];
557 GPollableReturn res;
558 gint pipe_capacity;
559
560 for (i = 0; i < sizeof (data_write); i++)
561 data_write[i] = i;
562
563 g_assert_cmpint (pipe (fd), ==, 0);
564
565 g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
566 pipe_capacity = fcntl (fd: fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
567 g_assert_cmpint (pipe_capacity, >=, 4096);
568 g_assert_cmpint (pipe_capacity % 1024, >=, 0);
569
570 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
571 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
572
573 /* Run the whole thing three times to make sure that the streams
574 * reset the writability/readability state again */
575 for (i = 0; i < 3; i++) {
576 gsize written = 0, written_complete = 0;
577 gssize read = 0, read_complete = 0;
578
579 do
580 {
581 written_complete += written;
582
583 vectors[0].buffer = data_write;
584 vectors[0].size = 256;
585 vectors[1].buffer = data_write + 256;
586 vectors[1].size = 256;
587 vectors[2].buffer = data_write + 512;
588 vectors[2].size = 256;
589 vectors[3].buffer = data_write + 768;
590 vectors[3].size = 256;
591
592 res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
593 vectors,
594 G_N_ELEMENTS (vectors),
595 bytes_written: &written,
596 NULL,
597 error: &err);
598 }
599 while (res == G_POLLABLE_RETURN_OK);
600
601 g_assert_cmpuint (written_complete, >, 0);
602 g_assert_null (err);
603 g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK);
604 /* writev() on UNIX streams either succeeds fully or not at all */
605 g_assert_cmpuint (written, ==, 0);
606
607 do
608 {
609 read_complete += read;
610 read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
611 buffer: data_read,
612 count: sizeof (data_read),
613 NULL,
614 error: &err);
615 if (read > 0)
616 g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
617 }
618 while (read > 0);
619
620 g_assert_cmpuint (read_complete, ==, written_complete);
621 g_assert_nonnull (err);
622 g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
623 g_clear_error (err: &err);
624 }
625
626 g_object_unref (object: os);
627 g_object_unref (object: is);
628#endif /* if F_GETPIPE_SZ */
629}
630
631#ifdef F_GETPIPE_SZ
632static void
633write_async_wouldblock_cb (GUnixOutputStream *os,
634 GAsyncResult *result,
635 gpointer user_data)
636{
637 gsize *bytes_written = user_data;
638 GError *err = NULL;
639
640 g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, error: &err);
641 g_assert_no_error (err);
642}
643
644static void
645read_async_wouldblock_cb (GUnixInputStream *is,
646 GAsyncResult *result,
647 gpointer user_data)
648{
649 gsize *bytes_read = user_data;
650 GError *err = NULL;
651
652 g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, error: &err);
653 g_assert_no_error (err);
654}
655#endif /* if F_GETPIPE_SZ */
656
657/* test if the async implementation of write_all() and read_all() in G*Stream
658 * around the GPollable*Stream API is working correctly.
659 */
660static void
661test_write_async_wouldblock (void)
662{
663#ifndef F_GETPIPE_SZ
664 g_test_skip ("F_GETPIPE_SZ not defined");
665#else /* if F_GETPIPE_SZ */
666 GUnixInputStream *is;
667 GUnixOutputStream *os;
668 gint fd[2];
669 guint8 *data, *data_read;
670 guint i;
671 gint pipe_capacity;
672 gsize bytes_written = 0, bytes_read = 0;
673
674 g_assert_cmpint (pipe (fd), ==, 0);
675
676 /* FIXME: These should not be needed but otherwise
677 * g_unix_output_stream_write() will block because
678 * a) the fd is writable
679 * b) writing 4x capacity will block because writes are atomic
680 * c) the fd is blocking
681 *
682 * See https://gitlab.gnome.org/GNOME/glib/issues/1654
683 */
684 g_unix_set_fd_nonblocking (fd: fd[0], TRUE, NULL);
685 g_unix_set_fd_nonblocking (fd: fd[1], TRUE, NULL);
686
687 g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
688 pipe_capacity = fcntl (fd: fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
689 g_assert_cmpint (pipe_capacity, >=, 4096);
690
691 data = g_new (guint8, 4 * pipe_capacity);
692 for (i = 0; i < 4 * pipe_capacity; i++)
693 data[i] = i;
694 data_read = g_new (guint8, 4 * pipe_capacity);
695
696 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
697 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
698
699 g_output_stream_write_all_async (G_OUTPUT_STREAM (os),
700 buffer: data,
701 count: 4 * pipe_capacity,
702 G_PRIORITY_DEFAULT,
703 NULL,
704 callback: (GAsyncReadyCallback) write_async_wouldblock_cb,
705 user_data: &bytes_written);
706
707 g_input_stream_read_all_async (G_INPUT_STREAM (is),
708 buffer: data_read,
709 count: 4 * pipe_capacity,
710 G_PRIORITY_DEFAULT,
711 NULL,
712 callback: (GAsyncReadyCallback) read_async_wouldblock_cb,
713 user_data: &bytes_read);
714
715 while (bytes_written == 0 && bytes_read == 0)
716 g_main_context_iteration (NULL, TRUE);
717
718 g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
719 g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
720 g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
721
722 g_free (mem: data);
723 g_free (mem: data_read);
724
725 g_object_unref (object: os);
726 g_object_unref (object: is);
727#endif /* if F_GETPIPE_SZ */
728}
729
730#ifdef F_GETPIPE_SZ
731static void
732writev_async_wouldblock_cb (GUnixOutputStream *os,
733 GAsyncResult *result,
734 gpointer user_data)
735{
736 gsize *bytes_written = user_data;
737 GError *err = NULL;
738
739 g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, error: &err);
740 g_assert_no_error (err);
741}
742#endif /* if F_GETPIPE_SZ */
743
744/* test if the async implementation of writev_all() and read_all() in G*Stream
745 * around the GPollable*Stream API is working correctly.
746 */
747static void
748test_writev_async_wouldblock (void)
749{
750#ifndef F_GETPIPE_SZ
751 g_test_skip ("F_GETPIPE_SZ not defined");
752#else /* if F_GETPIPE_SZ */
753 GUnixInputStream *is;
754 GUnixOutputStream *os;
755 gint fd[2];
756 guint8 *data, *data_read;
757 guint i;
758 gint pipe_capacity;
759 gsize bytes_written = 0, bytes_read = 0;
760 GOutputVector vectors[4];
761
762 g_assert_cmpint (pipe (fd), ==, 0);
763
764 /* FIXME: These should not be needed but otherwise
765 * g_unix_output_stream_writev() will block because
766 * a) the fd is writable
767 * b) writing 4x capacity will block because writes are atomic
768 * c) the fd is blocking
769 *
770 * See https://gitlab.gnome.org/GNOME/glib/issues/1654
771 */
772 g_unix_set_fd_nonblocking (fd: fd[0], TRUE, NULL);
773 g_unix_set_fd_nonblocking (fd: fd[1], TRUE, NULL);
774
775 g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
776 pipe_capacity = fcntl (fd: fd[0], F_GETPIPE_SZ, &pipe_capacity, NULL);
777 g_assert_cmpint (pipe_capacity, >=, 4096);
778
779 data = g_new (guint8, 4 * pipe_capacity);
780 for (i = 0; i < 4 * pipe_capacity; i++)
781 data[i] = i;
782 data_read = g_new (guint8, 4 * pipe_capacity);
783
784 vectors[0].buffer = data;
785 vectors[0].size = 1024;
786 vectors[1].buffer = data + 1024;
787 vectors[1].size = 1024;
788 vectors[2].buffer = data + 2048;
789 vectors[2].size = 1024;
790 vectors[3].buffer = data + 3072;
791 vectors[3].size = 4 * pipe_capacity - 3072;
792
793 is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
794 os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
795
796 g_output_stream_writev_all_async (G_OUTPUT_STREAM (os),
797 vectors,
798 G_N_ELEMENTS (vectors),
799 G_PRIORITY_DEFAULT,
800 NULL,
801 callback: (GAsyncReadyCallback) writev_async_wouldblock_cb,
802 user_data: &bytes_written);
803
804 g_input_stream_read_all_async (G_INPUT_STREAM (is),
805 buffer: data_read,
806 count: 4 * pipe_capacity,
807 G_PRIORITY_DEFAULT,
808 NULL,
809 callback: (GAsyncReadyCallback) read_async_wouldblock_cb,
810 user_data: &bytes_read);
811
812 while (bytes_written == 0 && bytes_read == 0)
813 g_main_context_iteration (NULL, TRUE);
814
815 g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
816 g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
817 g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
818
819 g_free (mem: data);
820 g_free (mem: data_read);
821
822 g_object_unref (object: os);
823 g_object_unref (object: is);
824#endif /* F_GETPIPE_SZ */
825}
826
827int
828main (int argc,
829 char *argv[])
830{
831 g_test_init (argc: &argc, argv: &argv, NULL);
832
833 g_test_add_func (testpath: "/unix-streams/basic", test_func: test_basic);
834 g_test_add_data_func (testpath: "/unix-streams/pipe-io-test",
835 GINT_TO_POINTER (FALSE),
836 test_func: test_pipe_io);
837 g_test_add_data_func (testpath: "/unix-streams/nonblocking-io-test",
838 GINT_TO_POINTER (TRUE),
839 test_func: test_pipe_io);
840
841 g_test_add_data_func (testpath: "/unix-streams/read_write",
842 GINT_TO_POINTER (FALSE),
843 test_func: test_read_write);
844
845 g_test_add_data_func (testpath: "/unix-streams/read_writev",
846 GINT_TO_POINTER (TRUE),
847 test_func: test_read_write);
848
849 g_test_add_func (testpath: "/unix-streams/write-wouldblock",
850 test_func: test_write_wouldblock);
851 g_test_add_func (testpath: "/unix-streams/writev-wouldblock",
852 test_func: test_writev_wouldblock);
853
854 g_test_add_func (testpath: "/unix-streams/write-async-wouldblock",
855 test_func: test_write_async_wouldblock);
856 g_test_add_func (testpath: "/unix-streams/writev-async-wouldblock",
857 test_func: test_writev_async_wouldblock);
858
859 return g_test_run();
860}
861

source code of gtk/subprojects/glib/gio/tests/unix-streams.c