(root)/
glib-2.79.0/
gio/
ginputstream.c
       1  /* GIO - GLib Input, Output and Streaming Library
       2   * 
       3   * Copyright (C) 2006-2007 Red Hat, Inc.
       4   *
       5   * SPDX-License-Identifier: LGPL-2.1-or-later
       6   *
       7   * This library is free software; you can redistribute it and/or
       8   * modify it under the terms of the GNU Lesser General Public
       9   * License as published by the Free Software Foundation; either
      10   * version 2.1 of the License, or (at your option) any later version.
      11   *
      12   * This library is distributed in the hope that it will be useful,
      13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
      14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      15   * Lesser General Public License for more details.
      16   *
      17   * You should have received a copy of the GNU Lesser General
      18   * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
      19   *
      20   * Author: Alexander Larsson <alexl@redhat.com>
      21   */
      22  
      23  #include "config.h"
      24  #include <glib.h>
      25  #include "glibintl.h"
      26  
      27  #include "ginputstream.h"
      28  #include "gioprivate.h"
      29  #include "gseekable.h"
      30  #include "gcancellable.h"
      31  #include "gasyncresult.h"
      32  #include "gioerror.h"
      33  #include "gpollableinputstream.h"
      34  
      35  /**
      36   * GInputStream:
      37   *
      38   * `GInputStream` is a base class for implementing streaming input.
      39   *
      40   * It has functions to read from a stream ([method@Gio.InputStream.read]),
      41   * to close a stream ([method@Gio.InputStream.close]) and to skip some content
      42   * ([method@Gio.InputStream.skip]).
      43   *
      44   * To copy the content of an input stream to an output stream without 
      45   * manually handling the reads and writes, use [method@Gio.OutputStream.splice].
      46   *
      47   * See the documentation for [class@Gio.IOStream] for details of thread safety
      48   * of streaming APIs.
      49   *
      50   * All of these functions have async variants too.
      51   **/
      52  
/* Private per-instance state of a GInputStream, reached through stream->priv. */
struct _GInputStreamPrivate {
  /* Set once the stream has been closed, by g_input_stream_close() and by the
   * asynchronous close wrapper. */
  guint closed : 1;
  /* Operation-in-progress flag.  g_input_stream_real_skip() toggles it
   * directly; presumably g_input_stream_set_pending() and
   * g_input_stream_clear_pending() manage it otherwise (not visible here). */
  guint pending : 1;
  /* Caller-supplied callback of the asynchronous operation in flight; stored
   * by g_input_stream_read_async() and invoked from the async wrappers. */
  GAsyncReadyCallback outstanding_callback;
};
      58  
      59  G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)
      60  
      61  static gssize   g_input_stream_real_skip         (GInputStream         *stream,
      62  						  gsize                 count,
      63  						  GCancellable         *cancellable,
      64  						  GError              **error);
      65  static void     g_input_stream_real_read_async   (GInputStream         *stream,
      66  						  void                 *buffer,
      67  						  gsize                 count,
      68  						  int                   io_priority,
      69  						  GCancellable         *cancellable,
      70  						  GAsyncReadyCallback   callback,
      71  						  gpointer              user_data);
      72  static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
      73  						  GAsyncResult         *result,
      74  						  GError              **error);
      75  static void     g_input_stream_real_skip_async   (GInputStream         *stream,
      76  						  gsize                 count,
      77  						  int                   io_priority,
      78  						  GCancellable         *cancellable,
      79  						  GAsyncReadyCallback   callback,
      80  						  gpointer              data);
      81  static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
      82  						  GAsyncResult         *result,
      83  						  GError              **error);
      84  static void     g_input_stream_real_close_async  (GInputStream         *stream,
      85  						  int                   io_priority,
      86  						  GCancellable         *cancellable,
      87  						  GAsyncReadyCallback   callback,
      88  						  gpointer              data);
      89  static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
      90  						  GAsyncResult         *result,
      91  						  GError              **error);
      92  
      93  static void
      94  g_input_stream_dispose (GObject *object)
      95  {
      96    GInputStream *stream;
      97  
      98    stream = G_INPUT_STREAM (object);
      99    
     100    if (!stream->priv->closed)
     101      g_input_stream_close (stream, NULL, NULL);
     102  
     103    G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
     104  }
     105  
     106  
     107  static void
     108  g_input_stream_class_init (GInputStreamClass *klass)
     109  {
     110    GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
     111    
     112    gobject_class->dispose = g_input_stream_dispose;
     113    
     114    klass->skip = g_input_stream_real_skip;
     115    klass->read_async = g_input_stream_real_read_async;
     116    klass->read_finish = g_input_stream_real_read_finish;
     117    klass->skip_async = g_input_stream_real_skip_async;
     118    klass->skip_finish = g_input_stream_real_skip_finish;
     119    klass->close_async = g_input_stream_real_close_async;
     120    klass->close_finish = g_input_stream_real_close_finish;
     121  }
     122  
     123  static void
     124  g_input_stream_init (GInputStream *stream)
     125  {
     126    stream->priv = g_input_stream_get_instance_private (stream);
     127  }
     128  
     129  /**
     130   * g_input_stream_read:
     131   * @stream: a #GInputStream.
     132   * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
     133   *     a buffer to read data into (which should be at least count bytes long).
     134   * @count: (in): the number of bytes that will be read from the stream
     135   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
     136   * @error: location to store the error occurring, or %NULL to ignore
     137   *
     138   * Tries to read @count bytes from the stream into the buffer starting at
     139   * @buffer. Will block during this read.
     140   * 
     141   * If @count is zero, returns zero and does nothing. A value of @count
     142   * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
     143   *
     144   * On success, the number of bytes read into the buffer is returned.
     145   * It is not an error if this is not the same as the requested size, as it
     146   * can happen e.g. near the end of a file. Zero is returned on end of file
     147   * (or if @count is zero),  but never otherwise.
     148   *
     149   * The returned @buffer is not a nul-terminated string, it can contain nul bytes
     150   * at any position, and this function doesn't nul-terminate the @buffer.
     151   *
     152   * If @cancellable is not %NULL, then the operation can be cancelled by
     153   * triggering the cancellable object from another thread. If the operation
     154   * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
     155   * operation was partially finished when the operation was cancelled the
     156   * partial result will be returned, without an error.
     157   *
     158   * On error -1 is returned and @error is set accordingly.
     159   * 
     160   * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
     161   **/
     162  gssize
     163  g_input_stream_read  (GInputStream  *stream,
     164  		      void          *buffer,
     165  		      gsize          count,
     166  		      GCancellable  *cancellable,
     167  		      GError       **error)
     168  {
     169    GInputStreamClass *class;
     170    gssize res;
     171  
     172    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
     173    g_return_val_if_fail (buffer != NULL, 0);
     174  
     175    if (count == 0)
     176      return 0;
     177    
     178    if (((gssize) count) < 0)
     179      {
     180        g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
     181  		   _("Too large count value passed to %s"), G_STRFUNC);
     182        return -1;
     183      }
     184  
     185    class = G_INPUT_STREAM_GET_CLASS (stream);
     186  
     187    if (class->read_fn == NULL) 
     188      {
     189        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
     190                             _("Input stream doesn’t implement read"));
     191        return -1;
     192      }
     193  
     194    if (!g_input_stream_set_pending (stream, error))
     195      return -1;
     196  
     197    if (cancellable)
     198      g_cancellable_push_current (cancellable);
     199    
     200    res = class->read_fn (stream, buffer, count, cancellable, error);
     201  
     202    if (cancellable)
     203      g_cancellable_pop_current (cancellable);
     204    
     205    g_input_stream_clear_pending (stream);
     206  
     207    return res;
     208  }
     209  
     210  /**
     211   * g_input_stream_read_all:
     212   * @stream: a #GInputStream.
     213   * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
     214   *     a buffer to read data into (which should be at least count bytes long).
     215   * @count: (in): the number of bytes that will be read from the stream
     216   * @bytes_read: (out): location to store the number of bytes that were read from the stream
     217   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
     218   * @error: location to store the error occurring, or %NULL to ignore
     219   *
     220   * Tries to read @count bytes from the stream into the buffer starting at
     221   * @buffer. Will block during this read.
     222   *
     223   * This function is similar to g_input_stream_read(), except it tries to
     224   * read as many bytes as requested, only stopping on an error or end of stream.
     225   *
     226   * On a successful read of @count bytes, or if we reached the end of the
     227   * stream,  %TRUE is returned, and @bytes_read is set to the number of bytes
     228   * read into @buffer.
     229   * 
     230   * If there is an error during the operation %FALSE is returned and @error
     231   * is set to indicate the error status.
     232   *
     233   * As a special exception to the normal conventions for functions that
     234   * use #GError, if this function returns %FALSE (and sets @error) then
     235   * @bytes_read will be set to the number of bytes that were successfully
     236   * read before the error was encountered.  This functionality is only
     237   * available from C.  If you need it from another language then you must
     238   * write your own loop around g_input_stream_read().
     239   *
     240   * Returns: %TRUE on success, %FALSE if there was an error
     241   **/
     242  gboolean
     243  g_input_stream_read_all (GInputStream  *stream,
     244  			 void          *buffer,
     245  			 gsize          count,
     246  			 gsize         *bytes_read,
     247  			 GCancellable  *cancellable,
     248  			 GError       **error)
     249  {
     250    gsize _bytes_read;
     251    gssize res;
     252  
     253    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
     254    g_return_val_if_fail (buffer != NULL, FALSE);
     255  
     256    _bytes_read = 0;
     257    while (_bytes_read < count)
     258      {
     259        res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
     260  				 cancellable, error);
     261        if (res == -1)
     262  	{
     263  	  if (bytes_read)
     264  	    *bytes_read = _bytes_read;
     265  	  return FALSE;
     266  	}
     267        
     268        if (res == 0)
     269  	break;
     270  
     271        _bytes_read += res;
     272      }
     273  
     274    if (bytes_read)
     275      *bytes_read = _bytes_read;
     276    return TRUE;
     277  }
     278  
     279  /**
     280   * g_input_stream_read_bytes:
     281   * @stream: a #GInputStream.
     282   * @count: maximum number of bytes that will be read from the stream. Common
     283   * values include 4096 and 8192.
     284   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
     285   * @error: location to store the error occurring, or %NULL to ignore
     286   *
     287   * Like g_input_stream_read(), this tries to read @count bytes from
     288   * the stream in a blocking fashion. However, rather than reading into
     289   * a user-supplied buffer, this will create a new #GBytes containing
     290   * the data that was read. This may be easier to use from language
     291   * bindings.
     292   *
     293   * If @count is zero, returns a zero-length #GBytes and does nothing. A
     294   * value of @count larger than %G_MAXSSIZE will cause a
     295   * %G_IO_ERROR_INVALID_ARGUMENT error.
     296   *
     297   * On success, a new #GBytes is returned. It is not an error if the
     298   * size of this object is not the same as the requested size, as it
     299   * can happen e.g. near the end of a file. A zero-length #GBytes is
     300   * returned on end of file (or if @count is zero), but never
     301   * otherwise.
     302   *
     303   * If @cancellable is not %NULL, then the operation can be cancelled by
     304   * triggering the cancellable object from another thread. If the operation
     305   * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
     306   * operation was partially finished when the operation was cancelled the
     307   * partial result will be returned, without an error.
     308   *
     309   * On error %NULL is returned and @error is set accordingly.
     310   *
     311   * Returns: (transfer full): a new #GBytes, or %NULL on error
     312   *
     313   * Since: 2.34
     314   **/
     315  GBytes *
     316  g_input_stream_read_bytes (GInputStream  *stream,
     317  			   gsize          count,
     318  			   GCancellable  *cancellable,
     319  			   GError       **error)
     320  {
     321    guchar *buf;
     322    gssize nread;
     323  
     324    buf = g_malloc (count);
     325    nread = g_input_stream_read (stream, buf, count, cancellable, error);
     326    if (nread == -1)
     327      {
     328        g_free (buf);
     329        return NULL;
     330      }
     331    else if (nread == 0)
     332      {
     333        g_free (buf);
     334        return g_bytes_new_static ("", 0);
     335      }
     336    else
     337      return g_bytes_new_take (buf, nread);
     338  }
     339  
     340  /**
     341   * g_input_stream_skip:
     342   * @stream: a #GInputStream.
     343   * @count: the number of bytes that will be skipped from the stream
     344   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. 
     345   * @error: location to store the error occurring, or %NULL to ignore
     346   *
     347   * Tries to skip @count bytes from the stream. Will block during the operation.
     348   *
     349   * This is identical to g_input_stream_read(), from a behaviour standpoint,
     350   * but the bytes that are skipped are not returned to the user. Some
     351   * streams have an implementation that is more efficient than reading the data.
     352   *
     353   * This function is optional for inheriting classes, as the default implementation
     354   * emulates it using read.
     355   *
     356   * If @cancellable is not %NULL, then the operation can be cancelled by
     357   * triggering the cancellable object from another thread. If the operation
     358   * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
     359   * operation was partially finished when the operation was cancelled the
     360   * partial result will be returned, without an error.
     361   *
     362   * Returns: Number of bytes skipped, or -1 on error
     363   **/
     364  gssize
     365  g_input_stream_skip (GInputStream  *stream,
     366  		     gsize          count,
     367  		     GCancellable  *cancellable,
     368  		     GError       **error)
     369  {
     370    GInputStreamClass *class;
     371    gssize res;
     372  
     373    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
     374  
     375    if (count == 0)
     376      return 0;
     377  
     378    if (((gssize) count) < 0)
     379      {
     380        g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
     381  		   _("Too large count value passed to %s"), G_STRFUNC);
     382        return -1;
     383      }
     384    
     385    class = G_INPUT_STREAM_GET_CLASS (stream);
     386  
     387    if (!g_input_stream_set_pending (stream, error))
     388      return -1;
     389  
     390    if (cancellable)
     391      g_cancellable_push_current (cancellable);
     392    
     393    res = class->skip (stream, count, cancellable, error);
     394  
     395    if (cancellable)
     396      g_cancellable_pop_current (cancellable);
     397    
     398    g_input_stream_clear_pending (stream);
     399  
     400    return res;
     401  }
     402  
     403  static gssize
     404  g_input_stream_real_skip (GInputStream  *stream,
     405  			  gsize          count,
     406  			  GCancellable  *cancellable,
     407  			  GError       **error)
     408  {
     409    GInputStreamClass *class;
     410    gssize ret, read_bytes;
     411    char buffer[8192];
     412    GError *my_error;
     413  
     414    if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
     415      {
     416        GSeekable *seekable = G_SEEKABLE (stream);
     417        goffset start, end;
     418        gboolean success;
     419  
     420        /* g_seekable_seek() may try to set pending itself */
     421        stream->priv->pending = FALSE;
     422  
     423        start = g_seekable_tell (seekable);
     424  
     425        if (g_seekable_seek (G_SEEKABLE (stream),
     426                             0,
     427                             G_SEEK_END,
     428                             cancellable,
     429                             NULL))
     430          {
     431            end = g_seekable_tell (seekable);
     432            g_assert (start >= 0);
     433            g_assert (end >= start);
     434            if (start > (goffset) (G_MAXOFFSET - count) ||
     435                (goffset) (start + count) > end)
     436              {
     437                stream->priv->pending = TRUE;
     438                return end - start;
     439              }
     440  
     441            success = g_seekable_seek (G_SEEKABLE (stream),
     442                                       start + count,
     443                                       G_SEEK_SET,
     444                                       cancellable,
     445                                       error);
     446            stream->priv->pending = TRUE;
     447  
     448            if (success)
     449              return count;
     450            else
     451              return -1;
     452          }
     453      }
     454  
     455    /* If not seekable, or seek failed, fall back to reading data: */
     456  
     457    class = G_INPUT_STREAM_GET_CLASS (stream);
     458  
     459    read_bytes = 0;
     460    while (1)
     461      {
     462        my_error = NULL;
     463  
     464        ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
     465                              cancellable, &my_error);
     466        if (ret == -1)
     467  	{
     468  	  if (read_bytes > 0 &&
     469  	      my_error->domain == G_IO_ERROR &&
     470  	      my_error->code == G_IO_ERROR_CANCELLED)
     471  	    {
     472  	      g_error_free (my_error);
     473  	      return read_bytes;
     474  	    }
     475  
     476  	  g_propagate_error (error, my_error);
     477  	  return -1;
     478  	}
     479  
     480        count -= ret;
     481        read_bytes += ret;
     482  
     483        if (ret == 0 || count == 0)
     484          return read_bytes;
     485      }
     486  }
     487  
     488  /**
     489   * g_input_stream_close:
     490   * @stream: A #GInputStream.
     491   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
     492   * @error: location to store the error occurring, or %NULL to ignore
     493   *
     494   * Closes the stream, releasing resources related to it.
     495   *
     496   * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
     497   * Closing a stream multiple times will not return an error.
     498   *
     499   * Streams will be automatically closed when the last reference
     500   * is dropped, but you might want to call this function to make sure 
     501   * resources are released as early as possible.
     502   *
     503   * Some streams might keep the backing store of the stream (e.g. a file descriptor)
     504   * open after the stream is closed. See the documentation for the individual
     505   * stream for details.
     506   *
     507   * On failure the first error that happened will be reported, but the close
     508   * operation will finish as much as possible. A stream that failed to
     509   * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
     510   * is important to check and report the error to the user.
     511   *
     512   * If @cancellable is not %NULL, then the operation can be cancelled by
     513   * triggering the cancellable object from another thread. If the operation
     514   * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
     515   * Cancelling a close will still leave the stream closed, but some streams
     516   * can use a faster close that doesn't block to e.g. check errors. 
     517   *
     518   * Returns: %TRUE on success, %FALSE on failure
     519   **/
     520  gboolean
     521  g_input_stream_close (GInputStream  *stream,
     522  		      GCancellable  *cancellable,
     523  		      GError       **error)
     524  {
     525    GInputStreamClass *class;
     526    gboolean res;
     527  
     528    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
     529  
     530    class = G_INPUT_STREAM_GET_CLASS (stream);
     531  
     532    if (stream->priv->closed)
     533      return TRUE;
     534  
     535    res = TRUE;
     536  
     537    if (!g_input_stream_set_pending (stream, error))
     538      return FALSE;
     539  
     540    if (cancellable)
     541      g_cancellable_push_current (cancellable);
     542  
     543    if (class->close_fn)
     544      res = class->close_fn (stream, cancellable, error);
     545  
     546    if (cancellable)
     547      g_cancellable_pop_current (cancellable);
     548  
     549    g_input_stream_clear_pending (stream);
     550    
     551    stream->priv->closed = TRUE;
     552    
     553    return res;
     554  }
     555  
     556  static void
     557  async_ready_callback_wrapper (GObject      *source_object,
     558  			      GAsyncResult *res,
     559  			      gpointer      user_data)
     560  {
     561    GInputStream *stream = G_INPUT_STREAM (source_object);
     562  
     563    g_input_stream_clear_pending (stream);
     564    if (stream->priv->outstanding_callback)
     565      (*stream->priv->outstanding_callback) (source_object, res, user_data);
     566    g_object_unref (stream);
     567  }
     568  
     569  static void
     570  async_ready_close_callback_wrapper (GObject      *source_object,
     571  				    GAsyncResult *res,
     572  				    gpointer      user_data)
     573  {
     574    GInputStream *stream = G_INPUT_STREAM (source_object);
     575  
     576    g_input_stream_clear_pending (stream);
     577    stream->priv->closed = TRUE;
     578    if (stream->priv->outstanding_callback)
     579      (*stream->priv->outstanding_callback) (source_object, res, user_data);
     580    g_object_unref (stream);
     581  }
     582  
     583  /**
     584   * g_input_stream_read_async:
     585   * @stream: A #GInputStream.
     586   * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
     587   *     a buffer to read data into (which should be at least count bytes long).
     588   * @count: (in): the number of bytes that will be read from the stream
     589   * @io_priority: the [I/O priority][io-priority]
     590   * of the request. 
     591   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
     592   * @callback: (scope async): a #GAsyncReadyCallback
     593   *   to call when the request is satisfied
     594   * @user_data: the data to pass to callback function
     595   *
     596   * Request an asynchronous read of @count bytes from the stream into the buffer
     597   * starting at @buffer. When the operation is finished @callback will be called. 
     598   * You can then call g_input_stream_read_finish() to get the result of the 
     599   * operation.
     600   *
     601   * During an async request no other sync or async calls are allowed on @stream; any such
     602   * call will result in a %G_IO_ERROR_PENDING error.
     603   *
     604   * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
     605   *
     606   * On success, the number of bytes read into the buffer will be passed to the
     607   * callback. It is not an error if this is not the same as the requested size, as it
     608   * can happen e.g. near the end of a file, but generally we try to read
     609   * as many bytes as requested. Zero is returned on end of file
     610   * (or if @count is zero),  but never otherwise.
     611   *
     612   * Any outstanding i/o request with higher priority (lower numerical value) will
     613   * be executed before an outstanding request with lower priority. Default
     614   * priority is %G_PRIORITY_DEFAULT.
     615   *
     616   * The asynchronous methods have a default fallback that uses threads to implement
     617   * asynchronicity, so they are optional for inheriting classes. However, if you
     618   * override one you must override all.
     619   **/
     620  void
     621  g_input_stream_read_async (GInputStream        *stream,
     622  			   void                *buffer,
     623  			   gsize                count,
     624  			   int                  io_priority,
     625  			   GCancellable        *cancellable,
     626  			   GAsyncReadyCallback  callback,
     627  			   gpointer             user_data)
     628  {
     629    GInputStreamClass *class;
     630    GError *error = NULL;
     631  
     632    g_return_if_fail (G_IS_INPUT_STREAM (stream));
     633    g_return_if_fail (buffer != NULL);
     634  
     635    if (count == 0)
     636      {
     637        GTask *task;
     638  
     639        task = g_task_new (stream, cancellable, callback, user_data);
     640        g_task_set_source_tag (task, g_input_stream_read_async);
     641        g_task_return_int (task, 0);
     642        g_object_unref (task);
     643        return;
     644      }
     645    
     646    if (((gssize) count) < 0)
     647      {
     648        g_task_report_new_error (stream, callback, user_data,
     649                                 g_input_stream_read_async,
     650                                 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
     651                                 _("Too large count value passed to %s"),
     652                                 G_STRFUNC);
     653        return;
     654      }
     655  
     656    if (!g_input_stream_set_pending (stream, &error))
     657      {
     658        g_task_report_error (stream, callback, user_data,
     659                             g_input_stream_read_async,
     660                             error);
     661        return;
     662      }
     663  
     664    class = G_INPUT_STREAM_GET_CLASS (stream);
     665    stream->priv->outstanding_callback = callback;
     666    g_object_ref (stream);
     667    class->read_async (stream, buffer, count, io_priority, cancellable,
     668  		     async_ready_callback_wrapper, user_data);
     669  }
     670  
     671  /**
     672   * g_input_stream_read_finish:
     673   * @stream: a #GInputStream.
     674   * @result: a #GAsyncResult.
     675   * @error: a #GError location to store the error occurring, or %NULL to 
     676   * ignore.
     677   * 
     678   * Finishes an asynchronous stream read operation. 
     679   * 
     680   * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
     681   **/
     682  gssize
     683  g_input_stream_read_finish (GInputStream  *stream,
     684  			    GAsyncResult  *result,
     685  			    GError       **error)
     686  {
     687    GInputStreamClass *class;
     688    
     689    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
     690    g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
     691  
     692    if (g_async_result_legacy_propagate_error (result, error))
     693      return -1;
     694    else if (g_async_result_is_tagged (result, g_input_stream_read_async))
     695      return g_task_propagate_int (G_TASK (result), error);
     696  
     697    class = G_INPUT_STREAM_GET_CLASS (stream);
     698    return class->read_finish (stream, result, error);
     699  }
     700  
/* Bookkeeping for an asynchronous read-all operation; attached to the GTask
 * as task data and released with free_async_read_all(). */
typedef struct
{
  gchar *buffer;     /* start of the destination buffer */
  gsize to_read;     /* bytes still to be read; read_all_callback() decrements it */
  gsize bytes_read;  /* bytes stored in @buffer so far */
} AsyncReadAll;
     707  
     708  static void
     709  free_async_read_all (gpointer data)
     710  {
     711    g_slice_free (AsyncReadAll, data);
     712  }
     713  
     714  static void
     715  read_all_callback (GObject      *stream,
     716                     GAsyncResult *result,
     717                     gpointer      user_data)
     718  {
     719    GTask *task = user_data;
     720    AsyncReadAll *data = g_task_get_task_data (task);
     721    gboolean got_eof = FALSE;
     722  
     723    if (result)
     724      {
     725        GError *error = NULL;
     726        gssize nread;
     727  
     728        nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
     729  
     730        if (nread == -1)
     731          {
     732            g_task_return_error (task, error);
     733            g_object_unref (task);
     734            return;
     735          }
     736  
     737        g_assert_cmpint (nread, <=, data->to_read);
     738        data->to_read -= nread;
     739        data->bytes_read += nread;
     740        got_eof = (nread == 0);
     741      }
     742  
     743    if (got_eof || data->to_read == 0)
     744      {
     745        g_task_return_boolean (task, TRUE);
     746        g_object_unref (task);
     747      }
     748  
     749    else
     750      g_input_stream_read_async (G_INPUT_STREAM (stream),
     751                                 data->buffer + data->bytes_read,
     752                                 data->to_read,
     753                                 g_task_get_priority (task),
     754                                 g_task_get_cancellable (task),
     755                                 read_all_callback, task);
     756  }
     757  
     758  
     759  static void
     760  read_all_async_thread (GTask        *task,
     761                         gpointer      source_object,
     762                         gpointer      task_data,
     763                         GCancellable *cancellable)
     764  {
     765    GInputStream *stream = source_object;
     766    AsyncReadAll *data = task_data;
     767    GError *error = NULL;
     768  
     769    if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
     770                                 g_task_get_cancellable (task), &error))
     771      g_task_return_boolean (task, TRUE);
     772    else
     773      g_task_return_error (task, error);
     774  }
     775  
     776  /**
     777   * g_input_stream_read_all_async:
     778   * @stream: A #GInputStream
     779   * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
     780   *     a buffer to read data into (which should be at least count bytes long)
     781   * @count: (in): the number of bytes that will be read from the stream
     782   * @io_priority: the [I/O priority][io-priority] of the request
     783   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
     784   * @callback: (scope async): a #GAsyncReadyCallback
     785   *   to call when the request is satisfied
     786   * @user_data: the data to pass to callback function
     787   *
     788   * Request an asynchronous read of @count bytes from the stream into the
     789   * buffer starting at @buffer.
     790   *
     791   * This is the asynchronous equivalent of g_input_stream_read_all().
     792   *
     793   * Call g_input_stream_read_all_finish() to collect the result.
     794   *
     795   * Any outstanding I/O request with higher priority (lower numerical
     796   * value) will be executed before an outstanding request with lower
     797   * priority. Default priority is %G_PRIORITY_DEFAULT.
     798   *
     799   * Since: 2.44
     800   **/
     801  void
     802  g_input_stream_read_all_async (GInputStream        *stream,
     803                                 void                *buffer,
     804                                 gsize                count,
     805                                 int                  io_priority,
     806                                 GCancellable        *cancellable,
     807                                 GAsyncReadyCallback  callback,
     808                                 gpointer             user_data)
     809  {
     810    AsyncReadAll *data;
     811    GTask *task;
     812  
     813    g_return_if_fail (G_IS_INPUT_STREAM (stream));
     814    g_return_if_fail (buffer != NULL || count == 0);
     815  
     816    task = g_task_new (stream, cancellable, callback, user_data);
     817    data = g_slice_new0 (AsyncReadAll);
     818    data->buffer = buffer;
     819    data->to_read = count;
     820  
     821    g_task_set_source_tag (task, g_input_stream_read_all_async);
     822    g_task_set_task_data (task, data, free_async_read_all);
     823    g_task_set_priority (task, io_priority);
     824  
     825    /* If async reads are going to be handled via the threadpool anyway
     826     * then we may as well do it with a single dispatch instead of
     827     * bouncing in and out.
     828     */
     829    if (g_input_stream_async_read_is_via_threads (stream))
     830      {
     831        g_task_run_in_thread (task, read_all_async_thread);
     832        g_object_unref (task);
     833      }
     834    else
     835      read_all_callback (G_OBJECT (stream), NULL, task);
     836  }
     837  
     838  /**
     839   * g_input_stream_read_all_finish:
     840   * @stream: a #GInputStream
     841   * @result: a #GAsyncResult
     842   * @bytes_read: (out): location to store the number of bytes that was read from the stream
     843   * @error: a #GError location to store the error occurring, or %NULL to ignore
     844   *
     845   * Finishes an asynchronous stream read operation started with
     846   * g_input_stream_read_all_async().
     847   *
     848   * As a special exception to the normal conventions for functions that
     849   * use #GError, if this function returns %FALSE (and sets @error) then
     850   * @bytes_read will be set to the number of bytes that were successfully
     851   * read before the error was encountered.  This functionality is only
     852   * available from C.  If you need it from another language then you must
     853   * write your own loop around g_input_stream_read_async().
     854   *
     855   * Returns: %TRUE on success, %FALSE if there was an error
     856   *
     857   * Since: 2.44
     858   **/
     859  gboolean
     860  g_input_stream_read_all_finish (GInputStream  *stream,
     861                                  GAsyncResult  *result,
     862                                  gsize         *bytes_read,
     863                                  GError       **error)
     864  {
     865    GTask *task;
     866  
     867    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
     868    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
     869  
     870    task = G_TASK (result);
     871  
     872    if (bytes_read)
     873      {
     874        AsyncReadAll *data = g_task_get_task_data (task);
     875  
     876        *bytes_read = data->bytes_read;
     877      }
     878  
     879    return g_task_propagate_boolean (task, error);
     880  }
     881  
     882  static void
     883  read_bytes_callback (GObject      *stream,
     884  		     GAsyncResult *result,
     885  		     gpointer      user_data)
     886  {
     887    GTask *task = user_data;
     888    guchar *buf = g_task_get_task_data (task);
     889    GError *error = NULL;
     890    gssize nread;
     891    GBytes *bytes = NULL;
     892  
     893    nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
     894  				      result, &error);
     895    if (nread == -1)
     896      {
     897        g_free (buf);
     898        g_task_return_error (task, error);
     899      }
     900    else if (nread == 0)
     901      {
     902        g_free (buf);
     903        bytes = g_bytes_new_static ("", 0);
     904      }
     905    else
     906      bytes = g_bytes_new_take (buf, nread);
     907  
     908    if (bytes)
     909      g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
     910  
     911    g_object_unref (task);
     912  }
     913  
     914  /**
     915   * g_input_stream_read_bytes_async:
     916   * @stream: A #GInputStream.
     917   * @count: the number of bytes that will be read from the stream
     918   * @io_priority: the [I/O priority][io-priority] of the request
     919   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
     920   * @callback: (scope async): a #GAsyncReadyCallback
     921   *   to call when the request is satisfied
     922   * @user_data: the data to pass to callback function
     923   *
     924   * Request an asynchronous read of @count bytes from the stream into a
     925   * new #GBytes. When the operation is finished @callback will be
     926   * called. You can then call g_input_stream_read_bytes_finish() to get the
     927   * result of the operation.
     928   *
     929   * During an async request no other sync and async calls are allowed
     930   * on @stream, and will result in %G_IO_ERROR_PENDING errors.
     931   *
     932   * A value of @count larger than %G_MAXSSIZE will cause a
     933   * %G_IO_ERROR_INVALID_ARGUMENT error.
     934   *
     935   * On success, the new #GBytes will be passed to the callback. It is
     936   * not an error if this is smaller than the requested size, as it can
     937   * happen e.g. near the end of a file, but generally we try to read as
     938   * many bytes as requested. Zero is returned on end of file (or if
     939   * @count is zero), but never otherwise.
     940   *
     941   * Any outstanding I/O request with higher priority (lower numerical
     942   * value) will be executed before an outstanding request with lower
     943   * priority. Default priority is %G_PRIORITY_DEFAULT.
     944   *
     945   * Since: 2.34
     946   **/
     947  void
     948  g_input_stream_read_bytes_async (GInputStream          *stream,
     949  				 gsize                  count,
     950  				 int                    io_priority,
     951  				 GCancellable          *cancellable,
     952  				 GAsyncReadyCallback    callback,
     953  				 gpointer               user_data)
     954  {
     955    GTask *task;
     956    guchar *buf;
     957  
     958    task = g_task_new (stream, cancellable, callback, user_data);
     959    g_task_set_source_tag (task, g_input_stream_read_bytes_async);
     960  
     961    buf = g_malloc (count);
     962    g_task_set_task_data (task, buf, NULL);
     963  
     964    g_input_stream_read_async (stream, buf, count,
     965                               io_priority, cancellable,
     966                               read_bytes_callback, task);
     967  }
     968  
     969  /**
     970   * g_input_stream_read_bytes_finish:
     971   * @stream: a #GInputStream.
     972   * @result: a #GAsyncResult.
     973   * @error: a #GError location to store the error occurring, or %NULL to
     974   *   ignore.
     975   *
     976   * Finishes an asynchronous stream read-into-#GBytes operation.
     977   *
     978   * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
     979   *
     980   * Since: 2.34
     981   **/
     982  GBytes *
     983  g_input_stream_read_bytes_finish (GInputStream  *stream,
     984  				  GAsyncResult  *result,
     985  				  GError       **error)
     986  {
     987    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
     988    g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
     989  
     990    return g_task_propagate_pointer (G_TASK (result), error);
     991  }
     992  
     993  /**
     994   * g_input_stream_skip_async:
     995   * @stream: A #GInputStream.
     996   * @count: the number of bytes that will be skipped from the stream
     997   * @io_priority: the [I/O priority][io-priority] of the request
     998   * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
     999   * @callback: (scope async): a #GAsyncReadyCallback
    1000   *   to call when the request is satisfied
    1001   * @user_data: the data to pass to callback function
    1002   *
    1003   * Request an asynchronous skip of @count bytes from the stream.
    1004   * When the operation is finished @callback will be called.
    1005   * You can then call g_input_stream_skip_finish() to get the result
    1006   * of the operation.
    1007   *
    1008   * During an async request no other sync and async calls are allowed,
    1009   * and will result in %G_IO_ERROR_PENDING errors.
    1010   *
    1011   * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
    1012   *
    1013   * On success, the number of bytes skipped will be passed to the callback.
    1014   * It is not an error if this is not the same as the requested size, as it
    1015   * can happen e.g. near the end of a file, but generally we try to skip
    1016   * as many bytes as requested. Zero is returned on end of file
    1017   * (or if @count is zero), but never otherwise.
    1018   *
    1019   * Any outstanding i/o request with higher priority (lower numerical value)
    1020   * will be executed before an outstanding request with lower priority.
    1021   * Default priority is %G_PRIORITY_DEFAULT.
    1022   *
    1023   * The asynchronous methods have a default fallback that uses threads to
    1024   * implement asynchronicity, so they are optional for inheriting classes.
    1025   * However, if you override one, you must override all.
    1026   **/
    1027  void
    1028  g_input_stream_skip_async (GInputStream        *stream,
    1029  			   gsize                count,
    1030  			   int                  io_priority,
    1031  			   GCancellable        *cancellable,
    1032  			   GAsyncReadyCallback  callback,
    1033  			   gpointer             user_data)
    1034  {
    1035    GInputStreamClass *class;
    1036    GError *error = NULL;
    1037  
    1038    g_return_if_fail (G_IS_INPUT_STREAM (stream));
    1039  
    1040    if (count == 0)
    1041      {
    1042        GTask *task;
    1043  
    1044        task = g_task_new (stream, cancellable, callback, user_data);
    1045        g_task_set_source_tag (task, g_input_stream_skip_async);
    1046        g_task_return_int (task, 0);
    1047        g_object_unref (task);
    1048        return;
    1049      }
    1050    
    1051    if (((gssize) count) < 0)
    1052      {
    1053        g_task_report_new_error (stream, callback, user_data,
    1054                                 g_input_stream_skip_async,
    1055                                 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
    1056                                 _("Too large count value passed to %s"),
    1057                                 G_STRFUNC);
    1058        return;
    1059      }
    1060  
    1061    if (!g_input_stream_set_pending (stream, &error))
    1062      {
    1063        g_task_report_error (stream, callback, user_data,
    1064                             g_input_stream_skip_async,
    1065                             error);
    1066        return;
    1067      }
    1068  
    1069    class = G_INPUT_STREAM_GET_CLASS (stream);
    1070    stream->priv->outstanding_callback = callback;
    1071    g_object_ref (stream);
    1072    class->skip_async (stream, count, io_priority, cancellable,
    1073  		     async_ready_callback_wrapper, user_data);
    1074  }
    1075  
    1076  /**
    1077   * g_input_stream_skip_finish:
    1078   * @stream: a #GInputStream.
    1079   * @result: a #GAsyncResult.
    1080   * @error: a #GError location to store the error occurring, or %NULL to 
    1081   * ignore.
    1082   * 
    1083   * Finishes a stream skip operation.
    1084   * 
    1085   * Returns: the size of the bytes skipped, or `-1` on error.
    1086   **/
    1087  gssize
    1088  g_input_stream_skip_finish (GInputStream  *stream,
    1089  			    GAsyncResult  *result,
    1090  			    GError       **error)
    1091  {
    1092    GInputStreamClass *class;
    1093  
    1094    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
    1095    g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
    1096  
    1097    if (g_async_result_legacy_propagate_error (result, error))
    1098      return -1;
    1099    else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
    1100      return g_task_propagate_int (G_TASK (result), error);
    1101  
    1102    class = G_INPUT_STREAM_GET_CLASS (stream);
    1103    return class->skip_finish (stream, result, error);
    1104  }
    1105  
    1106  /**
    1107   * g_input_stream_close_async:
    1108   * @stream: A #GInputStream.
    1109   * @io_priority: the [I/O priority][io-priority] of the request
    1110   * @cancellable: (nullable): optional cancellable object
    1111   * @callback: (scope async): a #GAsyncReadyCallback
    1112   *   to call when the request is satisfied
    1113   * @user_data: the data to pass to callback function
    1114   *
    1115   * Requests an asynchronous closes of the stream, releasing resources related to it.
    1116   * When the operation is finished @callback will be called. 
    1117   * You can then call g_input_stream_close_finish() to get the result of the 
    1118   * operation.
    1119   *
    1120   * For behaviour details see g_input_stream_close().
    1121   *
    1122   * The asynchronous methods have a default fallback that uses threads to implement
    1123   * asynchronicity, so they are optional for inheriting classes. However, if you
    1124   * override one you must override all.
    1125   **/
    1126  void
    1127  g_input_stream_close_async (GInputStream        *stream,
    1128  			    int                  io_priority,
    1129  			    GCancellable        *cancellable,
    1130  			    GAsyncReadyCallback  callback,
    1131  			    gpointer             user_data)
    1132  {
    1133    GInputStreamClass *class;
    1134    GError *error = NULL;
    1135  
    1136    g_return_if_fail (G_IS_INPUT_STREAM (stream));
    1137  
    1138    if (stream->priv->closed)
    1139      {
    1140        GTask *task;
    1141  
    1142        task = g_task_new (stream, cancellable, callback, user_data);
    1143        g_task_set_source_tag (task, g_input_stream_close_async);
    1144        g_task_return_boolean (task, TRUE);
    1145        g_object_unref (task);
    1146        return;
    1147      }
    1148  
    1149    if (!g_input_stream_set_pending (stream, &error))
    1150      {
    1151        g_task_report_error (stream, callback, user_data,
    1152                             g_input_stream_close_async,
    1153                             error);
    1154        return;
    1155      }
    1156    
    1157    class = G_INPUT_STREAM_GET_CLASS (stream);
    1158    stream->priv->outstanding_callback = callback;
    1159    g_object_ref (stream);
    1160    class->close_async (stream, io_priority, cancellable,
    1161  		      async_ready_close_callback_wrapper, user_data);
    1162  }
    1163  
    1164  /**
    1165   * g_input_stream_close_finish:
    1166   * @stream: a #GInputStream.
    1167   * @result: a #GAsyncResult.
    1168   * @error: a #GError location to store the error occurring, or %NULL to 
    1169   * ignore.
    1170   * 
    1171   * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
    1172   * 
    1173   * Returns: %TRUE if the stream was closed successfully.
    1174   **/
    1175  gboolean
    1176  g_input_stream_close_finish (GInputStream  *stream,
    1177  			     GAsyncResult  *result,
    1178  			     GError       **error)
    1179  {
    1180    GInputStreamClass *class;
    1181  
    1182    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
    1183    g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
    1184  
    1185    if (g_async_result_legacy_propagate_error (result, error))
    1186      return FALSE;
    1187    else if (g_async_result_is_tagged (result, g_input_stream_close_async))
    1188      return g_task_propagate_boolean (G_TASK (result), error);
    1189  
    1190    class = G_INPUT_STREAM_GET_CLASS (stream);
    1191    return class->close_finish (stream, result, error);
    1192  }
    1193  
    1194  /**
    1195   * g_input_stream_is_closed:
    1196   * @stream: input stream.
    1197   * 
    1198   * Checks if an input stream is closed.
    1199   * 
    1200   * Returns: %TRUE if the stream is closed.
    1201   **/
    1202  gboolean
    1203  g_input_stream_is_closed (GInputStream *stream)
    1204  {
    1205    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
    1206    
    1207    return stream->priv->closed;
    1208  }
    1209   
    1210  /**
    1211   * g_input_stream_has_pending:
    1212   * @stream: input stream.
    1213   * 
    1214   * Checks if an input stream has pending actions.
    1215   * 
    1216   * Returns: %TRUE if @stream has pending actions.
    1217   **/  
    1218  gboolean
    1219  g_input_stream_has_pending (GInputStream *stream)
    1220  {
    1221    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
    1222    
    1223    return stream->priv->pending;
    1224  }
    1225  
    1226  /**
    1227   * g_input_stream_set_pending:
    1228   * @stream: input stream
    1229   * @error: a #GError location to store the error occurring, or %NULL to 
    1230   * ignore.
    1231   * 
    1232   * Sets @stream to have actions pending. If the pending flag is
    1233   * already set or @stream is closed, it will return %FALSE and set
    1234   * @error.
    1235   *
    1236   * Returns: %TRUE if pending was previously unset and is now set.
    1237   **/
    1238  gboolean
    1239  g_input_stream_set_pending (GInputStream *stream, GError **error)
    1240  {
    1241    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
    1242    
    1243    if (stream->priv->closed)
    1244      {
    1245        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
    1246                             _("Stream is already closed"));
    1247        return FALSE;
    1248      }
    1249    
    1250    if (stream->priv->pending)
    1251      {
    1252        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
    1253  		/* Translators: This is an error you get if there is already an
    1254  		 * operation running against this stream when you try to start
    1255  		 * one */
    1256  		 _("Stream has outstanding operation"));
    1257        return FALSE;
    1258      }
    1259    
    1260    stream->priv->pending = TRUE;
    1261    return TRUE;
    1262  }
    1263  
    1264  /**
    1265   * g_input_stream_clear_pending:
    1266   * @stream: input stream
    1267   * 
    1268   * Clears the pending flag on @stream.
    1269   **/
    1270  void
    1271  g_input_stream_clear_pending (GInputStream *stream)
    1272  {
    1273    g_return_if_fail (G_IS_INPUT_STREAM (stream));
    1274    
    1275    stream->priv->pending = FALSE;
    1276  }
    1277  
    1278  /*< internal >
    1279   * g_input_stream_async_read_is_via_threads:
    1280   * @stream: input stream
    1281   *
    1282   * Checks if an input stream's read_async function uses threads.
    1283   *
    1284   * Returns: %TRUE if @stream's read_async function uses threads.
    1285   **/
    1286  gboolean
    1287  g_input_stream_async_read_is_via_threads (GInputStream *stream)
    1288  {
    1289    GInputStreamClass *class;
    1290  
    1291    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
    1292  
    1293    class = G_INPUT_STREAM_GET_CLASS (stream);
    1294  
    1295    return (class->read_async == g_input_stream_real_read_async &&
    1296        !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
    1297          g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
    1298  }
    1299  
    1300  /*< internal >
    1301   * g_input_stream_async_close_is_via_threads:
    1302   * @stream: input stream
    1303   *
    1304   * Checks if an input stream's close_async function uses threads.
    1305   *
    1306   * Returns: %TRUE if @stream's close_async function uses threads.
    1307   **/
    1308  gboolean
    1309  g_input_stream_async_close_is_via_threads (GInputStream *stream)
    1310  {
    1311    GInputStreamClass *class;
    1312  
    1313    g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
    1314  
    1315    class = G_INPUT_STREAM_GET_CLASS (stream);
    1316  
    1317    return class->close_async == g_input_stream_real_close_async;
    1318  }
    1319  
    1320  /********************************************
    1321   *   Default implementation of async ops    *
    1322   ********************************************/
    1323  
    1324  typedef struct {
    1325    void   *buffer;
    1326    gsize   count;
    1327  } ReadData;
    1328  
    1329  static void
    1330  free_read_data (ReadData *op)
    1331  {
    1332    g_slice_free (ReadData, op);
    1333  }
    1334  
    1335  static void
    1336  read_async_thread (GTask        *task,
    1337                     gpointer      source_object,
    1338                     gpointer      task_data,
    1339                     GCancellable *cancellable)
    1340  {
    1341    GInputStream *stream = source_object;
    1342    ReadData *op = task_data;
    1343    GInputStreamClass *class;
    1344    GError *error = NULL;
    1345    gssize nread;
    1346   
    1347    class = G_INPUT_STREAM_GET_CLASS (stream);
    1348  
    1349    nread = class->read_fn (stream,
    1350                            op->buffer, op->count,
    1351                            g_task_get_cancellable (task),
    1352                            &error);
    1353    if (nread == -1)
    1354      g_task_return_error (task, error);
    1355    else
    1356      g_task_return_int (task, nread);
    1357  }
    1358  
    1359  static void read_async_pollable (GPollableInputStream *stream,
    1360                                   GTask                *task);
    1361  
    1362  static gboolean
    1363  read_async_pollable_ready (GPollableInputStream *stream,
    1364  			   gpointer              user_data)
    1365  {
    1366    GTask *task = user_data;
    1367  
    1368    read_async_pollable (stream, task);
    1369    return FALSE;
    1370  }
    1371  
    1372  static void
    1373  read_async_pollable (GPollableInputStream *stream,
    1374                       GTask                *task)
    1375  {
    1376    ReadData *op = g_task_get_task_data (task);
    1377    GError *error = NULL;
    1378    gssize nread;
    1379  
    1380    if (g_task_return_error_if_cancelled (task))
    1381      return;
    1382  
    1383    nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
    1384      read_nonblocking (stream, op->buffer, op->count, &error);
    1385  
    1386    if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
    1387      {
    1388        GSource *source;
    1389  
    1390        g_error_free (error);
    1391  
    1392        source = g_pollable_input_stream_create_source (stream,
    1393                                                        g_task_get_cancellable (task));
    1394        g_task_attach_source (task, source,
    1395                              (GSourceFunc) read_async_pollable_ready);
    1396        g_source_unref (source);
    1397        return;
    1398      }
    1399  
    1400    if (nread == -1)
    1401      g_task_return_error (task, error);
    1402    else
    1403      g_task_return_int (task, nread);
    1404    /* g_input_stream_real_read_async() unrefs task */
    1405  }
    1406  
    1407  
    1408  static void
    1409  g_input_stream_real_read_async (GInputStream        *stream,
    1410  				void                *buffer,
    1411  				gsize                count,
    1412  				int                  io_priority,
    1413  				GCancellable        *cancellable,
    1414  				GAsyncReadyCallback  callback,
    1415  				gpointer             user_data)
    1416  {
    1417    GTask *task;
    1418    ReadData *op;
    1419    
    1420    op = g_slice_new0 (ReadData);
    1421    task = g_task_new (stream, cancellable, callback, user_data);
    1422    g_task_set_source_tag (task, g_input_stream_real_read_async);
    1423    g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
    1424    g_task_set_priority (task, io_priority);
    1425    op->buffer = buffer;
    1426    op->count = count;
    1427  
    1428    if (!g_input_stream_async_read_is_via_threads (stream))
    1429      read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
    1430    else
    1431      g_task_run_in_thread (task, read_async_thread);
    1432    g_object_unref (task);
    1433  }
    1434  
    1435  static gssize
    1436  g_input_stream_real_read_finish (GInputStream  *stream,
    1437  				 GAsyncResult  *result,
    1438  				 GError       **error)
    1439  {
    1440    g_return_val_if_fail (g_task_is_valid (result, stream), -1);
    1441  
    1442    return g_task_propagate_int (G_TASK (result), error);
    1443  }
    1444  
    1445  
    1446  static void
    1447  skip_async_thread (GTask        *task,
    1448                     gpointer      source_object,
    1449                     gpointer      task_data,
    1450                     GCancellable *cancellable)
    1451  {
    1452    GInputStream *stream = source_object;
    1453    gsize count = GPOINTER_TO_SIZE (task_data);
    1454    GInputStreamClass *class;
    1455    GError *error = NULL;
    1456    gssize ret;
    1457  
    1458    class = G_INPUT_STREAM_GET_CLASS (stream);
    1459    ret = class->skip (stream, count,
    1460                       g_task_get_cancellable (task),
    1461                       &error);
    1462    if (ret == -1)
    1463      g_task_return_error (task, error);
    1464    else
    1465      g_task_return_int (task, ret);
    1466  }
    1467  
    1468  typedef struct {
    1469    char buffer[8192];
    1470    gsize count;
    1471    gsize count_skipped;
    1472  } SkipFallbackAsyncData;
    1473  
    1474  static void
    1475  skip_callback_wrapper (GObject      *source_object,
    1476  		       GAsyncResult *res,
    1477  		       gpointer      user_data)
    1478  {
    1479    GInputStreamClass *class;
    1480    GTask *task = user_data;
    1481    SkipFallbackAsyncData *data = g_task_get_task_data (task);
    1482    GError *error = NULL;
    1483    gssize ret;
    1484  
    1485    ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
    1486  
    1487    if (ret > 0)
    1488      {
    1489        data->count -= ret;
    1490        data->count_skipped += ret;
    1491  
    1492        if (data->count > 0)
    1493  	{
    1494  	  class = G_INPUT_STREAM_GET_CLASS (source_object);
    1495  	  class->read_async (G_INPUT_STREAM (source_object),
    1496                               data->buffer, MIN (8192, data->count),
    1497                               g_task_get_priority (task),
    1498                               g_task_get_cancellable (task),
    1499                               skip_callback_wrapper, task);
    1500  	  return;
    1501  	}
    1502      }
    1503  
    1504    if (ret == -1 &&
    1505        g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
    1506        data->count_skipped)
    1507      {
    1508        /* No error, return partial read */
    1509        g_clear_error (&error);
    1510      }
    1511  
    1512    if (error)
    1513      g_task_return_error (task, error);
    1514    else
    1515      g_task_return_int (task, data->count_skipped);
    1516    g_object_unref (task);
    1517   }
    1518  
    1519  static void
    1520  g_input_stream_real_skip_async (GInputStream        *stream,
    1521  				gsize                count,
    1522  				int                  io_priority,
    1523  				GCancellable        *cancellable,
    1524  				GAsyncReadyCallback  callback,
    1525  				gpointer             user_data)
    1526  {
    1527    GInputStreamClass *class;
    1528    SkipFallbackAsyncData *data;
    1529    GTask *task;
    1530  
    1531    class = G_INPUT_STREAM_GET_CLASS (stream);
    1532  
    1533    task = g_task_new (stream, cancellable, callback, user_data);
    1534    g_task_set_source_tag (task, g_input_stream_real_skip_async);
    1535    g_task_set_priority (task, io_priority);
    1536  
    1537    if (g_input_stream_async_read_is_via_threads (stream))
    1538      {
    1539        /* Read is thread-using async fallback.
    1540         * Make skip use threads too, so that we can use a possible sync skip
    1541         * implementation. */
    1542        g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
    1543  
    1544        g_task_run_in_thread (task, skip_async_thread);
    1545        g_object_unref (task);
    1546      }
    1547    else
    1548      {
    1549        /* TODO: Skip fallback uses too much memory, should do multiple read calls */
    1550        
    1551        /* There is a custom async read function, lets use that. */
    1552        data = g_new (SkipFallbackAsyncData, 1);
    1553        data->count = count;
    1554        data->count_skipped = 0;
    1555        g_task_set_task_data (task, data, g_free);
    1556        g_task_set_check_cancellable (task, FALSE);
    1557        class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
    1558  			 skip_callback_wrapper, task);
    1559      }
    1560  
    1561  }
    1562  
    1563  static gssize
    1564  g_input_stream_real_skip_finish (GInputStream  *stream,
    1565  				 GAsyncResult  *result,
    1566  				 GError       **error)
    1567  {
    1568    g_return_val_if_fail (g_task_is_valid (result, stream), -1);
    1569  
    1570    return g_task_propagate_int (G_TASK (result), error);
    1571  }
    1572  
    1573  static void
    1574  close_async_thread (GTask        *task,
    1575                      gpointer      source_object,
    1576                      gpointer      task_data,
    1577                      GCancellable *cancellable)
    1578  {
    1579    GInputStream *stream = source_object;
    1580    GInputStreamClass *class;
    1581    GError *error = NULL;
    1582    gboolean result;
    1583  
    1584    class = G_INPUT_STREAM_GET_CLASS (stream);
    1585    if (class->close_fn)
    1586      {
    1587        result = class->close_fn (stream,
    1588                                  g_task_get_cancellable (task),
    1589                                  &error);
    1590        if (!result)
    1591          {
    1592            g_task_return_error (task, error);
    1593            return;
    1594          }
    1595      }
    1596  
    1597    g_task_return_boolean (task, TRUE);
    1598  }
    1599  
    1600  static void
    1601  g_input_stream_real_close_async (GInputStream        *stream,
    1602  				 int                  io_priority,
    1603  				 GCancellable        *cancellable,
    1604  				 GAsyncReadyCallback  callback,
    1605  				 gpointer             user_data)
    1606  {
    1607    GTask *task;
    1608  
    1609    task = g_task_new (stream, cancellable, callback, user_data);
    1610    g_task_set_source_tag (task, g_input_stream_real_close_async);
    1611    g_task_set_check_cancellable (task, FALSE);
    1612    g_task_set_priority (task, io_priority);
    1613    
    1614    g_task_run_in_thread (task, close_async_thread);
    1615    g_object_unref (task);
    1616  }
    1617  
    1618  static gboolean
    1619  g_input_stream_real_close_finish (GInputStream  *stream,
    1620  				  GAsyncResult  *result,
    1621  				  GError       **error)
    1622  {
    1623    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
    1624  
    1625    return g_task_propagate_boolean (G_TASK (result), error);
    1626  }