1 /* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 *
5 * SPDX-License-Identifier: LGPL-2.1-or-later
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General
18 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
19 *
20 * Author: Christian Kellner <gicmo@gnome.org>
21 */
22
23 #include "config.h"
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gseekable.h"
27 #include "gtask.h"
28 #include "string.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31
32 /**
33 * GBufferedOutputStream:
34 *
35 * Buffered output stream implements [class@Gio.FilterOutputStream] and provides
36 * for buffered writes.
37 *
38 * By default, `GBufferedOutputStream`'s buffer size is set at 4 kilobytes.
39 *
40 * To create a buffered output stream, use [ctor@Gio.BufferedOutputStream.new],
41 * or [ctor@Gio.BufferedOutputStream.new_sized] to specify the buffer's size
42 * at construction.
43 *
44 * To get the size of a buffer within a buffered input stream, use
45 * [method@Gio.BufferedOutputStream.get_buffer_size]. To change the size of a
46 * buffered output stream's buffer, use [method@Gio.BufferedOutputStream.set_buffer_size].
47 * Note that the buffer's size cannot be reduced below the size of the data within the buffer.
48 */
49
50 #define DEFAULT_BUFFER_SIZE 4096
51
52 struct _GBufferedOutputStreamPrivate {
53 guint8 *buffer;
54 gsize len;
55 goffset pos;
56 gboolean auto_grow;
57 };
58
59 enum {
60 PROP_0,
61 PROP_BUFSIZE,
62 PROP_AUTO_GROW
63 };
64
65 static void g_buffered_output_stream_set_property (GObject *object,
66 guint prop_id,
67 const GValue *value,
68 GParamSpec *pspec);
69
70 static void g_buffered_output_stream_get_property (GObject *object,
71 guint prop_id,
72 GValue *value,
73 GParamSpec *pspec);
74 static void g_buffered_output_stream_finalize (GObject *object);
75
76
77 static gssize g_buffered_output_stream_write (GOutputStream *stream,
78 const void *buffer,
79 gsize count,
80 GCancellable *cancellable,
81 GError **error);
82 static gboolean g_buffered_output_stream_flush (GOutputStream *stream,
83 GCancellable *cancellable,
84 GError **error);
85 static gboolean g_buffered_output_stream_close (GOutputStream *stream,
86 GCancellable *cancellable,
87 GError **error);
88
89 static void g_buffered_output_stream_flush_async (GOutputStream *stream,
90 int io_priority,
91 GCancellable *cancellable,
92 GAsyncReadyCallback callback,
93 gpointer data);
94 static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream,
95 GAsyncResult *result,
96 GError **error);
97 static void g_buffered_output_stream_close_async (GOutputStream *stream,
98 int io_priority,
99 GCancellable *cancellable,
100 GAsyncReadyCallback callback,
101 gpointer data);
102 static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream,
103 GAsyncResult *result,
104 GError **error);
105
106 static void g_buffered_output_stream_seekable_iface_init (GSeekableIface *iface);
107 static goffset g_buffered_output_stream_tell (GSeekable *seekable);
108 static gboolean g_buffered_output_stream_can_seek (GSeekable *seekable);
109 static gboolean g_buffered_output_stream_seek (GSeekable *seekable,
110 goffset offset,
111 GSeekType type,
112 GCancellable *cancellable,
113 GError **error);
114 static gboolean g_buffered_output_stream_can_truncate (GSeekable *seekable);
115 static gboolean g_buffered_output_stream_truncate (GSeekable *seekable,
116 goffset offset,
117 GCancellable *cancellable,
118 GError **error);
119
120 G_DEFINE_TYPE_WITH_CODE (GBufferedOutputStream,
121 g_buffered_output_stream,
122 G_TYPE_FILTER_OUTPUT_STREAM,
123 G_ADD_PRIVATE (GBufferedOutputStream)
124 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
125 g_buffered_output_stream_seekable_iface_init))
126
127
128 static void
129 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
130 {
131 GObjectClass *object_class;
132 GOutputStreamClass *ostream_class;
133
134 object_class = G_OBJECT_CLASS (klass);
135 object_class->get_property = g_buffered_output_stream_get_property;
136 object_class->set_property = g_buffered_output_stream_set_property;
137 object_class->finalize = g_buffered_output_stream_finalize;
138
139 ostream_class = G_OUTPUT_STREAM_CLASS (klass);
140 ostream_class->write_fn = g_buffered_output_stream_write;
141 ostream_class->flush = g_buffered_output_stream_flush;
142 ostream_class->close_fn = g_buffered_output_stream_close;
143 ostream_class->flush_async = g_buffered_output_stream_flush_async;
144 ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
145 ostream_class->close_async = g_buffered_output_stream_close_async;
146 ostream_class->close_finish = g_buffered_output_stream_close_finish;
147
148 /**
149 * GBufferedOutputStream:buffer-size:
150 *
151 * The size of the backend buffer, in bytes.
152 */
153 g_object_class_install_property (object_class,
154 PROP_BUFSIZE,
155 g_param_spec_uint ("buffer-size", NULL, NULL,
156 1,
157 G_MAXUINT,
158 DEFAULT_BUFFER_SIZE,
159 G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
160 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
161
162 /**
163 * GBufferedOutputStream:auto-grow:
164 *
165 * Whether the buffer should automatically grow.
166 */
167 g_object_class_install_property (object_class,
168 PROP_AUTO_GROW,
169 g_param_spec_boolean ("auto-grow", NULL, NULL,
170 FALSE,
171 G_PARAM_READWRITE|
172 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
173
174 }
175
176 /**
177 * g_buffered_output_stream_get_buffer_size:
178 * @stream: a #GBufferedOutputStream.
179 *
180 * Gets the size of the buffer in the @stream.
181 *
182 * Returns: the current size of the buffer.
183 **/
184 gsize
185 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
186 {
187 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
188
189 return stream->priv->len;
190 }
191
192 /**
193 * g_buffered_output_stream_set_buffer_size:
194 * @stream: a #GBufferedOutputStream.
195 * @size: a #gsize.
196 *
197 * Sets the size of the internal buffer to @size.
198 **/
199 void
200 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
201 gsize size)
202 {
203 GBufferedOutputStreamPrivate *priv;
204 guint8 *buffer;
205
206 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
207
208 priv = stream->priv;
209
210 if (size == priv->len)
211 return;
212
213 if (priv->buffer)
214 {
215 size = (priv->pos > 0) ? MAX (size, (gsize) priv->pos) : size;
216
217 buffer = g_malloc (size);
218 memcpy (buffer, priv->buffer, priv->pos);
219 g_free (priv->buffer);
220 priv->buffer = buffer;
221 priv->len = size;
222 /* Keep old pos */
223 }
224 else
225 {
226 priv->buffer = g_malloc (size);
227 priv->len = size;
228 priv->pos = 0;
229 }
230
231 g_object_notify (G_OBJECT (stream), "buffer-size");
232 }
233
234 /**
235 * g_buffered_output_stream_get_auto_grow:
236 * @stream: a #GBufferedOutputStream.
237 *
238 * Checks if the buffer automatically grows as data is added.
239 *
240 * Returns: %TRUE if the @stream's buffer automatically grows,
241 * %FALSE otherwise.
242 **/
243 gboolean
244 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
245 {
246 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
247
248 return stream->priv->auto_grow;
249 }
250
251 /**
252 * g_buffered_output_stream_set_auto_grow:
253 * @stream: a #GBufferedOutputStream.
254 * @auto_grow: a #gboolean.
255 *
256 * Sets whether or not the @stream's buffer should automatically grow.
257 * If @auto_grow is true, then each write will just make the buffer
258 * larger, and you must manually flush the buffer to actually write out
259 * the data to the underlying stream.
260 **/
261 void
262 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
263 gboolean auto_grow)
264 {
265 GBufferedOutputStreamPrivate *priv;
266 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
267 priv = stream->priv;
268 auto_grow = auto_grow != FALSE;
269 if (priv->auto_grow != auto_grow)
270 {
271 priv->auto_grow = auto_grow;
272 g_object_notify (G_OBJECT (stream), "auto-grow");
273 }
274 }
275
276 static void
277 g_buffered_output_stream_set_property (GObject *object,
278 guint prop_id,
279 const GValue *value,
280 GParamSpec *pspec)
281 {
282 GBufferedOutputStream *stream;
283
284 stream = G_BUFFERED_OUTPUT_STREAM (object);
285
286 switch (prop_id)
287 {
288 case PROP_BUFSIZE:
289 g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
290 break;
291
292 case PROP_AUTO_GROW:
293 g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
294 break;
295
296 default:
297 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
298 break;
299 }
300
301 }
302
303 static void
304 g_buffered_output_stream_get_property (GObject *object,
305 guint prop_id,
306 GValue *value,
307 GParamSpec *pspec)
308 {
309 GBufferedOutputStream *buffered_stream;
310 GBufferedOutputStreamPrivate *priv;
311
312 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
313 priv = buffered_stream->priv;
314
315 switch (prop_id)
316 {
317 case PROP_BUFSIZE:
318 g_value_set_uint (value, priv->len);
319 break;
320
321 case PROP_AUTO_GROW:
322 g_value_set_boolean (value, priv->auto_grow);
323 break;
324
325 default:
326 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
327 break;
328 }
329
330 }
331
332 static void
333 g_buffered_output_stream_finalize (GObject *object)
334 {
335 GBufferedOutputStream *stream;
336 GBufferedOutputStreamPrivate *priv;
337
338 stream = G_BUFFERED_OUTPUT_STREAM (object);
339 priv = stream->priv;
340
341 g_free (priv->buffer);
342
343 G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize (object);
344 }
345
346 static void
347 g_buffered_output_stream_init (GBufferedOutputStream *stream)
348 {
349 stream->priv = g_buffered_output_stream_get_instance_private (stream);
350 }
351
352 static void
353 g_buffered_output_stream_seekable_iface_init (GSeekableIface *iface)
354 {
355 iface->tell = g_buffered_output_stream_tell;
356 iface->can_seek = g_buffered_output_stream_can_seek;
357 iface->seek = g_buffered_output_stream_seek;
358 iface->can_truncate = g_buffered_output_stream_can_truncate;
359 iface->truncate_fn = g_buffered_output_stream_truncate;
360 }
361
362 /**
363 * g_buffered_output_stream_new:
364 * @base_stream: a #GOutputStream.
365 *
366 * Creates a new buffered output stream for a base stream.
367 *
368 * Returns: a #GOutputStream for the given @base_stream.
369 **/
370 GOutputStream *
371 g_buffered_output_stream_new (GOutputStream *base_stream)
372 {
373 GOutputStream *stream;
374
375 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
376
377 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
378 "base-stream", base_stream,
379 NULL);
380
381 return stream;
382 }
383
384 /**
385 * g_buffered_output_stream_new_sized:
386 * @base_stream: a #GOutputStream.
387 * @size: a #gsize.
388 *
389 * Creates a new buffered output stream with a given buffer size.
390 *
391 * Returns: a #GOutputStream with an internal buffer set to @size.
392 **/
393 GOutputStream *
394 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
395 gsize size)
396 {
397 GOutputStream *stream;
398
399 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
400
401 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
402 "base-stream", base_stream,
403 "buffer-size", size,
404 NULL);
405
406 return stream;
407 }
408
409 static gboolean
410 flush_buffer (GBufferedOutputStream *stream,
411 GCancellable *cancellable,
412 GError **error)
413 {
414 GBufferedOutputStreamPrivate *priv;
415 GOutputStream *base_stream;
416 gboolean res;
417 gsize bytes_written;
418 gsize count;
419
420 priv = stream->priv;
421 bytes_written = 0;
422 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
423
424 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
425
426 res = g_output_stream_write_all (base_stream,
427 priv->buffer,
428 priv->pos,
429 &bytes_written,
430 cancellable,
431 error);
432
433 count = priv->pos - bytes_written;
434
435 if (count > 0)
436 memmove (priv->buffer, priv->buffer + bytes_written, count);
437
438 priv->pos -= bytes_written;
439
440 return res;
441 }
442
443 static gssize
444 g_buffered_output_stream_write (GOutputStream *stream,
445 const void *buffer,
446 gsize count,
447 GCancellable *cancellable,
448 GError **error)
449 {
450 GBufferedOutputStream *bstream;
451 GBufferedOutputStreamPrivate *priv;
452 gboolean res;
453 gsize n;
454 gsize new_size;
455
456 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
457 priv = bstream->priv;
458
459 n = priv->len - priv->pos;
460
461 if (priv->auto_grow && n < count)
462 {
463 new_size = MAX (priv->len * 2, priv->len + count);
464 g_buffered_output_stream_set_buffer_size (bstream, new_size);
465 }
466 else if (n == 0)
467 {
468 res = flush_buffer (bstream, cancellable, error);
469
470 if (res == FALSE)
471 return -1;
472 }
473
474 n = priv->len - priv->pos;
475
476 count = MIN (count, n);
477 memcpy (priv->buffer + priv->pos, buffer, count);
478 priv->pos += count;
479
480 return count;
481 }
482
483 static gboolean
484 g_buffered_output_stream_flush (GOutputStream *stream,
485 GCancellable *cancellable,
486 GError **error)
487 {
488 GBufferedOutputStream *bstream;
489 GOutputStream *base_stream;
490 gboolean res;
491
492 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
493 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
494
495 res = flush_buffer (bstream, cancellable, error);
496
497 if (res == FALSE)
498 return FALSE;
499
500 res = g_output_stream_flush (base_stream, cancellable, error);
501
502 return res;
503 }
504
505 static gboolean
506 g_buffered_output_stream_close (GOutputStream *stream,
507 GCancellable *cancellable,
508 GError **error)
509 {
510 GBufferedOutputStream *bstream;
511 GOutputStream *base_stream;
512 gboolean res;
513
514 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
515 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
516 res = flush_buffer (bstream, cancellable, error);
517
518 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
519 {
520 /* report the first error but still close the stream */
521 if (res)
522 res = g_output_stream_close (base_stream, cancellable, error);
523 else
524 g_output_stream_close (base_stream, cancellable, NULL);
525 }
526
527 return res;
528 }
529
530 static goffset
531 g_buffered_output_stream_tell (GSeekable *seekable)
532 {
533 GBufferedOutputStream *bstream;
534 GBufferedOutputStreamPrivate *priv;
535 GOutputStream *base_stream;
536 GSeekable *base_stream_seekable;
537 goffset base_offset;
538
539 bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
540 priv = bstream->priv;
541
542 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
543 if (!G_IS_SEEKABLE (base_stream))
544 return 0;
545
546 base_stream_seekable = G_SEEKABLE (base_stream);
547
548 base_offset = g_seekable_tell (base_stream_seekable);
549 return base_offset + priv->pos;
550 }
551
552 static gboolean
553 g_buffered_output_stream_can_seek (GSeekable *seekable)
554 {
555 GOutputStream *base_stream;
556
557 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
558 return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
559 }
560
561 static gboolean
562 g_buffered_output_stream_seek (GSeekable *seekable,
563 goffset offset,
564 GSeekType type,
565 GCancellable *cancellable,
566 GError **error)
567 {
568 GBufferedOutputStream *bstream;
569 GOutputStream *base_stream;
570 GSeekable *base_stream_seekable;
571 gboolean flushed;
572
573 bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
574
575 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
576 if (!G_IS_SEEKABLE (base_stream))
577 {
578 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
579 _("Seek not supported on base stream"));
580 return FALSE;
581 }
582
583 base_stream_seekable = G_SEEKABLE (base_stream);
584 flushed = flush_buffer (bstream, cancellable, error);
585 if (!flushed)
586 return FALSE;
587
588 return g_seekable_seek (base_stream_seekable, offset, type, cancellable, error);
589 }
590
591 static gboolean
592 g_buffered_output_stream_can_truncate (GSeekable *seekable)
593 {
594 GOutputStream *base_stream;
595
596 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
597 return G_IS_SEEKABLE (base_stream) && g_seekable_can_truncate (G_SEEKABLE (base_stream));
598 }
599
600 static gboolean
601 g_buffered_output_stream_truncate (GSeekable *seekable,
602 goffset offset,
603 GCancellable *cancellable,
604 GError **error)
605 {
606 GBufferedOutputStream *bstream;
607 GOutputStream *base_stream;
608 GSeekable *base_stream_seekable;
609 gboolean flushed;
610
611 bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
612 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
613 if (!G_IS_SEEKABLE (base_stream))
614 {
615 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
616 _("Truncate not supported on base stream"));
617 return FALSE;
618 }
619
620 base_stream_seekable = G_SEEKABLE (base_stream);
621
622 flushed = flush_buffer (bstream, cancellable, error);
623 if (!flushed)
624 return FALSE;
625 return g_seekable_truncate (base_stream_seekable, offset, cancellable, error);
626 }
627
628 /* ************************** */
629 /* Async stuff implementation */
630 /* ************************** */
631
632 /* TODO: This should be using the base class async ops, not threads */
633
634 typedef struct {
635
636 guint flush_stream : 1;
637 guint close_stream : 1;
638
639 } FlushData;
640
641 static void
642 free_flush_data (gpointer data)
643 {
644 g_slice_free (FlushData, data);
645 }
646
647 /* This function is used by all three (i.e.
648 * _write, _flush, _close) functions since
649 * all of them will need to flush the buffer
650 * and so closing and writing is just a special
651 * case of flushing + some addition stuff */
652 static void
653 flush_buffer_thread (GTask *task,
654 gpointer object,
655 gpointer task_data,
656 GCancellable *cancellable)
657 {
658 GBufferedOutputStream *stream;
659 GOutputStream *base_stream;
660 FlushData *fdata;
661 gboolean res;
662 GError *error = NULL;
663
664 stream = G_BUFFERED_OUTPUT_STREAM (object);
665 fdata = task_data;
666 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
667
668 res = flush_buffer (stream, cancellable, &error);
669
670 /* if flushing the buffer didn't work don't even bother
671 * to flush the stream but just report that error */
672 if (res && fdata->flush_stream)
673 res = g_output_stream_flush (base_stream, cancellable, &error);
674
675 if (fdata->close_stream)
676 {
677
678 /* if flushing the buffer or the stream returned
679 * an error report that first error but still try
680 * close the stream */
681 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
682 {
683 if (res == FALSE)
684 g_output_stream_close (base_stream, cancellable, NULL);
685 else
686 res = g_output_stream_close (base_stream, cancellable, &error);
687 }
688 }
689
690 if (res == FALSE)
691 g_task_return_error (task, error);
692 else
693 g_task_return_boolean (task, TRUE);
694 }
695
696 static void
697 g_buffered_output_stream_flush_async (GOutputStream *stream,
698 int io_priority,
699 GCancellable *cancellable,
700 GAsyncReadyCallback callback,
701 gpointer data)
702 {
703 GTask *task;
704 FlushData *fdata;
705
706 fdata = g_slice_new0 (FlushData);
707 fdata->flush_stream = TRUE;
708 fdata->close_stream = FALSE;
709
710 task = g_task_new (stream, cancellable, callback, data);
711 g_task_set_source_tag (task, g_buffered_output_stream_flush_async);
712 g_task_set_task_data (task, fdata, free_flush_data);
713 g_task_set_priority (task, io_priority);
714
715 g_task_run_in_thread (task, flush_buffer_thread);
716 g_object_unref (task);
717 }
718
719 static gboolean
720 g_buffered_output_stream_flush_finish (GOutputStream *stream,
721 GAsyncResult *result,
722 GError **error)
723 {
724 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
725
726 return g_task_propagate_boolean (G_TASK (result), error);
727 }
728
729 static void
730 g_buffered_output_stream_close_async (GOutputStream *stream,
731 int io_priority,
732 GCancellable *cancellable,
733 GAsyncReadyCallback callback,
734 gpointer data)
735 {
736 GTask *task;
737 FlushData *fdata;
738
739 fdata = g_slice_new0 (FlushData);
740 fdata->close_stream = TRUE;
741
742 task = g_task_new (stream, cancellable, callback, data);
743 g_task_set_source_tag (task, g_buffered_output_stream_close_async);
744 g_task_set_task_data (task, fdata, free_flush_data);
745 g_task_set_priority (task, io_priority);
746
747 g_task_run_in_thread (task, flush_buffer_thread);
748 g_object_unref (task);
749 }
750
751 static gboolean
752 g_buffered_output_stream_close_finish (GOutputStream *stream,
753 GAsyncResult *result,
754 GError **error)
755 {
756 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
757
758 return g_task_propagate_boolean (G_TASK (result), error);
759 }