(root)/
glib-2.79.0/
gio/
gbufferedoutputstream.c
       1  /* GIO - GLib Input, Output and Streaming Library
       2   * 
       3   * Copyright (C) 2006-2007 Red Hat, Inc.
       4   *
       5   * SPDX-License-Identifier: LGPL-2.1-or-later
       6   *
       7   * This library is free software; you can redistribute it and/or
       8   * modify it under the terms of the GNU Lesser General Public
       9   * License as published by the Free Software Foundation; either
      10   * version 2.1 of the License, or (at your option) any later version.
      11   *
      12   * This library is distributed in the hope that it will be useful,
      13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
      14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      15   * Lesser General Public License for more details.
      16   *
      17   * You should have received a copy of the GNU Lesser General
      18   * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
      19   *
      20   * Author: Christian Kellner <gicmo@gnome.org> 
      21   */
      22  
      23  #include "config.h"
      24  #include "gbufferedoutputstream.h"
      25  #include "goutputstream.h"
      26  #include "gseekable.h"
      27  #include "gtask.h"
      28  #include "string.h"
      29  #include "gioerror.h"
      30  #include "glibintl.h"
      31  
      32  /**
      33   * GBufferedOutputStream:
      34   *
      35   * Buffered output stream implements [class@Gio.FilterOutputStream] and provides
      36   * for buffered writes.
      37   *
      38   * By default, `GBufferedOutputStream`'s buffer size is set at 4 kilobytes.
      39   *
      40   * To create a buffered output stream, use [ctor@Gio.BufferedOutputStream.new],
      41   * or [ctor@Gio.BufferedOutputStream.new_sized] to specify the buffer's size
      42   * at construction.
      43   *
      44   * To get the size of a buffer within a buffered input stream, use
      45   * [method@Gio.BufferedOutputStream.get_buffer_size]. To change the size of a
      46   * buffered output stream's buffer, use [method@Gio.BufferedOutputStream.set_buffer_size].
      47   * Note that the buffer's size cannot be reduced below the size of the data within the buffer.
      48   */
      49  
      50  #define DEFAULT_BUFFER_SIZE 4096
      51  
      52  struct _GBufferedOutputStreamPrivate {
      53    guint8 *buffer; 
      54    gsize   len;
      55    goffset pos;
      56    gboolean auto_grow;
      57  };
      58  
      59  enum {
      60    PROP_0,
      61    PROP_BUFSIZE,
      62    PROP_AUTO_GROW
      63  };
      64  
      65  static void     g_buffered_output_stream_set_property (GObject      *object,
      66                                                         guint         prop_id,
      67                                                         const GValue *value,
      68                                                         GParamSpec   *pspec);
      69  
      70  static void     g_buffered_output_stream_get_property (GObject    *object,
      71                                                         guint       prop_id,
      72                                                         GValue     *value,
      73                                                         GParamSpec *pspec);
      74  static void     g_buffered_output_stream_finalize     (GObject *object);
      75  
      76  
      77  static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
      78                                                         const void    *buffer,
      79                                                         gsize          count,
      80                                                         GCancellable  *cancellable,
      81                                                         GError       **error);
      82  static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
      83                                                         GCancellable  *cancellable,
      84                                                         GError          **error);
      85  static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
      86                                                         GCancellable   *cancellable,
      87                                                         GError        **error);
      88  
      89  static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
      90                                                         int                   io_priority,
      91                                                         GCancellable         *cancellable,
      92                                                         GAsyncReadyCallback   callback,
      93                                                         gpointer              data);
      94  static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
      95                                                         GAsyncResult         *result,
      96                                                         GError              **error);
      97  static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
      98                                                         int                   io_priority,
      99                                                         GCancellable         *cancellable,
     100                                                         GAsyncReadyCallback   callback,
     101                                                         gpointer              data);
     102  static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
     103                                                         GAsyncResult         *result,
     104                                                         GError              **error);
     105  
     106  static void     g_buffered_output_stream_seekable_iface_init (GSeekableIface  *iface);
     107  static goffset  g_buffered_output_stream_tell                (GSeekable       *seekable);
     108  static gboolean g_buffered_output_stream_can_seek            (GSeekable       *seekable);
     109  static gboolean g_buffered_output_stream_seek                (GSeekable       *seekable,
     110  							      goffset          offset,
     111  							      GSeekType        type,
     112  							      GCancellable    *cancellable,
     113  							      GError         **error);
     114  static gboolean g_buffered_output_stream_can_truncate        (GSeekable       *seekable);
     115  static gboolean g_buffered_output_stream_truncate            (GSeekable       *seekable,
     116  							      goffset          offset,
     117  							      GCancellable    *cancellable,
     118  							      GError         **error);
     119  
     120  G_DEFINE_TYPE_WITH_CODE (GBufferedOutputStream,
     121  			 g_buffered_output_stream,
     122  			 G_TYPE_FILTER_OUTPUT_STREAM,
     123                           G_ADD_PRIVATE (GBufferedOutputStream)
     124  			 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
     125  						g_buffered_output_stream_seekable_iface_init))
     126  
     127  
     128  static void
     129  g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
     130  {
     131    GObjectClass *object_class;
     132    GOutputStreamClass *ostream_class;
     133  
     134    object_class = G_OBJECT_CLASS (klass);
     135    object_class->get_property = g_buffered_output_stream_get_property;
     136    object_class->set_property = g_buffered_output_stream_set_property;
     137    object_class->finalize     = g_buffered_output_stream_finalize;
     138  
     139    ostream_class = G_OUTPUT_STREAM_CLASS (klass);
     140    ostream_class->write_fn = g_buffered_output_stream_write;
     141    ostream_class->flush = g_buffered_output_stream_flush;
     142    ostream_class->close_fn = g_buffered_output_stream_close;
     143    ostream_class->flush_async  = g_buffered_output_stream_flush_async;
     144    ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
     145    ostream_class->close_async  = g_buffered_output_stream_close_async;
     146    ostream_class->close_finish = g_buffered_output_stream_close_finish;
     147  
     148    /**
     149     * GBufferedOutputStream:buffer-size:
     150     *
     151     * The size of the backend buffer, in bytes.
     152     */
     153    g_object_class_install_property (object_class,
     154                                     PROP_BUFSIZE,
     155                                     g_param_spec_uint ("buffer-size", NULL, NULL,
     156                                                        1,
     157                                                        G_MAXUINT,
     158                                                        DEFAULT_BUFFER_SIZE,
     159                                                        G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
     160                                                        G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
     161  
     162    /**
     163     * GBufferedOutputStream:auto-grow:
     164     *
     165     * Whether the buffer should automatically grow.
     166     */
     167    g_object_class_install_property (object_class,
     168                                     PROP_AUTO_GROW,
     169                                     g_param_spec_boolean ("auto-grow", NULL, NULL,
     170                                                           FALSE,
     171                                                           G_PARAM_READWRITE|
     172                                                           G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
     173  
     174  }
     175  
     176  /**
     177   * g_buffered_output_stream_get_buffer_size:
     178   * @stream: a #GBufferedOutputStream.
     179   * 
     180   * Gets the size of the buffer in the @stream.
     181   * 
     182   * Returns: the current size of the buffer.
     183   **/
     184  gsize
     185  g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
     186  {
     187    g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
     188  
     189    return stream->priv->len;
     190  }
     191  
     192  /**
     193   * g_buffered_output_stream_set_buffer_size:
     194   * @stream: a #GBufferedOutputStream.
     195   * @size: a #gsize.
     196   *
     197   * Sets the size of the internal buffer to @size.
     198   **/    
     199  void
     200  g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
     201                                            gsize                  size)
     202  {
     203    GBufferedOutputStreamPrivate *priv;
     204    guint8 *buffer;
     205    
     206    g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
     207  
     208    priv = stream->priv;
     209    
     210    if (size == priv->len)
     211      return;
     212  
     213    if (priv->buffer)
     214      {
     215        size = (priv->pos > 0) ? MAX (size, (gsize) priv->pos) : size;
     216  
     217        buffer = g_malloc (size);
     218        memcpy (buffer, priv->buffer, priv->pos);
     219        g_free (priv->buffer);
     220        priv->buffer = buffer;
     221        priv->len = size;
     222        /* Keep old pos */
     223      }
     224    else
     225      {
     226        priv->buffer = g_malloc (size);
     227        priv->len = size;
     228        priv->pos = 0;
     229      }
     230  
     231    g_object_notify (G_OBJECT (stream), "buffer-size");
     232  }
     233  
     234  /**
     235   * g_buffered_output_stream_get_auto_grow:
     236   * @stream: a #GBufferedOutputStream.
     237   * 
     238   * Checks if the buffer automatically grows as data is added.
     239   * 
     240   * Returns: %TRUE if the @stream's buffer automatically grows,
     241   * %FALSE otherwise.
     242   **/  
     243  gboolean
     244  g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
     245  {
     246    g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
     247  
     248    return stream->priv->auto_grow;
     249  }
     250  
     251  /**
     252   * g_buffered_output_stream_set_auto_grow:
     253   * @stream: a #GBufferedOutputStream.
     254   * @auto_grow: a #gboolean.
     255   *
     256   * Sets whether or not the @stream's buffer should automatically grow.
     257   * If @auto_grow is true, then each write will just make the buffer
     258   * larger, and you must manually flush the buffer to actually write out
     259   * the data to the underlying stream.
     260   **/
     261  void
     262  g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
     263                                          gboolean               auto_grow)
     264  {
     265    GBufferedOutputStreamPrivate *priv;
     266    g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
     267    priv = stream->priv;
     268    auto_grow = auto_grow != FALSE;
     269    if (priv->auto_grow != auto_grow)
     270      {
     271        priv->auto_grow = auto_grow;
     272        g_object_notify (G_OBJECT (stream), "auto-grow");
     273      }
     274  }
     275  
     276  static void
     277  g_buffered_output_stream_set_property (GObject      *object,
     278                                         guint         prop_id,
     279                                         const GValue *value,
     280                                         GParamSpec   *pspec)
     281  {
     282    GBufferedOutputStream *stream;
     283  
     284    stream = G_BUFFERED_OUTPUT_STREAM (object);
     285  
     286    switch (prop_id) 
     287      {
     288      case PROP_BUFSIZE:
     289        g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
     290        break;    
     291  
     292      case PROP_AUTO_GROW:
     293        g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
     294        break;
     295  
     296      default:
     297        G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     298        break;
     299      }
     300  
     301  }
     302  
     303  static void
     304  g_buffered_output_stream_get_property (GObject    *object,
     305                                         guint       prop_id,
     306                                         GValue     *value,
     307                                         GParamSpec *pspec)
     308  {
     309    GBufferedOutputStream *buffered_stream;
     310    GBufferedOutputStreamPrivate *priv;
     311  
     312    buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
     313    priv = buffered_stream->priv;
     314  
     315    switch (prop_id)
     316      {
     317      case PROP_BUFSIZE:
     318        g_value_set_uint (value, priv->len);
     319        break;
     320  
     321      case PROP_AUTO_GROW:
     322        g_value_set_boolean (value, priv->auto_grow);
     323        break;
     324  
     325      default:
     326        G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
     327        break;
     328      }
     329  
     330  }
     331  
     332  static void
     333  g_buffered_output_stream_finalize (GObject *object)
     334  {
     335    GBufferedOutputStream *stream;
     336    GBufferedOutputStreamPrivate *priv;
     337  
     338    stream = G_BUFFERED_OUTPUT_STREAM (object);
     339    priv = stream->priv;
     340  
     341    g_free (priv->buffer);
     342  
     343    G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize (object);
     344  }
     345  
     346  static void
     347  g_buffered_output_stream_init (GBufferedOutputStream *stream)
     348  {
     349    stream->priv = g_buffered_output_stream_get_instance_private (stream);
     350  }
     351  
     352  static void
     353  g_buffered_output_stream_seekable_iface_init (GSeekableIface *iface)
     354  {
     355    iface->tell         = g_buffered_output_stream_tell;
     356    iface->can_seek     = g_buffered_output_stream_can_seek;
     357    iface->seek         = g_buffered_output_stream_seek;
     358    iface->can_truncate = g_buffered_output_stream_can_truncate;
     359    iface->truncate_fn  = g_buffered_output_stream_truncate;
     360  }
     361  
     362  /**
     363   * g_buffered_output_stream_new:
     364   * @base_stream: a #GOutputStream.
     365   * 
     366   * Creates a new buffered output stream for a base stream.
     367   * 
     368   * Returns: a #GOutputStream for the given @base_stream.
     369   **/  
     370  GOutputStream *
     371  g_buffered_output_stream_new (GOutputStream *base_stream)
     372  {
     373    GOutputStream *stream;
     374  
     375    g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
     376  
     377    stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
     378                           "base-stream", base_stream,
     379                           NULL);
     380  
     381    return stream;
     382  }
     383  
     384  /**
     385   * g_buffered_output_stream_new_sized:
     386   * @base_stream: a #GOutputStream.
     387   * @size: a #gsize.
     388   * 
     389   * Creates a new buffered output stream with a given buffer size.
     390   * 
     391   * Returns: a #GOutputStream with an internal buffer set to @size.
     392   **/  
     393  GOutputStream *
     394  g_buffered_output_stream_new_sized (GOutputStream *base_stream,
     395                                      gsize          size)
     396  {
     397    GOutputStream *stream;
     398  
     399    g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
     400  
     401    stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
     402                           "base-stream", base_stream,
     403                           "buffer-size", size,
     404                           NULL);
     405  
     406    return stream;
     407  }
     408  
     409  static gboolean
     410  flush_buffer (GBufferedOutputStream  *stream,
     411                GCancellable           *cancellable,
     412                GError                 **error)
     413  {
     414    GBufferedOutputStreamPrivate *priv;
     415    GOutputStream                *base_stream;
     416    gboolean                      res;
     417    gsize                         bytes_written;
     418    gsize                         count;
     419  
     420    priv = stream->priv;
     421    bytes_written = 0;
     422    base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
     423  
     424    g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
     425  
     426    res = g_output_stream_write_all (base_stream,
     427                                     priv->buffer,
     428                                     priv->pos,
     429                                     &bytes_written,
     430                                     cancellable,
     431                                     error);
     432  
     433    count = priv->pos - bytes_written;
     434  
     435    if (count > 0)
     436      memmove (priv->buffer, priv->buffer + bytes_written, count);
     437    
     438    priv->pos -= bytes_written;
     439  
     440    return res;
     441  }
     442  
     443  static gssize
     444  g_buffered_output_stream_write  (GOutputStream *stream,
     445                                   const void    *buffer,
     446                                   gsize          count,
     447                                   GCancellable  *cancellable,
     448                                   GError       **error)
     449  {
     450    GBufferedOutputStream        *bstream;
     451    GBufferedOutputStreamPrivate *priv;
     452    gboolean res;
     453    gsize    n;
     454    gsize new_size;
     455  
     456    bstream = G_BUFFERED_OUTPUT_STREAM (stream);
     457    priv = bstream->priv;
     458  
     459    n = priv->len - priv->pos;
     460  
     461    if (priv->auto_grow && n < count)
     462      {
     463        new_size = MAX (priv->len * 2, priv->len + count);
     464        g_buffered_output_stream_set_buffer_size (bstream, new_size);
     465      }
     466    else if (n == 0)
     467      {
     468        res = flush_buffer (bstream, cancellable, error);
     469        
     470        if (res == FALSE)
     471  	return -1;
     472      }
     473  
     474    n = priv->len - priv->pos;
     475    
     476    count = MIN (count, n);
     477    memcpy (priv->buffer + priv->pos, buffer, count);
     478    priv->pos += count;
     479  
     480    return count;
     481  }
     482  
     483  static gboolean
     484  g_buffered_output_stream_flush (GOutputStream  *stream,
     485                                  GCancellable   *cancellable,
     486                                  GError        **error)
     487  {
     488    GBufferedOutputStream *bstream;
     489    GOutputStream                *base_stream;
     490    gboolean res;
     491  
     492    bstream = G_BUFFERED_OUTPUT_STREAM (stream);
     493    base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
     494  
     495    res = flush_buffer (bstream, cancellable, error);
     496  
     497    if (res == FALSE)
     498      return FALSE;
     499  
     500    res = g_output_stream_flush (base_stream, cancellable, error);
     501  
     502    return res;
     503  }
     504  
     505  static gboolean
     506  g_buffered_output_stream_close (GOutputStream  *stream,
     507                                  GCancellable   *cancellable,
     508                                  GError        **error)
     509  {
     510    GBufferedOutputStream        *bstream;
     511    GOutputStream                *base_stream;
     512    gboolean                      res;
     513  
     514    bstream = G_BUFFERED_OUTPUT_STREAM (stream);
     515    base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
     516    res = flush_buffer (bstream, cancellable, error);
     517  
     518    if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
     519      {
     520        /* report the first error but still close the stream */
     521        if (res)
     522          res = g_output_stream_close (base_stream, cancellable, error);
     523        else
     524          g_output_stream_close (base_stream, cancellable, NULL);
     525      }
     526  
     527    return res;
     528  }
     529  
     530  static goffset
     531  g_buffered_output_stream_tell (GSeekable *seekable)
     532  {
     533    GBufferedOutputStream        *bstream;
     534    GBufferedOutputStreamPrivate *priv;
     535    GOutputStream *base_stream;
     536    GSeekable    *base_stream_seekable;
     537    goffset base_offset;
     538    
     539    bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
     540    priv = bstream->priv;
     541  
     542    base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
     543    if (!G_IS_SEEKABLE (base_stream))
     544      return 0;
     545  
     546    base_stream_seekable = G_SEEKABLE (base_stream);
     547    
     548    base_offset = g_seekable_tell (base_stream_seekable);
     549    return base_offset + priv->pos;
     550  }
     551  
     552  static gboolean
     553  g_buffered_output_stream_can_seek (GSeekable *seekable)
     554  {
     555    GOutputStream *base_stream;
     556    
     557    base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
     558    return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
     559  }
     560  
     561  static gboolean
     562  g_buffered_output_stream_seek (GSeekable     *seekable,
     563  			       goffset        offset,
     564  			       GSeekType      type,
     565  			       GCancellable  *cancellable,
     566  			       GError       **error)
     567  {
     568    GBufferedOutputStream *bstream;
     569    GOutputStream *base_stream;
     570    GSeekable *base_stream_seekable;
     571    gboolean flushed;
     572  
     573    bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
     574  
     575    base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
     576    if (!G_IS_SEEKABLE (base_stream))
     577      {
     578        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
     579                             _("Seek not supported on base stream"));
     580        return FALSE;
     581      }
     582  
     583    base_stream_seekable = G_SEEKABLE (base_stream);
     584    flushed = flush_buffer (bstream, cancellable, error);
     585    if (!flushed)
     586      return FALSE;
     587  
     588    return g_seekable_seek (base_stream_seekable, offset, type, cancellable, error);
     589  }
     590  
     591  static gboolean
     592  g_buffered_output_stream_can_truncate (GSeekable *seekable)
     593  {
     594    GOutputStream *base_stream;
     595    
     596    base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
     597    return G_IS_SEEKABLE (base_stream) && g_seekable_can_truncate (G_SEEKABLE (base_stream));
     598  }
     599  
     600  static gboolean
     601  g_buffered_output_stream_truncate (GSeekable     *seekable,
     602  				   goffset        offset,
     603  				   GCancellable  *cancellable,
     604  				   GError       **error)
     605  {
     606    GBufferedOutputStream        *bstream;
     607    GOutputStream *base_stream;
     608    GSeekable *base_stream_seekable;
     609    gboolean flushed;
     610  
     611    bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
     612    base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
     613    if (!G_IS_SEEKABLE (base_stream))
     614      {
     615        g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
     616                             _("Truncate not supported on base stream"));
     617        return FALSE;
     618      }
     619  
     620    base_stream_seekable = G_SEEKABLE (base_stream);
     621  
     622    flushed = flush_buffer (bstream, cancellable, error);
     623    if (!flushed)
     624      return FALSE;
     625    return g_seekable_truncate (base_stream_seekable, offset, cancellable, error);
     626  }
     627  
     628  /* ************************** */
     629  /* Async stuff implementation */
     630  /* ************************** */
     631  
     632  /* TODO: This should be using the base class async ops, not threads */
     633  
     634  typedef struct {
     635  
     636    guint flush_stream : 1;
     637    guint close_stream : 1;
     638  
     639  } FlushData;
     640  
     641  static void
     642  free_flush_data (gpointer data)
     643  {
     644    g_slice_free (FlushData, data);
     645  }
     646  
     647  /* This function is used by all three (i.e. 
     648   * _write, _flush, _close) functions since
     649   * all of them will need to flush the buffer
     650   * and so closing and writing is just a special
     651   * case of flushing + some addition stuff */
     652  static void
     653  flush_buffer_thread (GTask        *task,
     654                       gpointer      object,
     655                       gpointer      task_data,
     656                       GCancellable *cancellable)
     657  {
     658    GBufferedOutputStream *stream;
     659    GOutputStream *base_stream;
     660    FlushData     *fdata;
     661    gboolean       res;
     662    GError        *error = NULL;
     663  
     664    stream = G_BUFFERED_OUTPUT_STREAM (object);
     665    fdata = task_data;
     666    base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
     667  
     668    res = flush_buffer (stream, cancellable, &error);
     669  
     670    /* if flushing the buffer didn't work don't even bother
     671     * to flush the stream but just report that error */
     672    if (res && fdata->flush_stream)
     673      res = g_output_stream_flush (base_stream, cancellable, &error);
     674  
     675    if (fdata->close_stream) 
     676      {
     677       
     678        /* if flushing the buffer or the stream returned 
     679         * an error report that first error but still try 
     680         * close the stream */
     681        if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
     682          {
     683            if (res == FALSE)
     684              g_output_stream_close (base_stream, cancellable, NULL);
     685            else 
     686              res = g_output_stream_close (base_stream, cancellable, &error);
     687          }
     688      }
     689  
     690    if (res == FALSE)
     691      g_task_return_error (task, error);
     692    else
     693      g_task_return_boolean (task, TRUE);
     694  }
     695  
     696  static void
     697  g_buffered_output_stream_flush_async (GOutputStream        *stream,
     698                                        int                   io_priority,
     699                                        GCancellable         *cancellable,
     700                                        GAsyncReadyCallback   callback,
     701                                        gpointer              data)
     702  {
     703    GTask *task;
     704    FlushData *fdata;
     705  
     706    fdata = g_slice_new0 (FlushData);
     707    fdata->flush_stream = TRUE;
     708    fdata->close_stream = FALSE;
     709  
     710    task = g_task_new (stream, cancellable, callback, data);
     711    g_task_set_source_tag (task, g_buffered_output_stream_flush_async);
     712    g_task_set_task_data (task, fdata, free_flush_data);
     713    g_task_set_priority (task, io_priority);
     714  
     715    g_task_run_in_thread (task, flush_buffer_thread);
     716    g_object_unref (task);
     717  }
     718  
     719  static gboolean
     720  g_buffered_output_stream_flush_finish (GOutputStream        *stream,
     721                                         GAsyncResult         *result,
     722                                         GError              **error)
     723  {
     724    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
     725  
     726    return g_task_propagate_boolean (G_TASK (result), error);
     727  }
     728  
     729  static void
     730  g_buffered_output_stream_close_async (GOutputStream        *stream,
     731                                        int                   io_priority,
     732                                        GCancellable         *cancellable,
     733                                        GAsyncReadyCallback   callback,
     734                                        gpointer              data)
     735  {
     736    GTask *task;
     737    FlushData *fdata;
     738  
     739    fdata = g_slice_new0 (FlushData);
     740    fdata->close_stream = TRUE;
     741  
     742    task = g_task_new (stream, cancellable, callback, data);
     743    g_task_set_source_tag (task, g_buffered_output_stream_close_async);
     744    g_task_set_task_data (task, fdata, free_flush_data);
     745    g_task_set_priority (task, io_priority);
     746  
     747    g_task_run_in_thread (task, flush_buffer_thread);
     748    g_object_unref (task);
     749  }
     750  
     751  static gboolean
     752  g_buffered_output_stream_close_finish (GOutputStream        *stream,
     753                                         GAsyncResult         *result,
     754                                         GError              **error)
     755  {
     756    g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
     757  
     758    return g_task_propagate_boolean (G_TASK (result), error);
     759  }