1 /* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 *
5 * SPDX-License-Identifier: LGPL-2.1-or-later
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General
18 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
19 *
20 * Author: Alexander Larsson <alexl@redhat.com>
21 */
22
23 #include "config.h"
24 #include <glib.h>
25 #include "glibintl.h"
26
27 #include "ginputstream.h"
28 #include "gioprivate.h"
29 #include "gseekable.h"
30 #include "gcancellable.h"
31 #include "gasyncresult.h"
32 #include "gioerror.h"
33 #include "gpollableinputstream.h"
34
35 /**
36 * GInputStream:
37 *
38 * `GInputStream` is a base class for implementing streaming input.
39 *
40 * It has functions to read from a stream ([method@Gio.InputStream.read]),
41 * to close a stream ([method@Gio.InputStream.close]) and to skip some content
42 * ([method@Gio.InputStream.skip]).
43 *
44 * To copy the content of an input stream to an output stream without
45 * manually handling the reads and writes, use [method@Gio.OutputStream.splice].
46 *
47 * See the documentation for [class@Gio.IOStream] for details of thread safety
48 * of streaming APIs.
49 *
50 * All of these functions have async variants too.
51 **/
52
/* Per-instance private state of a GInputStream. */
struct _GInputStreamPrivate {
  /* Set to TRUE once g_input_stream_close() has run (also set by the
   * async close callback wrapper); checked by dispose and close. */
  guint closed : 1;
  /* "An operation is in progress" flag.  Written directly by
   * g_input_stream_real_skip() around its seek calls; presumably also
   * managed by g_input_stream_set_pending()/clear_pending(), which are
   * not visible in this part of the file. */
  guint pending : 1;
  /* The caller's callback for the async operation in flight; stored by
   * the *_async() entry points and invoked by the callback wrappers. */
  GAsyncReadyCallback outstanding_callback;
};
58
/* Type registration for GInputStream (abstract, with instance-private data). */
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GInputStream, g_input_stream, G_TYPE_OBJECT)

/* Forward declarations of the default virtual-method implementations that
 * g_input_stream_class_init() installs into the class structure. */
static gssize   g_input_stream_real_skip         (GInputStream         *stream,
                                                  gsize                 count,
                                                  GCancellable         *cancellable,
                                                  GError              **error);
static void     g_input_stream_real_read_async   (GInputStream         *stream,
                                                  void                 *buffer,
                                                  gsize                 count,
                                                  int                   io_priority,
                                                  GCancellable         *cancellable,
                                                  GAsyncReadyCallback   callback,
                                                  gpointer              user_data);
static gssize   g_input_stream_real_read_finish  (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);
static void     g_input_stream_real_skip_async   (GInputStream         *stream,
                                                  gsize                 count,
                                                  int                   io_priority,
                                                  GCancellable         *cancellable,
                                                  GAsyncReadyCallback   callback,
                                                  gpointer              data);
static gssize   g_input_stream_real_skip_finish  (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);
static void     g_input_stream_real_close_async  (GInputStream         *stream,
                                                  int                   io_priority,
                                                  GCancellable         *cancellable,
                                                  GAsyncReadyCallback   callback,
                                                  gpointer              data);
static gboolean g_input_stream_real_close_finish (GInputStream         *stream,
                                                  GAsyncResult         *result,
                                                  GError              **error);
92
93 static void
94 g_input_stream_dispose (GObject *object)
95 {
96 GInputStream *stream;
97
98 stream = G_INPUT_STREAM (object);
99
100 if (!stream->priv->closed)
101 g_input_stream_close (stream, NULL, NULL);
102
103 G_OBJECT_CLASS (g_input_stream_parent_class)->dispose (object);
104 }
105
106
107 static void
108 g_input_stream_class_init (GInputStreamClass *klass)
109 {
110 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
111
112 gobject_class->dispose = g_input_stream_dispose;
113
114 klass->skip = g_input_stream_real_skip;
115 klass->read_async = g_input_stream_real_read_async;
116 klass->read_finish = g_input_stream_real_read_finish;
117 klass->skip_async = g_input_stream_real_skip_async;
118 klass->skip_finish = g_input_stream_real_skip_finish;
119 klass->close_async = g_input_stream_real_close_async;
120 klass->close_finish = g_input_stream_real_close_finish;
121 }
122
123 static void
124 g_input_stream_init (GInputStream *stream)
125 {
126 stream->priv = g_input_stream_get_instance_private (stream);
127 }
128
129 /**
130 * g_input_stream_read:
131 * @stream: a #GInputStream.
132 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
133 * a buffer to read data into (which should be at least count bytes long).
134 * @count: (in): the number of bytes that will be read from the stream
135 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
136 * @error: location to store the error occurring, or %NULL to ignore
137 *
138 * Tries to read @count bytes from the stream into the buffer starting at
139 * @buffer. Will block during this read.
140 *
141 * If count is zero returns zero and does nothing. A value of @count
142 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
143 *
144 * On success, the number of bytes read into the buffer is returned.
145 * It is not an error if this is not the same as the requested size, as it
146 * can happen e.g. near the end of a file. Zero is returned on end of file
147 * (or if @count is zero), but never otherwise.
148 *
149 * The returned @buffer is not a nul-terminated string, it can contain nul bytes
150 * at any position, and this function doesn't nul-terminate the @buffer.
151 *
152 * If @cancellable is not %NULL, then the operation can be cancelled by
153 * triggering the cancellable object from another thread. If the operation
154 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
155 * operation was partially finished when the operation was cancelled the
156 * partial result will be returned, without an error.
157 *
158 * On error -1 is returned and @error is set accordingly.
159 *
160 * Returns: Number of bytes read, or -1 on error, or 0 on end of file.
161 **/
162 gssize
163 g_input_stream_read (GInputStream *stream,
164 void *buffer,
165 gsize count,
166 GCancellable *cancellable,
167 GError **error)
168 {
169 GInputStreamClass *class;
170 gssize res;
171
172 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
173 g_return_val_if_fail (buffer != NULL, 0);
174
175 if (count == 0)
176 return 0;
177
178 if (((gssize) count) < 0)
179 {
180 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
181 _("Too large count value passed to %s"), G_STRFUNC);
182 return -1;
183 }
184
185 class = G_INPUT_STREAM_GET_CLASS (stream);
186
187 if (class->read_fn == NULL)
188 {
189 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
190 _("Input stream doesn’t implement read"));
191 return -1;
192 }
193
194 if (!g_input_stream_set_pending (stream, error))
195 return -1;
196
197 if (cancellable)
198 g_cancellable_push_current (cancellable);
199
200 res = class->read_fn (stream, buffer, count, cancellable, error);
201
202 if (cancellable)
203 g_cancellable_pop_current (cancellable);
204
205 g_input_stream_clear_pending (stream);
206
207 return res;
208 }
209
210 /**
211 * g_input_stream_read_all:
212 * @stream: a #GInputStream.
213 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
214 * a buffer to read data into (which should be at least count bytes long).
215 * @count: (in): the number of bytes that will be read from the stream
216 * @bytes_read: (out): location to store the number of bytes that was read from the stream
217 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
218 * @error: location to store the error occurring, or %NULL to ignore
219 *
220 * Tries to read @count bytes from the stream into the buffer starting at
221 * @buffer. Will block during this read.
222 *
223 * This function is similar to g_input_stream_read(), except it tries to
224 * read as many bytes as requested, only stopping on an error or end of stream.
225 *
226 * On a successful read of @count bytes, or if we reached the end of the
227 * stream, %TRUE is returned, and @bytes_read is set to the number of bytes
228 * read into @buffer.
229 *
230 * If there is an error during the operation %FALSE is returned and @error
231 * is set to indicate the error status.
232 *
233 * As a special exception to the normal conventions for functions that
234 * use #GError, if this function returns %FALSE (and sets @error) then
235 * @bytes_read will be set to the number of bytes that were successfully
236 * read before the error was encountered. This functionality is only
237 * available from C. If you need it from another language then you must
238 * write your own loop around g_input_stream_read().
239 *
240 * Returns: %TRUE on success, %FALSE if there was an error
241 **/
242 gboolean
243 g_input_stream_read_all (GInputStream *stream,
244 void *buffer,
245 gsize count,
246 gsize *bytes_read,
247 GCancellable *cancellable,
248 GError **error)
249 {
250 gsize _bytes_read;
251 gssize res;
252
253 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
254 g_return_val_if_fail (buffer != NULL, FALSE);
255
256 _bytes_read = 0;
257 while (_bytes_read < count)
258 {
259 res = g_input_stream_read (stream, (char *)buffer + _bytes_read, count - _bytes_read,
260 cancellable, error);
261 if (res == -1)
262 {
263 if (bytes_read)
264 *bytes_read = _bytes_read;
265 return FALSE;
266 }
267
268 if (res == 0)
269 break;
270
271 _bytes_read += res;
272 }
273
274 if (bytes_read)
275 *bytes_read = _bytes_read;
276 return TRUE;
277 }
278
279 /**
280 * g_input_stream_read_bytes:
281 * @stream: a #GInputStream.
282 * @count: maximum number of bytes that will be read from the stream. Common
283 * values include 4096 and 8192.
284 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
285 * @error: location to store the error occurring, or %NULL to ignore
286 *
287 * Like g_input_stream_read(), this tries to read @count bytes from
288 * the stream in a blocking fashion. However, rather than reading into
289 * a user-supplied buffer, this will create a new #GBytes containing
290 * the data that was read. This may be easier to use from language
291 * bindings.
292 *
293 * If count is zero, returns a zero-length #GBytes and does nothing. A
294 * value of @count larger than %G_MAXSSIZE will cause a
295 * %G_IO_ERROR_INVALID_ARGUMENT error.
296 *
297 * On success, a new #GBytes is returned. It is not an error if the
298 * size of this object is not the same as the requested size, as it
299 * can happen e.g. near the end of a file. A zero-length #GBytes is
300 * returned on end of file (or if @count is zero), but never
301 * otherwise.
302 *
303 * If @cancellable is not %NULL, then the operation can be cancelled by
304 * triggering the cancellable object from another thread. If the operation
305 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
306 * operation was partially finished when the operation was cancelled the
307 * partial result will be returned, without an error.
308 *
309 * On error %NULL is returned and @error is set accordingly.
310 *
311 * Returns: (transfer full): a new #GBytes, or %NULL on error
312 *
313 * Since: 2.34
314 **/
315 GBytes *
316 g_input_stream_read_bytes (GInputStream *stream,
317 gsize count,
318 GCancellable *cancellable,
319 GError **error)
320 {
321 guchar *buf;
322 gssize nread;
323
324 buf = g_malloc (count);
325 nread = g_input_stream_read (stream, buf, count, cancellable, error);
326 if (nread == -1)
327 {
328 g_free (buf);
329 return NULL;
330 }
331 else if (nread == 0)
332 {
333 g_free (buf);
334 return g_bytes_new_static ("", 0);
335 }
336 else
337 return g_bytes_new_take (buf, nread);
338 }
339
340 /**
341 * g_input_stream_skip:
342 * @stream: a #GInputStream.
343 * @count: the number of bytes that will be skipped from the stream
344 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
345 * @error: location to store the error occurring, or %NULL to ignore
346 *
347 * Tries to skip @count bytes from the stream. Will block during the operation.
348 *
349 * This is identical to g_input_stream_read(), from a behaviour standpoint,
350 * but the bytes that are skipped are not returned to the user. Some
351 * streams have an implementation that is more efficient than reading the data.
352 *
353 * This function is optional for inherited classes, as the default implementation
354 * emulates it using read.
355 *
356 * If @cancellable is not %NULL, then the operation can be cancelled by
357 * triggering the cancellable object from another thread. If the operation
358 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
359 * operation was partially finished when the operation was cancelled the
360 * partial result will be returned, without an error.
361 *
362 * Returns: Number of bytes skipped, or -1 on error
363 **/
364 gssize
365 g_input_stream_skip (GInputStream *stream,
366 gsize count,
367 GCancellable *cancellable,
368 GError **error)
369 {
370 GInputStreamClass *class;
371 gssize res;
372
373 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
374
375 if (count == 0)
376 return 0;
377
378 if (((gssize) count) < 0)
379 {
380 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
381 _("Too large count value passed to %s"), G_STRFUNC);
382 return -1;
383 }
384
385 class = G_INPUT_STREAM_GET_CLASS (stream);
386
387 if (!g_input_stream_set_pending (stream, error))
388 return -1;
389
390 if (cancellable)
391 g_cancellable_push_current (cancellable);
392
393 res = class->skip (stream, count, cancellable, error);
394
395 if (cancellable)
396 g_cancellable_pop_current (cancellable);
397
398 g_input_stream_clear_pending (stream);
399
400 return res;
401 }
402
403 static gssize
404 g_input_stream_real_skip (GInputStream *stream,
405 gsize count,
406 GCancellable *cancellable,
407 GError **error)
408 {
409 GInputStreamClass *class;
410 gssize ret, read_bytes;
411 char buffer[8192];
412 GError *my_error;
413
414 if (G_IS_SEEKABLE (stream) && g_seekable_can_seek (G_SEEKABLE (stream)))
415 {
416 GSeekable *seekable = G_SEEKABLE (stream);
417 goffset start, end;
418 gboolean success;
419
420 /* g_seekable_seek() may try to set pending itself */
421 stream->priv->pending = FALSE;
422
423 start = g_seekable_tell (seekable);
424
425 if (g_seekable_seek (G_SEEKABLE (stream),
426 0,
427 G_SEEK_END,
428 cancellable,
429 NULL))
430 {
431 end = g_seekable_tell (seekable);
432 g_assert (start >= 0);
433 g_assert (end >= start);
434 if (start > (goffset) (G_MAXOFFSET - count) ||
435 (goffset) (start + count) > end)
436 {
437 stream->priv->pending = TRUE;
438 return end - start;
439 }
440
441 success = g_seekable_seek (G_SEEKABLE (stream),
442 start + count,
443 G_SEEK_SET,
444 cancellable,
445 error);
446 stream->priv->pending = TRUE;
447
448 if (success)
449 return count;
450 else
451 return -1;
452 }
453 }
454
455 /* If not seekable, or seek failed, fall back to reading data: */
456
457 class = G_INPUT_STREAM_GET_CLASS (stream);
458
459 read_bytes = 0;
460 while (1)
461 {
462 my_error = NULL;
463
464 ret = class->read_fn (stream, buffer, MIN (sizeof (buffer), count),
465 cancellable, &my_error);
466 if (ret == -1)
467 {
468 if (read_bytes > 0 &&
469 my_error->domain == G_IO_ERROR &&
470 my_error->code == G_IO_ERROR_CANCELLED)
471 {
472 g_error_free (my_error);
473 return read_bytes;
474 }
475
476 g_propagate_error (error, my_error);
477 return -1;
478 }
479
480 count -= ret;
481 read_bytes += ret;
482
483 if (ret == 0 || count == 0)
484 return read_bytes;
485 }
486 }
487
488 /**
489 * g_input_stream_close:
490 * @stream: A #GInputStream.
491 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
492 * @error: location to store the error occurring, or %NULL to ignore
493 *
494 * Closes the stream, releasing resources related to it.
495 *
496 * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
497 * Closing a stream multiple times will not return an error.
498 *
499 * Streams will be automatically closed when the last reference
500 * is dropped, but you might want to call this function to make sure
501 * resources are released as early as possible.
502 *
503 * Some streams might keep the backing store of the stream (e.g. a file descriptor)
504 * open after the stream is closed. See the documentation for the individual
505 * stream for details.
506 *
507 * On failure the first error that happened will be reported, but the close
508 * operation will finish as much as possible. A stream that failed to
509 * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
510 * is important to check and report the error to the user.
511 *
512 * If @cancellable is not %NULL, then the operation can be cancelled by
513 * triggering the cancellable object from another thread. If the operation
514 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
515 * Cancelling a close will still leave the stream closed, but some streams
516 * can use a faster close that doesn't block to e.g. check errors.
517 *
518 * Returns: %TRUE on success, %FALSE on failure
519 **/
520 gboolean
521 g_input_stream_close (GInputStream *stream,
522 GCancellable *cancellable,
523 GError **error)
524 {
525 GInputStreamClass *class;
526 gboolean res;
527
528 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
529
530 class = G_INPUT_STREAM_GET_CLASS (stream);
531
532 if (stream->priv->closed)
533 return TRUE;
534
535 res = TRUE;
536
537 if (!g_input_stream_set_pending (stream, error))
538 return FALSE;
539
540 if (cancellable)
541 g_cancellable_push_current (cancellable);
542
543 if (class->close_fn)
544 res = class->close_fn (stream, cancellable, error);
545
546 if (cancellable)
547 g_cancellable_pop_current (cancellable);
548
549 g_input_stream_clear_pending (stream);
550
551 stream->priv->closed = TRUE;
552
553 return res;
554 }
555
556 static void
557 async_ready_callback_wrapper (GObject *source_object,
558 GAsyncResult *res,
559 gpointer user_data)
560 {
561 GInputStream *stream = G_INPUT_STREAM (source_object);
562
563 g_input_stream_clear_pending (stream);
564 if (stream->priv->outstanding_callback)
565 (*stream->priv->outstanding_callback) (source_object, res, user_data);
566 g_object_unref (stream);
567 }
568
569 static void
570 async_ready_close_callback_wrapper (GObject *source_object,
571 GAsyncResult *res,
572 gpointer user_data)
573 {
574 GInputStream *stream = G_INPUT_STREAM (source_object);
575
576 g_input_stream_clear_pending (stream);
577 stream->priv->closed = TRUE;
578 if (stream->priv->outstanding_callback)
579 (*stream->priv->outstanding_callback) (source_object, res, user_data);
580 g_object_unref (stream);
581 }
582
583 /**
584 * g_input_stream_read_async:
585 * @stream: A #GInputStream.
586 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
587 * a buffer to read data into (which should be at least count bytes long).
588 * @count: (in): the number of bytes that will be read from the stream
589 * @io_priority: the [I/O priority][io-priority]
590 * of the request.
591 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
592 * @callback: (scope async): a #GAsyncReadyCallback
593 * to call when the request is satisfied
594 * @user_data: the data to pass to callback function
595 *
596 * Request an asynchronous read of @count bytes from the stream into the buffer
597 * starting at @buffer. When the operation is finished @callback will be called.
598 * You can then call g_input_stream_read_finish() to get the result of the
599 * operation.
600 *
601 * During an async request no other sync and async calls are allowed on @stream, and will
602 * result in %G_IO_ERROR_PENDING errors.
603 *
604 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
605 *
606 * On success, the number of bytes read into the buffer will be passed to the
607 * callback. It is not an error if this is not the same as the requested size, as it
608 * can happen e.g. near the end of a file, but generally we try to read
609 * as many bytes as requested. Zero is returned on end of file
610 * (or if @count is zero), but never otherwise.
611 *
612 * Any outstanding i/o request with higher priority (lower numerical value) will
613 * be executed before an outstanding request with lower priority. Default
614 * priority is %G_PRIORITY_DEFAULT.
615 *
616 * The asynchronous methods have a default fallback that uses threads to implement
617 * asynchronicity, so they are optional for inheriting classes. However, if you
618 * override one you must override all.
619 **/
620 void
621 g_input_stream_read_async (GInputStream *stream,
622 void *buffer,
623 gsize count,
624 int io_priority,
625 GCancellable *cancellable,
626 GAsyncReadyCallback callback,
627 gpointer user_data)
628 {
629 GInputStreamClass *class;
630 GError *error = NULL;
631
632 g_return_if_fail (G_IS_INPUT_STREAM (stream));
633 g_return_if_fail (buffer != NULL);
634
635 if (count == 0)
636 {
637 GTask *task;
638
639 task = g_task_new (stream, cancellable, callback, user_data);
640 g_task_set_source_tag (task, g_input_stream_read_async);
641 g_task_return_int (task, 0);
642 g_object_unref (task);
643 return;
644 }
645
646 if (((gssize) count) < 0)
647 {
648 g_task_report_new_error (stream, callback, user_data,
649 g_input_stream_read_async,
650 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
651 _("Too large count value passed to %s"),
652 G_STRFUNC);
653 return;
654 }
655
656 if (!g_input_stream_set_pending (stream, &error))
657 {
658 g_task_report_error (stream, callback, user_data,
659 g_input_stream_read_async,
660 error);
661 return;
662 }
663
664 class = G_INPUT_STREAM_GET_CLASS (stream);
665 stream->priv->outstanding_callback = callback;
666 g_object_ref (stream);
667 class->read_async (stream, buffer, count, io_priority, cancellable,
668 async_ready_callback_wrapper, user_data);
669 }
670
671 /**
672 * g_input_stream_read_finish:
673 * @stream: a #GInputStream.
674 * @result: a #GAsyncResult.
675 * @error: a #GError location to store the error occurring, or %NULL to
676 * ignore.
677 *
678 * Finishes an asynchronous stream read operation.
679 *
680 * Returns: number of bytes read in, or -1 on error, or 0 on end of file.
681 **/
682 gssize
683 g_input_stream_read_finish (GInputStream *stream,
684 GAsyncResult *result,
685 GError **error)
686 {
687 GInputStreamClass *class;
688
689 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
690 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
691
692 if (g_async_result_legacy_propagate_error (result, error))
693 return -1;
694 else if (g_async_result_is_tagged (result, g_input_stream_read_async))
695 return g_task_propagate_int (G_TASK (result), error);
696
697 class = G_INPUT_STREAM_GET_CLASS (stream);
698 return class->read_finish (stream, result, error);
699 }
700
/* Bookkeeping for g_input_stream_read_all_async(); attached to the GTask
 * as task data and released by free_async_read_all(). */
typedef struct
{
  /* Caller-supplied destination buffer; never freed by this file. */
  gchar *buffer;
  /* Initialised to the requested count; read_all_callback() decrements it
   * as data arrives. */
  gsize to_read;
  /* Bytes read so far; reported by g_input_stream_read_all_finish(). */
  gsize bytes_read;
} AsyncReadAll;
707
708 static void
709 free_async_read_all (gpointer data)
710 {
711 g_slice_free (AsyncReadAll, data);
712 }
713
714 static void
715 read_all_callback (GObject *stream,
716 GAsyncResult *result,
717 gpointer user_data)
718 {
719 GTask *task = user_data;
720 AsyncReadAll *data = g_task_get_task_data (task);
721 gboolean got_eof = FALSE;
722
723 if (result)
724 {
725 GError *error = NULL;
726 gssize nread;
727
728 nread = g_input_stream_read_finish (G_INPUT_STREAM (stream), result, &error);
729
730 if (nread == -1)
731 {
732 g_task_return_error (task, error);
733 g_object_unref (task);
734 return;
735 }
736
737 g_assert_cmpint (nread, <=, data->to_read);
738 data->to_read -= nread;
739 data->bytes_read += nread;
740 got_eof = (nread == 0);
741 }
742
743 if (got_eof || data->to_read == 0)
744 {
745 g_task_return_boolean (task, TRUE);
746 g_object_unref (task);
747 }
748
749 else
750 g_input_stream_read_async (G_INPUT_STREAM (stream),
751 data->buffer + data->bytes_read,
752 data->to_read,
753 g_task_get_priority (task),
754 g_task_get_cancellable (task),
755 read_all_callback, task);
756 }
757
758
759 static void
760 read_all_async_thread (GTask *task,
761 gpointer source_object,
762 gpointer task_data,
763 GCancellable *cancellable)
764 {
765 GInputStream *stream = source_object;
766 AsyncReadAll *data = task_data;
767 GError *error = NULL;
768
769 if (g_input_stream_read_all (stream, data->buffer, data->to_read, &data->bytes_read,
770 g_task_get_cancellable (task), &error))
771 g_task_return_boolean (task, TRUE);
772 else
773 g_task_return_error (task, error);
774 }
775
776 /**
777 * g_input_stream_read_all_async:
778 * @stream: A #GInputStream
779 * @buffer: (array length=count) (element-type guint8) (out caller-allocates):
780 * a buffer to read data into (which should be at least count bytes long)
781 * @count: (in): the number of bytes that will be read from the stream
782 * @io_priority: the [I/O priority][io-priority] of the request
783 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
784 * @callback: (scope async): a #GAsyncReadyCallback
785 * to call when the request is satisfied
786 * @user_data: the data to pass to callback function
787 *
788 * Request an asynchronous read of @count bytes from the stream into the
789 * buffer starting at @buffer.
790 *
791 * This is the asynchronous equivalent of g_input_stream_read_all().
792 *
793 * Call g_input_stream_read_all_finish() to collect the result.
794 *
795 * Any outstanding I/O request with higher priority (lower numerical
796 * value) will be executed before an outstanding request with lower
797 * priority. Default priority is %G_PRIORITY_DEFAULT.
798 *
799 * Since: 2.44
800 **/
801 void
802 g_input_stream_read_all_async (GInputStream *stream,
803 void *buffer,
804 gsize count,
805 int io_priority,
806 GCancellable *cancellable,
807 GAsyncReadyCallback callback,
808 gpointer user_data)
809 {
810 AsyncReadAll *data;
811 GTask *task;
812
813 g_return_if_fail (G_IS_INPUT_STREAM (stream));
814 g_return_if_fail (buffer != NULL || count == 0);
815
816 task = g_task_new (stream, cancellable, callback, user_data);
817 data = g_slice_new0 (AsyncReadAll);
818 data->buffer = buffer;
819 data->to_read = count;
820
821 g_task_set_source_tag (task, g_input_stream_read_all_async);
822 g_task_set_task_data (task, data, free_async_read_all);
823 g_task_set_priority (task, io_priority);
824
825 /* If async reads are going to be handled via the threadpool anyway
826 * then we may as well do it with a single dispatch instead of
827 * bouncing in and out.
828 */
829 if (g_input_stream_async_read_is_via_threads (stream))
830 {
831 g_task_run_in_thread (task, read_all_async_thread);
832 g_object_unref (task);
833 }
834 else
835 read_all_callback (G_OBJECT (stream), NULL, task);
836 }
837
838 /**
839 * g_input_stream_read_all_finish:
840 * @stream: a #GInputStream
841 * @result: a #GAsyncResult
842 * @bytes_read: (out): location to store the number of bytes that was read from the stream
843 * @error: a #GError location to store the error occurring, or %NULL to ignore
844 *
845 * Finishes an asynchronous stream read operation started with
846 * g_input_stream_read_all_async().
847 *
848 * As a special exception to the normal conventions for functions that
849 * use #GError, if this function returns %FALSE (and sets @error) then
850 * @bytes_read will be set to the number of bytes that were successfully
851 * read before the error was encountered. This functionality is only
852 * available from C. If you need it from another language then you must
853 * write your own loop around g_input_stream_read_async().
854 *
855 * Returns: %TRUE on success, %FALSE if there was an error
856 *
857 * Since: 2.44
858 **/
859 gboolean
860 g_input_stream_read_all_finish (GInputStream *stream,
861 GAsyncResult *result,
862 gsize *bytes_read,
863 GError **error)
864 {
865 GTask *task;
866
867 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
868 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
869
870 task = G_TASK (result);
871
872 if (bytes_read)
873 {
874 AsyncReadAll *data = g_task_get_task_data (task);
875
876 *bytes_read = data->bytes_read;
877 }
878
879 return g_task_propagate_boolean (task, error);
880 }
881
882 static void
883 read_bytes_callback (GObject *stream,
884 GAsyncResult *result,
885 gpointer user_data)
886 {
887 GTask *task = user_data;
888 guchar *buf = g_task_get_task_data (task);
889 GError *error = NULL;
890 gssize nread;
891 GBytes *bytes = NULL;
892
893 nread = g_input_stream_read_finish (G_INPUT_STREAM (stream),
894 result, &error);
895 if (nread == -1)
896 {
897 g_free (buf);
898 g_task_return_error (task, error);
899 }
900 else if (nread == 0)
901 {
902 g_free (buf);
903 bytes = g_bytes_new_static ("", 0);
904 }
905 else
906 bytes = g_bytes_new_take (buf, nread);
907
908 if (bytes)
909 g_task_return_pointer (task, bytes, (GDestroyNotify)g_bytes_unref);
910
911 g_object_unref (task);
912 }
913
914 /**
915 * g_input_stream_read_bytes_async:
916 * @stream: A #GInputStream.
917 * @count: the number of bytes that will be read from the stream
918 * @io_priority: the [I/O priority][io-priority] of the request
919 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
920 * @callback: (scope async): a #GAsyncReadyCallback
921 * to call when the request is satisfied
922 * @user_data: the data to pass to callback function
923 *
924 * Request an asynchronous read of @count bytes from the stream into a
925 * new #GBytes. When the operation is finished @callback will be
926 * called. You can then call g_input_stream_read_bytes_finish() to get the
927 * result of the operation.
928 *
929 * During an async request no other sync and async calls are allowed
930 * on @stream, and will result in %G_IO_ERROR_PENDING errors.
931 *
932 * A value of @count larger than %G_MAXSSIZE will cause a
933 * %G_IO_ERROR_INVALID_ARGUMENT error.
934 *
935 * On success, the new #GBytes will be passed to the callback. It is
936 * not an error if this is smaller than the requested size, as it can
937 * happen e.g. near the end of a file, but generally we try to read as
938 * many bytes as requested. Zero is returned on end of file (or if
939 * @count is zero), but never otherwise.
940 *
941 * Any outstanding I/O request with higher priority (lower numerical
942 * value) will be executed before an outstanding request with lower
943 * priority. Default priority is %G_PRIORITY_DEFAULT.
944 *
945 * Since: 2.34
946 **/
947 void
948 g_input_stream_read_bytes_async (GInputStream *stream,
949 gsize count,
950 int io_priority,
951 GCancellable *cancellable,
952 GAsyncReadyCallback callback,
953 gpointer user_data)
954 {
955 GTask *task;
956 guchar *buf;
957
958 task = g_task_new (stream, cancellable, callback, user_data);
959 g_task_set_source_tag (task, g_input_stream_read_bytes_async);
960
961 buf = g_malloc (count);
962 g_task_set_task_data (task, buf, NULL);
963
964 g_input_stream_read_async (stream, buf, count,
965 io_priority, cancellable,
966 read_bytes_callback, task);
967 }
968
969 /**
970 * g_input_stream_read_bytes_finish:
971 * @stream: a #GInputStream.
972 * @result: a #GAsyncResult.
973 * @error: a #GError location to store the error occurring, or %NULL to
974 * ignore.
975 *
976 * Finishes an asynchronous stream read-into-#GBytes operation.
977 *
978 * Returns: (transfer full): the newly-allocated #GBytes, or %NULL on error
979 *
980 * Since: 2.34
981 **/
982 GBytes *
983 g_input_stream_read_bytes_finish (GInputStream *stream,
984 GAsyncResult *result,
985 GError **error)
986 {
987 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), NULL);
988 g_return_val_if_fail (g_task_is_valid (result, stream), NULL);
989
990 return g_task_propagate_pointer (G_TASK (result), error);
991 }
992
993 /**
994 * g_input_stream_skip_async:
995 * @stream: A #GInputStream.
996 * @count: the number of bytes that will be skipped from the stream
997 * @io_priority: the [I/O priority][io-priority] of the request
998 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
999 * @callback: (scope async): a #GAsyncReadyCallback
1000 * to call when the request is satisfied
1001 * @user_data: the data to pass to callback function
1002 *
1003 * Request an asynchronous skip of @count bytes from the stream.
1004 * When the operation is finished @callback will be called.
1005 * You can then call g_input_stream_skip_finish() to get the result
1006 * of the operation.
1007 *
1008 * During an async request no other sync and async calls are allowed,
1009 * and will result in %G_IO_ERROR_PENDING errors.
1010 *
1011 * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
1012 *
1013 * On success, the number of bytes skipped will be passed to the callback.
1014 * It is not an error if this is not the same as the requested size, as it
1015 * can happen e.g. near the end of a file, but generally we try to skip
1016 * as many bytes as requested. Zero is returned on end of file
1017 * (or if @count is zero), but never otherwise.
1018 *
 * Any outstanding I/O request with higher priority (lower numerical value)
1020 * will be executed before an outstanding request with lower priority.
1021 * Default priority is %G_PRIORITY_DEFAULT.
1022 *
1023 * The asynchronous methods have a default fallback that uses threads to
1024 * implement asynchronicity, so they are optional for inheriting classes.
1025 * However, if you override one, you must override all.
1026 **/
1027 void
1028 g_input_stream_skip_async (GInputStream *stream,
1029 gsize count,
1030 int io_priority,
1031 GCancellable *cancellable,
1032 GAsyncReadyCallback callback,
1033 gpointer user_data)
1034 {
1035 GInputStreamClass *class;
1036 GError *error = NULL;
1037
1038 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1039
1040 if (count == 0)
1041 {
1042 GTask *task;
1043
1044 task = g_task_new (stream, cancellable, callback, user_data);
1045 g_task_set_source_tag (task, g_input_stream_skip_async);
1046 g_task_return_int (task, 0);
1047 g_object_unref (task);
1048 return;
1049 }
1050
1051 if (((gssize) count) < 0)
1052 {
1053 g_task_report_new_error (stream, callback, user_data,
1054 g_input_stream_skip_async,
1055 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1056 _("Too large count value passed to %s"),
1057 G_STRFUNC);
1058 return;
1059 }
1060
1061 if (!g_input_stream_set_pending (stream, &error))
1062 {
1063 g_task_report_error (stream, callback, user_data,
1064 g_input_stream_skip_async,
1065 error);
1066 return;
1067 }
1068
1069 class = G_INPUT_STREAM_GET_CLASS (stream);
1070 stream->priv->outstanding_callback = callback;
1071 g_object_ref (stream);
1072 class->skip_async (stream, count, io_priority, cancellable,
1073 async_ready_callback_wrapper, user_data);
1074 }
1075
1076 /**
1077 * g_input_stream_skip_finish:
1078 * @stream: a #GInputStream.
1079 * @result: a #GAsyncResult.
1080 * @error: a #GError location to store the error occurring, or %NULL to
1081 * ignore.
1082 *
1083 * Finishes a stream skip operation.
1084 *
 * Returns: the number of bytes skipped, or `-1` on error.
1086 **/
1087 gssize
1088 g_input_stream_skip_finish (GInputStream *stream,
1089 GAsyncResult *result,
1090 GError **error)
1091 {
1092 GInputStreamClass *class;
1093
1094 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), -1);
1095 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1096
1097 if (g_async_result_legacy_propagate_error (result, error))
1098 return -1;
1099 else if (g_async_result_is_tagged (result, g_input_stream_skip_async))
1100 return g_task_propagate_int (G_TASK (result), error);
1101
1102 class = G_INPUT_STREAM_GET_CLASS (stream);
1103 return class->skip_finish (stream, result, error);
1104 }
1105
1106 /**
1107 * g_input_stream_close_async:
1108 * @stream: A #GInputStream.
1109 * @io_priority: the [I/O priority][io-priority] of the request
1110 * @cancellable: (nullable): optional cancellable object
1111 * @callback: (scope async): a #GAsyncReadyCallback
1112 * to call when the request is satisfied
1113 * @user_data: the data to pass to callback function
1114 *
 * Requests an asynchronous close of the stream, releasing resources related to it.
1116 * When the operation is finished @callback will be called.
1117 * You can then call g_input_stream_close_finish() to get the result of the
1118 * operation.
1119 *
1120 * For behaviour details see g_input_stream_close().
1121 *
1122 * The asynchronous methods have a default fallback that uses threads to implement
1123 * asynchronicity, so they are optional for inheriting classes. However, if you
1124 * override one you must override all.
1125 **/
1126 void
1127 g_input_stream_close_async (GInputStream *stream,
1128 int io_priority,
1129 GCancellable *cancellable,
1130 GAsyncReadyCallback callback,
1131 gpointer user_data)
1132 {
1133 GInputStreamClass *class;
1134 GError *error = NULL;
1135
1136 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1137
1138 if (stream->priv->closed)
1139 {
1140 GTask *task;
1141
1142 task = g_task_new (stream, cancellable, callback, user_data);
1143 g_task_set_source_tag (task, g_input_stream_close_async);
1144 g_task_return_boolean (task, TRUE);
1145 g_object_unref (task);
1146 return;
1147 }
1148
1149 if (!g_input_stream_set_pending (stream, &error))
1150 {
1151 g_task_report_error (stream, callback, user_data,
1152 g_input_stream_close_async,
1153 error);
1154 return;
1155 }
1156
1157 class = G_INPUT_STREAM_GET_CLASS (stream);
1158 stream->priv->outstanding_callback = callback;
1159 g_object_ref (stream);
1160 class->close_async (stream, io_priority, cancellable,
1161 async_ready_close_callback_wrapper, user_data);
1162 }
1163
1164 /**
1165 * g_input_stream_close_finish:
1166 * @stream: a #GInputStream.
1167 * @result: a #GAsyncResult.
1168 * @error: a #GError location to store the error occurring, or %NULL to
1169 * ignore.
1170 *
1171 * Finishes closing a stream asynchronously, started from g_input_stream_close_async().
1172 *
1173 * Returns: %TRUE if the stream was closed successfully.
1174 **/
1175 gboolean
1176 g_input_stream_close_finish (GInputStream *stream,
1177 GAsyncResult *result,
1178 GError **error)
1179 {
1180 GInputStreamClass *class;
1181
1182 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1183 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1184
1185 if (g_async_result_legacy_propagate_error (result, error))
1186 return FALSE;
1187 else if (g_async_result_is_tagged (result, g_input_stream_close_async))
1188 return g_task_propagate_boolean (G_TASK (result), error);
1189
1190 class = G_INPUT_STREAM_GET_CLASS (stream);
1191 return class->close_finish (stream, result, error);
1192 }
1193
1194 /**
1195 * g_input_stream_is_closed:
1196 * @stream: input stream.
1197 *
1198 * Checks if an input stream is closed.
1199 *
1200 * Returns: %TRUE if the stream is closed.
1201 **/
1202 gboolean
1203 g_input_stream_is_closed (GInputStream *stream)
1204 {
1205 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1206
1207 return stream->priv->closed;
1208 }
1209
1210 /**
1211 * g_input_stream_has_pending:
1212 * @stream: input stream.
1213 *
1214 * Checks if an input stream has pending actions.
1215 *
1216 * Returns: %TRUE if @stream has pending actions.
1217 **/
1218 gboolean
1219 g_input_stream_has_pending (GInputStream *stream)
1220 {
1221 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), TRUE);
1222
1223 return stream->priv->pending;
1224 }
1225
1226 /**
1227 * g_input_stream_set_pending:
1228 * @stream: input stream
1229 * @error: a #GError location to store the error occurring, or %NULL to
1230 * ignore.
1231 *
1232 * Sets @stream to have actions pending. If the pending flag is
1233 * already set or @stream is closed, it will return %FALSE and set
1234 * @error.
1235 *
1236 * Returns: %TRUE if pending was previously unset and is now set.
1237 **/
1238 gboolean
1239 g_input_stream_set_pending (GInputStream *stream, GError **error)
1240 {
1241 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1242
1243 if (stream->priv->closed)
1244 {
1245 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1246 _("Stream is already closed"));
1247 return FALSE;
1248 }
1249
1250 if (stream->priv->pending)
1251 {
1252 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1253 /* Translators: This is an error you get if there is already an
1254 * operation running against this stream when you try to start
1255 * one */
1256 _("Stream has outstanding operation"));
1257 return FALSE;
1258 }
1259
1260 stream->priv->pending = TRUE;
1261 return TRUE;
1262 }
1263
1264 /**
1265 * g_input_stream_clear_pending:
1266 * @stream: input stream
1267 *
1268 * Clears the pending flag on @stream.
1269 **/
1270 void
1271 g_input_stream_clear_pending (GInputStream *stream)
1272 {
1273 g_return_if_fail (G_IS_INPUT_STREAM (stream));
1274
1275 stream->priv->pending = FALSE;
1276 }
1277
1278 /*< internal >
1279 * g_input_stream_async_read_is_via_threads:
1280 * @stream: input stream
1281 *
1282 * Checks if an input stream's read_async function uses threads.
1283 *
1284 * Returns: %TRUE if @stream's read_async function uses threads.
1285 **/
1286 gboolean
1287 g_input_stream_async_read_is_via_threads (GInputStream *stream)
1288 {
1289 GInputStreamClass *class;
1290
1291 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1292
1293 class = G_INPUT_STREAM_GET_CLASS (stream);
1294
1295 return (class->read_async == g_input_stream_real_read_async &&
1296 !(G_IS_POLLABLE_INPUT_STREAM (stream) &&
1297 g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream))));
1298 }
1299
1300 /*< internal >
1301 * g_input_stream_async_close_is_via_threads:
1302 * @stream: input stream
1303 *
1304 * Checks if an input stream's close_async function uses threads.
1305 *
1306 * Returns: %TRUE if @stream's close_async function uses threads.
1307 **/
1308 gboolean
1309 g_input_stream_async_close_is_via_threads (GInputStream *stream)
1310 {
1311 GInputStreamClass *class;
1312
1313 g_return_val_if_fail (G_IS_INPUT_STREAM (stream), FALSE);
1314
1315 class = G_INPUT_STREAM_GET_CLASS (stream);
1316
1317 return class->close_async == g_input_stream_real_close_async;
1318 }
1319
1320 /********************************************
1321 * Default implementation of async ops *
1322 ********************************************/
1323
1324 typedef struct {
1325 void *buffer;
1326 gsize count;
1327 } ReadData;
1328
1329 static void
1330 free_read_data (ReadData *op)
1331 {
1332 g_slice_free (ReadData, op);
1333 }
1334
1335 static void
1336 read_async_thread (GTask *task,
1337 gpointer source_object,
1338 gpointer task_data,
1339 GCancellable *cancellable)
1340 {
1341 GInputStream *stream = source_object;
1342 ReadData *op = task_data;
1343 GInputStreamClass *class;
1344 GError *error = NULL;
1345 gssize nread;
1346
1347 class = G_INPUT_STREAM_GET_CLASS (stream);
1348
1349 nread = class->read_fn (stream,
1350 op->buffer, op->count,
1351 g_task_get_cancellable (task),
1352 &error);
1353 if (nread == -1)
1354 g_task_return_error (task, error);
1355 else
1356 g_task_return_int (task, nread);
1357 }
1358
1359 static void read_async_pollable (GPollableInputStream *stream,
1360 GTask *task);
1361
1362 static gboolean
1363 read_async_pollable_ready (GPollableInputStream *stream,
1364 gpointer user_data)
1365 {
1366 GTask *task = user_data;
1367
1368 read_async_pollable (stream, task);
1369 return FALSE;
1370 }
1371
1372 static void
1373 read_async_pollable (GPollableInputStream *stream,
1374 GTask *task)
1375 {
1376 ReadData *op = g_task_get_task_data (task);
1377 GError *error = NULL;
1378 gssize nread;
1379
1380 if (g_task_return_error_if_cancelled (task))
1381 return;
1382
1383 nread = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
1384 read_nonblocking (stream, op->buffer, op->count, &error);
1385
1386 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1387 {
1388 GSource *source;
1389
1390 g_error_free (error);
1391
1392 source = g_pollable_input_stream_create_source (stream,
1393 g_task_get_cancellable (task));
1394 g_task_attach_source (task, source,
1395 (GSourceFunc) read_async_pollable_ready);
1396 g_source_unref (source);
1397 return;
1398 }
1399
1400 if (nread == -1)
1401 g_task_return_error (task, error);
1402 else
1403 g_task_return_int (task, nread);
1404 /* g_input_stream_real_read_async() unrefs task */
1405 }
1406
1407
1408 static void
1409 g_input_stream_real_read_async (GInputStream *stream,
1410 void *buffer,
1411 gsize count,
1412 int io_priority,
1413 GCancellable *cancellable,
1414 GAsyncReadyCallback callback,
1415 gpointer user_data)
1416 {
1417 GTask *task;
1418 ReadData *op;
1419
1420 op = g_slice_new0 (ReadData);
1421 task = g_task_new (stream, cancellable, callback, user_data);
1422 g_task_set_source_tag (task, g_input_stream_real_read_async);
1423 g_task_set_task_data (task, op, (GDestroyNotify) free_read_data);
1424 g_task_set_priority (task, io_priority);
1425 op->buffer = buffer;
1426 op->count = count;
1427
1428 if (!g_input_stream_async_read_is_via_threads (stream))
1429 read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), task);
1430 else
1431 g_task_run_in_thread (task, read_async_thread);
1432 g_object_unref (task);
1433 }
1434
1435 static gssize
1436 g_input_stream_real_read_finish (GInputStream *stream,
1437 GAsyncResult *result,
1438 GError **error)
1439 {
1440 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1441
1442 return g_task_propagate_int (G_TASK (result), error);
1443 }
1444
1445
1446 static void
1447 skip_async_thread (GTask *task,
1448 gpointer source_object,
1449 gpointer task_data,
1450 GCancellable *cancellable)
1451 {
1452 GInputStream *stream = source_object;
1453 gsize count = GPOINTER_TO_SIZE (task_data);
1454 GInputStreamClass *class;
1455 GError *error = NULL;
1456 gssize ret;
1457
1458 class = G_INPUT_STREAM_GET_CLASS (stream);
1459 ret = class->skip (stream, count,
1460 g_task_get_cancellable (task),
1461 &error);
1462 if (ret == -1)
1463 g_task_return_error (task, error);
1464 else
1465 g_task_return_int (task, ret);
1466 }
1467
1468 typedef struct {
1469 char buffer[8192];
1470 gsize count;
1471 gsize count_skipped;
1472 } SkipFallbackAsyncData;
1473
1474 static void
1475 skip_callback_wrapper (GObject *source_object,
1476 GAsyncResult *res,
1477 gpointer user_data)
1478 {
1479 GInputStreamClass *class;
1480 GTask *task = user_data;
1481 SkipFallbackAsyncData *data = g_task_get_task_data (task);
1482 GError *error = NULL;
1483 gssize ret;
1484
1485 ret = g_input_stream_read_finish (G_INPUT_STREAM (source_object), res, &error);
1486
1487 if (ret > 0)
1488 {
1489 data->count -= ret;
1490 data->count_skipped += ret;
1491
1492 if (data->count > 0)
1493 {
1494 class = G_INPUT_STREAM_GET_CLASS (source_object);
1495 class->read_async (G_INPUT_STREAM (source_object),
1496 data->buffer, MIN (8192, data->count),
1497 g_task_get_priority (task),
1498 g_task_get_cancellable (task),
1499 skip_callback_wrapper, task);
1500 return;
1501 }
1502 }
1503
1504 if (ret == -1 &&
1505 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
1506 data->count_skipped)
1507 {
1508 /* No error, return partial read */
1509 g_clear_error (&error);
1510 }
1511
1512 if (error)
1513 g_task_return_error (task, error);
1514 else
1515 g_task_return_int (task, data->count_skipped);
1516 g_object_unref (task);
1517 }
1518
1519 static void
1520 g_input_stream_real_skip_async (GInputStream *stream,
1521 gsize count,
1522 int io_priority,
1523 GCancellable *cancellable,
1524 GAsyncReadyCallback callback,
1525 gpointer user_data)
1526 {
1527 GInputStreamClass *class;
1528 SkipFallbackAsyncData *data;
1529 GTask *task;
1530
1531 class = G_INPUT_STREAM_GET_CLASS (stream);
1532
1533 task = g_task_new (stream, cancellable, callback, user_data);
1534 g_task_set_source_tag (task, g_input_stream_real_skip_async);
1535 g_task_set_priority (task, io_priority);
1536
1537 if (g_input_stream_async_read_is_via_threads (stream))
1538 {
1539 /* Read is thread-using async fallback.
1540 * Make skip use threads too, so that we can use a possible sync skip
1541 * implementation. */
1542 g_task_set_task_data (task, GSIZE_TO_POINTER (count), NULL);
1543
1544 g_task_run_in_thread (task, skip_async_thread);
1545 g_object_unref (task);
1546 }
1547 else
1548 {
1549 /* TODO: Skip fallback uses too much memory, should do multiple read calls */
1550
1551 /* There is a custom async read function, lets use that. */
1552 data = g_new (SkipFallbackAsyncData, 1);
1553 data->count = count;
1554 data->count_skipped = 0;
1555 g_task_set_task_data (task, data, g_free);
1556 g_task_set_check_cancellable (task, FALSE);
1557 class->read_async (stream, data->buffer, MIN (8192, count), io_priority, cancellable,
1558 skip_callback_wrapper, task);
1559 }
1560
1561 }
1562
1563 static gssize
1564 g_input_stream_real_skip_finish (GInputStream *stream,
1565 GAsyncResult *result,
1566 GError **error)
1567 {
1568 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1569
1570 return g_task_propagate_int (G_TASK (result), error);
1571 }
1572
1573 static void
1574 close_async_thread (GTask *task,
1575 gpointer source_object,
1576 gpointer task_data,
1577 GCancellable *cancellable)
1578 {
1579 GInputStream *stream = source_object;
1580 GInputStreamClass *class;
1581 GError *error = NULL;
1582 gboolean result;
1583
1584 class = G_INPUT_STREAM_GET_CLASS (stream);
1585 if (class->close_fn)
1586 {
1587 result = class->close_fn (stream,
1588 g_task_get_cancellable (task),
1589 &error);
1590 if (!result)
1591 {
1592 g_task_return_error (task, error);
1593 return;
1594 }
1595 }
1596
1597 g_task_return_boolean (task, TRUE);
1598 }
1599
1600 static void
1601 g_input_stream_real_close_async (GInputStream *stream,
1602 int io_priority,
1603 GCancellable *cancellable,
1604 GAsyncReadyCallback callback,
1605 gpointer user_data)
1606 {
1607 GTask *task;
1608
1609 task = g_task_new (stream, cancellable, callback, user_data);
1610 g_task_set_source_tag (task, g_input_stream_real_close_async);
1611 g_task_set_check_cancellable (task, FALSE);
1612 g_task_set_priority (task, io_priority);
1613
1614 g_task_run_in_thread (task, close_async_thread);
1615 g_object_unref (task);
1616 }
1617
1618 static gboolean
1619 g_input_stream_real_close_finish (GInputStream *stream,
1620 GAsyncResult *result,
1621 GError **error)
1622 {
1623 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1624
1625 return g_task_propagate_boolean (G_TASK (result), error);
1626 }