(root)/
glib-2.79.0/
gio/
tests/
unix-streams.c
       1  /* GLib testing framework examples and tests
       2   * Copyright (C) 2008 Red Hat, Inc
       3   *
       4   * SPDX-License-Identifier: LicenseRef-old-glib-tests
       5   *
       6   * This work is provided "as is"; redistribution and modification
       7   * in whole or in part, in any medium, physical or electronic is
       8   * permitted without restriction.
       9   *
      10   * This work is distributed in the hope that it will be useful,
      11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
      12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
      13   *
      14   * In no event shall the authors or contributors be liable for any
      15   * direct, indirect, incidental, special, exemplary, or consequential
      16   * damages (including, but not limited to, procurement of substitute
      17   * goods or services; loss of use, data, or profits; or business
      18   * interruption) however caused and on any theory of liability, whether
      19   * in contract, strict liability, or tort (including negligence or
      20   * otherwise) arising in any way out of the use of this software, even
      21   * if advised of the possibility of such damage.
      22   */
      23  
      24  #include <gio/gio.h>
      25  #include <gio/gunixinputstream.h>
      26  #include <gio/gunixoutputstream.h>
      27  #include <glib.h>
      28  #include <glib/glib-unix.h>
      29  #include <signal.h>
      30  #include <stdlib.h>
      31  #include <string.h>
      32  #include <unistd.h>
      33  #include <fcntl.h>
      34  
      35  /* sizeof(DATA) will give the number of bytes in the array, plus the terminating nul */
      36  static const gchar DATA[] = "abcdefghijklmnopqrstuvwxyz";
      37  
      38  int writer_pipe[2], reader_pipe[2];
      39  GCancellable *writer_cancel, *reader_cancel, *main_cancel;
      40  GMainLoop *loop;
      41  
      42  
      43  static gpointer
      44  writer_thread (gpointer user_data)
      45  {
      46    GOutputStream *out;
      47    gssize nwrote, offset;
      48    GError *err = NULL;
      49  
      50    out = g_unix_output_stream_new (writer_pipe[1], TRUE);
      51  
      52    do
      53      {
      54        g_usleep (10);
      55  
      56        offset = 0;
      57        while (offset < (gssize) sizeof (DATA))
      58  	{
      59  	  nwrote = g_output_stream_write (out, DATA + offset,
      60  					  sizeof (DATA) - offset,
      61  					  writer_cancel, &err);
      62  	  if (nwrote <= 0 || err != NULL)
      63  	    break;
      64  	  offset += nwrote;
      65  	}
      66  
      67        g_assert_true (nwrote > 0 || err != NULL);
      68      }
      69    while (err == NULL);
      70  
      71    if (g_cancellable_is_cancelled (writer_cancel))
      72      {
      73        g_clear_error (&err);
      74        g_cancellable_cancel (main_cancel);
      75        g_object_unref (out);
      76        return NULL;
      77      }
      78  
      79    g_warning ("writer: %s", err->message);
      80    g_assert_not_reached ();
      81  }
      82  
      83  static gpointer
      84  reader_thread (gpointer user_data)
      85  {
      86    GInputStream *in;
      87    gssize nread = 0, total;
      88    GError *err = NULL;
      89    char buf[sizeof (DATA)];
      90  
      91    in = g_unix_input_stream_new (reader_pipe[0], TRUE);
      92  
      93    do
      94      {
      95        total = 0;
      96        while (total < (gssize) sizeof (DATA))
      97  	{
      98  	  nread = g_input_stream_read (in, buf + total, sizeof (buf) - total,
      99  				       reader_cancel, &err);
     100  	  if (nread <= 0 || err != NULL)
     101  	    break;
     102  	  total += nread;
     103  	}
     104  
     105        if (err)
     106  	break;
     107  
     108        if (nread == 0)
     109  	{
     110  	  g_assert_no_error (err);
     111  	  /* pipe closed */
     112  	  g_object_unref (in);
     113  	  return NULL;
     114  	}
     115  
     116        g_assert_cmpstr (buf, ==, DATA);
     117        g_assert_false (g_cancellable_is_cancelled (reader_cancel));
     118      }
     119    while (err == NULL);
     120  
     121    g_warning ("reader: %s", err->message);
     122    g_assert_not_reached ();
     123  }
     124  
     125  static char main_buf[sizeof (DATA)];
     126  static gssize main_len, main_offset;
     127  
     128  static void main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data);
     129  static void main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data);
     130  static void main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data);
     131  
     132  static void
     133  do_main_cancel (GOutputStream *out)
     134  {
     135    g_output_stream_close (out, NULL, NULL);
     136    g_main_loop_quit (loop);
     137  }
     138  
     139  static void
     140  main_thread_skipped (GObject *source, GAsyncResult *res, gpointer user_data)
     141  {
     142    GInputStream *in = G_INPUT_STREAM (source);
     143    GOutputStream *out = user_data;
     144    GError *err = NULL;
     145    gssize nskipped;
     146  
     147    nskipped = g_input_stream_skip_finish (in, res, &err);
     148  
     149    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
     150      {
     151        g_assert_true (g_cancellable_is_cancelled (main_cancel));
     152        do_main_cancel (out);
     153        g_clear_error (&err);
     154        return;
     155      }
     156  
     157    g_assert_no_error (err);
     158  
     159    main_offset += nskipped;
     160    if (main_offset == main_len)
     161      {
     162        main_offset = 0;
     163        g_output_stream_write_async (out, main_buf, main_len,
     164                                     G_PRIORITY_DEFAULT, main_cancel,
     165                                     main_thread_wrote, in);
     166      }
     167    else
     168      {
     169        g_input_stream_skip_async (in, main_len - main_offset,
     170  				 G_PRIORITY_DEFAULT, main_cancel,
     171  				 main_thread_skipped, out);
     172      }
     173  }
     174  
     175  static void
     176  main_thread_read (GObject *source, GAsyncResult *res, gpointer user_data)
     177  {
     178    GInputStream *in = G_INPUT_STREAM (source);
     179    GOutputStream *out = user_data;
     180    GError *err = NULL;
     181    gssize nread;
     182  
     183    nread = g_input_stream_read_finish (in, res, &err);
     184  
     185    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
     186      {
     187        g_assert_true (g_cancellable_is_cancelled (main_cancel));
     188        do_main_cancel (out);
     189        g_clear_error (&err);
     190        return;
     191      }
     192  
     193    g_assert_no_error (err);
     194  
     195    main_offset += nread;
     196    if (main_offset == sizeof (DATA))
     197      {
     198        main_len = main_offset;
     199        main_offset = 0;
     200        /* Now skip the same amount */
     201        g_input_stream_skip_async (in, main_len,
     202  				 G_PRIORITY_DEFAULT, main_cancel,
     203  				 main_thread_skipped, out);
     204      }
     205    else
     206      {
     207        g_input_stream_read_async (in, main_buf, sizeof (main_buf),
     208  				 G_PRIORITY_DEFAULT, main_cancel,
     209  				 main_thread_read, out);
     210      }
     211  }
     212  
     213  static void
     214  main_thread_wrote (GObject *source, GAsyncResult *res, gpointer user_data)
     215  {
     216    GOutputStream *out = G_OUTPUT_STREAM (source);
     217    GInputStream *in = user_data;
     218    GError *err = NULL;
     219    gssize nwrote;
     220  
     221    nwrote = g_output_stream_write_finish (out, res, &err);
     222  
     223    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
     224      {
     225        g_assert_true (g_cancellable_is_cancelled (main_cancel));
     226        do_main_cancel (out);
     227        g_clear_error (&err);
     228        return;
     229      }
     230  
     231    g_assert_no_error (err);
     232    g_assert_cmpint (nwrote, <=, main_len - main_offset);
     233  
     234    main_offset += nwrote;
     235    if (main_offset == main_len)
     236      {
     237        main_offset = 0;
     238        g_input_stream_read_async (in, main_buf, sizeof (main_buf),
     239  				 G_PRIORITY_DEFAULT, main_cancel,
     240  				 main_thread_read, out);
     241      }
     242    else
     243      {
     244        g_output_stream_write_async (out, main_buf + main_offset,
     245  				   main_len - main_offset,
     246  				   G_PRIORITY_DEFAULT, main_cancel,
     247  				   main_thread_wrote, in);
     248      }
     249  }
     250  
     251  static gboolean
     252  timeout (gpointer cancellable)
     253  {
     254    g_cancellable_cancel (cancellable);
     255    return FALSE;
     256  }
     257  
     258  static void
     259  test_pipe_io (gconstpointer nonblocking)
     260  {
     261    GThread *writer, *reader;
     262    GInputStream *in;
     263    GOutputStream *out;
     264  
     265    /* Split off two (additional) threads, a reader and a writer. From
     266     * the writer thread, write data synchronously in small chunks,
     267     * which gets alternately read and skipped asynchronously by the
     268     * main thread and then (if not skipped) written asynchronously to
     269     * the reader thread, which reads it synchronously. Eventually a
     270     * timeout in the main thread will cause it to cancel the writer
     271     * thread, which will in turn cancel the read op in the main thread,
     272     * which will then close the pipe to the reader thread, causing the
     273     * read op to fail.
     274     */
     275  
     276    g_assert_true (pipe (writer_pipe) == 0 && pipe (reader_pipe) == 0);
     277  
     278    if (nonblocking)
     279      {
     280        GError *error = NULL;
     281  
     282        g_unix_set_fd_nonblocking (writer_pipe[0], TRUE, &error);
     283        g_assert_no_error (error);
     284        g_unix_set_fd_nonblocking (writer_pipe[1], TRUE, &error);
     285        g_assert_no_error (error);
     286        g_unix_set_fd_nonblocking (reader_pipe[0], TRUE, &error);
     287        g_assert_no_error (error);
     288        g_unix_set_fd_nonblocking (reader_pipe[1], TRUE, &error);
     289        g_assert_no_error (error);
     290      }
     291  
     292    writer_cancel = g_cancellable_new ();
     293    reader_cancel = g_cancellable_new ();
     294    main_cancel = g_cancellable_new ();
     295  
     296    writer = g_thread_new ("writer", writer_thread, NULL);
     297    reader = g_thread_new ("reader", reader_thread, NULL);
     298  
     299    in = g_unix_input_stream_new (writer_pipe[0], TRUE);
     300    out = g_unix_output_stream_new (reader_pipe[1], TRUE);
     301  
     302    g_input_stream_read_async (in, main_buf, sizeof (main_buf),
     303  			     G_PRIORITY_DEFAULT, main_cancel,
     304  			     main_thread_read, out);
     305  
     306    g_timeout_add (500, timeout, writer_cancel);
     307  
     308    loop = g_main_loop_new (NULL, TRUE);
     309    g_main_loop_run (loop);
     310    g_main_loop_unref (loop);
     311  
     312    g_thread_join (reader);
     313    g_thread_join (writer);
     314  
     315    g_object_unref (main_cancel);
     316    g_object_unref (reader_cancel);
     317    g_object_unref (writer_cancel);
     318    g_object_unref (in);
     319    g_object_unref (out);
     320  }
     321  
     322  static void
     323  test_basic (void)
     324  {
     325    GUnixInputStream *is;
     326    GUnixOutputStream *os;
     327    gint fd;
     328    gboolean close_fd;
     329  
     330    is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (0, TRUE));
     331    g_object_get (is,
     332                  "fd", &fd,
     333                  "close-fd", &close_fd,
     334                  NULL);
     335    g_assert_cmpint (fd, ==, 0);
     336    g_assert_true (close_fd);
     337  
     338    g_unix_input_stream_set_close_fd (is, FALSE);
     339    g_assert_false (g_unix_input_stream_get_close_fd (is));
     340    g_assert_cmpint (g_unix_input_stream_get_fd (is), ==, 0);
     341  
     342    g_assert_false (g_input_stream_has_pending (G_INPUT_STREAM (is)));
     343  
     344    g_object_unref (is);
     345  
     346    os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (1, TRUE));
     347    g_object_get (os,
     348                  "fd", &fd,
     349                  "close-fd", &close_fd,
     350                  NULL);
     351    g_assert_cmpint (fd, ==, 1);
     352    g_assert_true (close_fd);
     353  
     354    g_unix_output_stream_set_close_fd (os, FALSE);
     355    g_assert_false (g_unix_output_stream_get_close_fd (os));
     356    g_assert_cmpint (g_unix_output_stream_get_fd (os), ==, 1);
     357  
     358    g_assert_false (g_output_stream_has_pending (G_OUTPUT_STREAM (os)));
     359  
     360    g_object_unref (os);
     361  }
     362  
     363  typedef struct {
     364    GInputStream *is;
     365    GOutputStream *os;
     366    const guint8 *write_data;
     367    guint8 *read_data;
     368  } TestReadWriteData;
     369  
     370  static gpointer
     371  test_read_write_write_thread (gpointer user_data)
     372  {
     373    TestReadWriteData *data = user_data;
     374    gsize bytes_written;
     375    GError *error = NULL;
     376    gboolean res;
     377  
     378    res = g_output_stream_write_all (data->os, data->write_data, 1024, &bytes_written, NULL, &error);
     379    g_assert_true (res);
     380    g_assert_no_error (error);
     381    g_assert_cmpuint (bytes_written, ==, 1024);
     382  
     383    return NULL;
     384  }
     385  
     386  static gpointer
     387  test_read_write_read_thread (gpointer user_data)
     388  {
     389    TestReadWriteData *data = user_data;
     390    gsize bytes_read;
     391    GError *error = NULL;
     392    gboolean res;
     393  
     394    res = g_input_stream_read_all (data->is, data->read_data, 1024, &bytes_read, NULL, &error);
     395    g_assert_true (res);
     396    g_assert_no_error (error);
     397    g_assert_cmpuint (bytes_read, ==, 1024);
     398  
     399    return NULL;
     400  }
     401  
     402  static gpointer
     403  test_read_write_writev_thread (gpointer user_data)
     404  {
     405    TestReadWriteData *data = user_data;
     406    gsize bytes_written;
     407    GError *error = NULL;
     408    gboolean res;
     409    GOutputVector vectors[3];
     410  
     411    vectors[0].buffer = data->write_data;
     412    vectors[0].size = 256;
     413    vectors[1].buffer = data->write_data + 256;
     414    vectors[1].size = 256;
     415    vectors[2].buffer = data->write_data + 512;
     416    vectors[2].size = 512;
     417  
     418    res = g_output_stream_writev_all (data->os, vectors, G_N_ELEMENTS (vectors), &bytes_written, NULL, &error);
     419    g_assert_true (res);
     420    g_assert_no_error (error);
     421    g_assert_cmpuint (bytes_written, ==, 1024);
     422  
     423    return NULL;
     424  }
     425  
     426  /* test if normal writing/reading from a pipe works */
     427  static void
     428  test_read_write (gconstpointer user_data)
     429  {
     430    gboolean writev = GPOINTER_TO_INT (user_data);
     431    GUnixInputStream *is;
     432    GUnixOutputStream *os;
     433    gint fd[2];
     434    guint8 data_write[1024], data_read[1024];
     435    guint i;
     436    GThread *write_thread, *read_thread;
     437    TestReadWriteData data;
     438  
     439    for (i = 0; i < sizeof (data_write); i++)
     440      data_write[i] = i;
     441  
     442    g_assert_cmpint (pipe (fd), ==, 0);
     443  
     444    is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
     445    os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
     446  
     447    data.is = G_INPUT_STREAM (is);
     448    data.os = G_OUTPUT_STREAM (os);
     449    data.read_data = data_read;
     450    data.write_data = data_write;
     451  
     452    if (writev)
     453      write_thread = g_thread_new ("writer", test_read_write_writev_thread, &data);
     454    else
     455      write_thread = g_thread_new ("writer", test_read_write_write_thread, &data);
     456    read_thread = g_thread_new ("reader", test_read_write_read_thread, &data);
     457  
     458    g_thread_join (write_thread);
     459    g_thread_join (read_thread);
     460  
     461    g_assert_cmpmem (data_write, sizeof data_write, data_read, sizeof data_read);
     462  
     463    g_object_unref (os);
     464    g_object_unref (is);
     465  }
     466  
     467  /* test if g_pollable_output_stream_write_nonblocking() and
     468   * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
     469   * and correctly reset their status afterwards again, and all data that is
     470   * written can also be read again.
     471   */
     472  static void
     473  test_write_wouldblock (void)
     474  {
     475  #ifndef F_GETPIPE_SZ
     476    g_test_skip ("F_GETPIPE_SZ not defined");
     477  #else  /* if F_GETPIPE_SZ */
     478    GUnixInputStream *is;
     479    GUnixOutputStream *os;
     480    gint fd[2];
     481    GError *err = NULL;
     482    guint8 data_write[1024], data_read[1024];
     483    gsize i;
     484    int retval;
     485    gsize pipe_capacity;
     486  
     487    for (i = 0; i < sizeof (data_write); i++)
     488      data_write[i] = i;
     489  
     490    g_assert_cmpint (pipe (fd), ==, 0);
     491  
     492    g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
     493    retval = fcntl (fd[0], F_GETPIPE_SZ);
     494    g_assert_cmpint (retval, >=, 0);
     495    pipe_capacity = (gsize) retval;
     496    g_assert_cmpint (pipe_capacity, >=, 4096);
     497    g_assert_cmpint (pipe_capacity % 1024, >=, 0);
     498  
     499    is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
     500    os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
     501  
     502    /* Run the whole thing three times to make sure that the streams
     503     * reset the writability/readability state again */
     504    for (i = 0; i < 3; i++) {
     505      gssize written = 0, written_complete = 0;
     506      gssize read = 0, read_complete = 0;
     507  
     508      do
     509        {
     510          written_complete += written;
     511          written = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
     512                                                                data_write,
     513                                                                sizeof (data_write),
     514                                                                NULL,
     515                                                                &err);
     516        }
     517      while (written > 0);
     518  
     519      g_assert_cmpuint (written_complete, >, 0);
     520      g_assert_nonnull (err);
     521      g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
     522      g_clear_error (&err);
     523  
     524      do
     525        {
     526          read_complete += read;
     527          read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
     528                                                           data_read,
     529                                                           sizeof (data_read),
     530                                                           NULL,
     531                                                           &err);
     532          if (read > 0)
     533            g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
     534        }
     535      while (read > 0);
     536  
     537      g_assert_cmpuint (read_complete, ==, written_complete);
     538      g_assert_nonnull (err);
     539      g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
     540      g_clear_error (&err);
     541    }
     542  
     543    g_object_unref (os);
     544    g_object_unref (is);
     545  #endif  /* if F_GETPIPE_SZ */
     546  }
     547  
     548  /* test if g_pollable_output_stream_writev_nonblocking() and
     549   * g_pollable_output_stream_read_nonblocking() correctly return WOULD_BLOCK
     550   * and correctly reset their status afterwards again, and all data that is
     551   * written can also be read again.
     552   */
     553  static void
     554  test_writev_wouldblock (void)
     555  {
     556  #ifndef F_GETPIPE_SZ
     557    g_test_skip ("F_GETPIPE_SZ not defined");
     558  #else  /* if F_GETPIPE_SZ */
     559    GUnixInputStream *is;
     560    GUnixOutputStream *os;
     561    gint fd[2];
     562    GError *err = NULL;
     563    guint8 data_write[1024], data_read[1024];
     564    gsize i;
     565    int retval;
     566    gsize pipe_capacity;
     567    GOutputVector vectors[4];
     568    GPollableReturn res;
     569  
     570    for (i = 0; i < sizeof (data_write); i++)
     571      data_write[i] = i;
     572  
     573    g_assert_cmpint (pipe (fd), ==, 0);
     574  
     575    g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
     576    retval = fcntl (fd[0], F_GETPIPE_SZ);
     577    g_assert_cmpint (retval, >=, 0);
     578    pipe_capacity = (gsize) retval;
     579    g_assert_cmpint (pipe_capacity, >=, 4096);
     580    g_assert_cmpint (pipe_capacity % 1024, >=, 0);
     581  
     582    is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
     583    os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
     584  
     585    /* Run the whole thing three times to make sure that the streams
     586     * reset the writability/readability state again */
     587    for (i = 0; i < 3; i++) {
     588      gsize written = 0, written_complete = 0;
     589      gssize read = 0, read_complete = 0;
     590  
     591      do
     592      {
     593          written_complete += written;
     594  
     595          vectors[0].buffer = data_write;
     596          vectors[0].size = 256;
     597          vectors[1].buffer = data_write + 256;
     598          vectors[1].size = 256;
     599          vectors[2].buffer = data_write + 512;
     600          vectors[2].size = 256;
     601          vectors[3].buffer = data_write + 768;
     602          vectors[3].size = 256;
     603  
     604          res = g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM (os),
     605                                                             vectors,
     606                                                             G_N_ELEMENTS (vectors),
     607                                                             &written,
     608                                                             NULL,
     609                                                             &err);
     610        }
     611      while (res == G_POLLABLE_RETURN_OK);
     612  
     613      g_assert_cmpuint (written_complete, >, 0);
     614      g_assert_null (err);
     615      g_assert_cmpint (res, ==, G_POLLABLE_RETURN_WOULD_BLOCK);
     616      /* writev() on UNIX streams either succeeds fully or not at all */
     617      g_assert_cmpuint (written, ==, 0);
     618  
     619      do
     620        {
     621          read_complete += read;
     622          read = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (is),
     623                                                           data_read,
     624                                                           sizeof (data_read),
     625                                                           NULL,
     626                                                           &err);
     627          if (read > 0)
     628            g_assert_cmpmem (data_read, read, data_write, sizeof (data_write));
     629        }
     630      while (read > 0);
     631  
     632      g_assert_cmpuint (read_complete, ==, written_complete);
     633      g_assert_nonnull (err);
     634      g_assert_error (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
     635      g_clear_error (&err);
     636    }
     637  
     638    g_object_unref (os);
     639    g_object_unref (is);
     640  #endif  /* if F_GETPIPE_SZ */
     641  }
     642  
     643  #ifdef F_GETPIPE_SZ
     644  static void
     645  write_async_wouldblock_cb (GUnixOutputStream *os,
     646                             GAsyncResult      *result,
     647                             gpointer           user_data)
     648  {
     649    gsize *bytes_written = user_data;
     650    GError *err = NULL;
     651  
     652    g_output_stream_write_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
     653    g_assert_no_error (err);
     654  }
     655  
     656  static void
     657  read_async_wouldblock_cb (GUnixInputStream  *is,
     658                            GAsyncResult      *result,
     659                            gpointer           user_data)
     660  {
     661    gsize *bytes_read = user_data;
     662    GError *err = NULL;
     663  
     664    g_input_stream_read_all_finish (G_INPUT_STREAM (is), result, bytes_read, &err);
     665    g_assert_no_error (err);
     666  }
     667  #endif  /* if F_GETPIPE_SZ */
     668  
     669  /* test if the async implementation of write_all() and read_all() in G*Stream
     670   * around the GPollable*Stream API is working correctly.
     671   */
     672  static void
     673  test_write_async_wouldblock (void)
     674  {
     675  #ifndef F_GETPIPE_SZ
     676    g_test_skip ("F_GETPIPE_SZ not defined");
     677  #else  /* if F_GETPIPE_SZ */
     678    GUnixInputStream *is;
     679    GUnixOutputStream *os;
     680    gint fd[2];
     681    guint8 *data, *data_read;
     682    gsize i;
     683    int retval;
     684    gsize pipe_capacity;
     685    gsize bytes_written = 0, bytes_read = 0;
     686  
     687    g_assert_cmpint (pipe (fd), ==, 0);
     688  
     689    /* FIXME: These should not be needed but otherwise
     690     * g_unix_output_stream_write() will block because
     691     *   a) the fd is writable
     692     *   b) writing 4x capacity will block because writes are atomic
     693     *   c) the fd is blocking
     694     *
     695     * See https://gitlab.gnome.org/GNOME/glib/issues/1654
     696     */
     697    g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
     698    g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
     699  
     700    g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
     701    retval = fcntl (fd[0], F_GETPIPE_SZ);
     702    g_assert_cmpint (retval, >=, 0);
     703    pipe_capacity = (gsize) retval;
     704    g_assert_cmpint (pipe_capacity, >=, 4096);
     705  
     706    data = g_new (guint8, 4 * pipe_capacity);
     707    for (i = 0; i < 4 * pipe_capacity; i++)
     708      data[i] = i;
     709    data_read = g_new (guint8, 4 * pipe_capacity);
     710  
     711    is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
     712    os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
     713  
     714    g_output_stream_write_all_async (G_OUTPUT_STREAM (os),
     715                                     data,
     716                                     4 * pipe_capacity,
     717                                     G_PRIORITY_DEFAULT,
     718                                     NULL,
     719                                     (GAsyncReadyCallback) write_async_wouldblock_cb,
     720                                     &bytes_written);
     721  
     722    g_input_stream_read_all_async (G_INPUT_STREAM (is),
     723                                   data_read,
     724                                   4 * pipe_capacity,
     725                                   G_PRIORITY_DEFAULT,
     726                                   NULL,
     727                                   (GAsyncReadyCallback) read_async_wouldblock_cb,
     728                                   &bytes_read);
     729  
     730    while (bytes_written == 0 && bytes_read == 0)
     731      g_main_context_iteration (NULL, TRUE);
     732  
     733    g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
     734    g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
     735    g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
     736  
     737    g_free (data);
     738    g_free (data_read);
     739  
     740    g_object_unref (os);
     741    g_object_unref (is);
     742  #endif  /* if F_GETPIPE_SZ */
     743  }
     744  
     745  #ifdef F_GETPIPE_SZ
     746  static void
     747  writev_async_wouldblock_cb (GUnixOutputStream *os,
     748                              GAsyncResult      *result,
     749                              gpointer           user_data)
     750  {
     751    gsize *bytes_written = user_data;
     752    GError *err = NULL;
     753  
     754    g_output_stream_writev_all_finish (G_OUTPUT_STREAM (os), result, bytes_written, &err);
     755    g_assert_no_error (err);
     756  }
     757  #endif  /* if F_GETPIPE_SZ */
     758  
     759  /* test if the async implementation of writev_all() and read_all() in G*Stream
     760   * around the GPollable*Stream API is working correctly.
     761   */
     762  static void
     763  test_writev_async_wouldblock (void)
     764  {
     765  #ifndef F_GETPIPE_SZ
     766    g_test_skip ("F_GETPIPE_SZ not defined");
     767  #else  /* if F_GETPIPE_SZ */
     768    GUnixInputStream *is;
     769    GUnixOutputStream *os;
     770    gint fd[2];
     771    guint8 *data, *data_read;
     772    gsize i;
     773    int retval;
     774    gsize pipe_capacity;
     775    gsize bytes_written = 0, bytes_read = 0;
     776    GOutputVector vectors[4];
     777  
     778    g_assert_cmpint (pipe (fd), ==, 0);
     779  
     780    /* FIXME: These should not be needed but otherwise
     781     * g_unix_output_stream_writev() will block because
     782     *   a) the fd is writable
     783     *   b) writing 4x capacity will block because writes are atomic
     784     *   c) the fd is blocking
     785     *
     786     * See https://gitlab.gnome.org/GNOME/glib/issues/1654
     787     */
     788    g_unix_set_fd_nonblocking (fd[0], TRUE, NULL);
     789    g_unix_set_fd_nonblocking (fd[1], TRUE, NULL);
     790  
     791    g_assert_cmpint (fcntl (fd[0], F_SETPIPE_SZ, 4096, NULL), !=, 0);
     792    retval = fcntl (fd[0], F_GETPIPE_SZ);
     793    g_assert_cmpint (retval, >=, 0);
     794    pipe_capacity = (gsize) retval;
     795    g_assert_cmpint (pipe_capacity, >=, 4096);
     796  
     797    data = g_new (guint8, 4 * pipe_capacity);
     798    for (i = 0; i < 4 * pipe_capacity; i++)
     799      data[i] = i;
     800    data_read = g_new (guint8, 4 * pipe_capacity);
     801  
     802    vectors[0].buffer = data;
     803    vectors[0].size = 1024;
     804    vectors[1].buffer = data + 1024;
     805    vectors[1].size = 1024;
     806    vectors[2].buffer = data + 2048;
     807    vectors[2].size = 1024;
     808    vectors[3].buffer = data + 3072;
     809    vectors[3].size = 4 * pipe_capacity - 3072;
     810  
     811    is = G_UNIX_INPUT_STREAM (g_unix_input_stream_new (fd[0], TRUE));
     812    os = G_UNIX_OUTPUT_STREAM (g_unix_output_stream_new (fd[1], TRUE));
     813  
     814    g_output_stream_writev_all_async (G_OUTPUT_STREAM (os),
     815                                      vectors,
     816                                      G_N_ELEMENTS (vectors),
     817                                      G_PRIORITY_DEFAULT,
     818                                      NULL,
     819                                      (GAsyncReadyCallback) writev_async_wouldblock_cb,
     820                                      &bytes_written);
     821  
     822    g_input_stream_read_all_async (G_INPUT_STREAM (is),
     823                                   data_read,
     824                                   4 * pipe_capacity,
     825                                   G_PRIORITY_DEFAULT,
     826                                   NULL,
     827                                   (GAsyncReadyCallback) read_async_wouldblock_cb,
     828                                   &bytes_read);
     829  
     830    while (bytes_written == 0 && bytes_read == 0)
     831      g_main_context_iteration (NULL, TRUE);
     832  
     833    g_assert_cmpuint (bytes_written, ==, 4 * pipe_capacity);
     834    g_assert_cmpuint (bytes_read, ==, 4 * pipe_capacity);
     835    g_assert_cmpmem (data_read, bytes_read, data, bytes_written);
     836  
     837    g_free (data);
     838    g_free (data_read);
     839  
     840    g_object_unref (os);
     841    g_object_unref (is);
     842  #endif  /* F_GETPIPE_SZ */
     843  }
     844  
     845  int
     846  main (int   argc,
     847        char *argv[])
     848  {
     849    g_test_init (&argc, &argv, NULL);
     850  
     851    g_test_add_func ("/unix-streams/basic", test_basic);
     852    g_test_add_data_func ("/unix-streams/pipe-io-test",
     853  			GINT_TO_POINTER (FALSE),
     854  			test_pipe_io);
     855    g_test_add_data_func ("/unix-streams/nonblocking-io-test",
     856  			GINT_TO_POINTER (TRUE),
     857  			test_pipe_io);
     858  
     859    g_test_add_data_func ("/unix-streams/read_write",
     860                          GINT_TO_POINTER (FALSE),
     861                          test_read_write);
     862  
     863    g_test_add_data_func ("/unix-streams/read_writev",
     864                          GINT_TO_POINTER (TRUE),
     865                          test_read_write);
     866  
     867    g_test_add_func ("/unix-streams/write-wouldblock",
     868  		   test_write_wouldblock);
     869    g_test_add_func ("/unix-streams/writev-wouldblock",
     870  		   test_writev_wouldblock);
     871  
     872    g_test_add_func ("/unix-streams/write-async-wouldblock",
     873  		   test_write_async_wouldblock);
     874    g_test_add_func ("/unix-streams/writev-async-wouldblock",
     875  		   test_writev_async_wouldblock);
     876  
     877    return g_test_run();
     878  }