1 /* GIO - GLib Input, Output and Streaming Library
2 *
3 * Copyright (C) 2006-2007 Red Hat, Inc.
4 * Copyright (C) 2007 Jürg Billeter
5 *
6 * SPDX-License-Identifier: LGPL-2.1-or-later
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General
19 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
20 *
21 * Author: Christian Kellner <gicmo@gnome.org>
22 */
23
24 #include "config.h"
25 #include "gbufferedinputstream.h"
26 #include "ginputstream.h"
27 #include "gcancellable.h"
28 #include "gasyncresult.h"
29 #include "gtask.h"
30 #include "gseekable.h"
31 #include "gioerror.h"
32 #include <string.h>
33 #include "glibintl.h"
34
35
36 /**
37 * GBufferedInputStream:
38 *
39 * Buffered input stream implements #GFilterInputStream and provides
40 * for buffered reads.
41 *
42 * By default, `GBufferedInputStream`'s buffer size is set at 4 kilobytes.
43 *
44 * To create a buffered input stream, use [ctor@Gio.BufferedInputStream.new],
45 * or [ctor@Gio.BufferedInputStream.new_sized] to specify the buffer's size at
46 * construction.
47 *
48 * To get the size of a buffer within a buffered input stream, use
49 * [method@Gio.BufferedInputStream.get_buffer_size]. To change the size of a
50 * buffered input stream's buffer, use [method@Gio.BufferedInputStream.set_buffer_size].
51 * Note that the buffer's size cannot be reduced below the size of the data within the buffer.
52 */
53
54
55 #define DEFAULT_BUFFER_SIZE 4096
56
/* Private per-instance state of a GBufferedInputStream.
 *
 * The unread data is the byte range [pos, end) inside @buffer; the code in
 * this file computes the amount of buffered data as `end - pos`.
 */
struct _GBufferedInputStreamPrivate {
  guint8 *buffer;   /* backing storage; allocated in set_buffer_size(), freed in finalize() */
  gsize   len;      /* allocated size of @buffer in bytes (the "buffer-size" property) */
  gsize   pos;      /* offset in @buffer of the next unread byte */
  gsize   end;      /* offset in @buffer one past the last valid byte */
  GAsyncReadyCallback outstanding_callback; /* caller callback stored by fill_async() and invoked by its wrapper */
};
64
/* GObject property identifiers used by set_property()/get_property(). */
enum {
  PROP_0,       /* first enumerator (value 0); no property is installed under it */
  PROP_BUFSIZE  /* identifier of the "buffer-size" property */
};
69
/* GObject vfuncs */
static void g_buffered_input_stream_set_property  (GObject      *object,
                                                   guint         prop_id,
                                                   const GValue *value,
                                                   GParamSpec   *pspec);

static void g_buffered_input_stream_get_property  (GObject      *object,
                                                   guint         prop_id,
                                                   GValue       *value,
                                                   GParamSpec   *pspec);
static void g_buffered_input_stream_finalize      (GObject *object);


/* GInputStream vfuncs */
static gssize g_buffered_input_stream_skip             (GInputStream          *stream,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_skip_async       (GInputStream          *stream,
                                                        gsize                  count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_skip_finish      (GInputStream          *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);
static gssize g_buffered_input_stream_read             (GInputStream          *stream,
                                                        void                  *buffer,
                                                        gsize                  count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
/* Default implementations of the GBufferedInputStream fill vfuncs */
static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        GCancellable          *cancellable,
                                                        GError               **error);
static void   g_buffered_input_stream_real_fill_async  (GBufferedInputStream  *stream,
                                                        gssize                 count,
                                                        int                    io_priority,
                                                        GCancellable          *cancellable,
                                                        GAsyncReadyCallback    callback,
                                                        gpointer               user_data);
static gssize g_buffered_input_stream_real_fill_finish (GBufferedInputStream  *stream,
                                                        GAsyncResult          *result,
                                                        GError               **error);

/* GSeekable interface implementation */
static void     g_buffered_input_stream_seekable_iface_init (GSeekableIface  *iface);
static goffset  g_buffered_input_stream_tell                (GSeekable       *seekable);
static gboolean g_buffered_input_stream_can_seek            (GSeekable       *seekable);
static gboolean g_buffered_input_stream_seek                (GSeekable       *seekable,
                                                             goffset          offset,
                                                             GSeekType        type,
                                                             GCancellable    *cancellable,
                                                             GError         **error);
static gboolean g_buffered_input_stream_can_truncate        (GSeekable       *seekable);
static gboolean g_buffered_input_stream_truncate            (GSeekable       *seekable,
                                                             goffset          offset,
                                                             GCancellable    *cancellable,
                                                             GError         **error);

/* Internal helper: moves the unread bytes to the start of the buffer */
static void compact_buffer (GBufferedInputStream *stream);
129
/* Defines the GBufferedInputStream type with G_TYPE_FILTER_INPUT_STREAM as
 * its parent, adds the private instance structure, and attaches the
 * GSeekable interface initialised by
 * g_buffered_input_stream_seekable_iface_init().
 */
G_DEFINE_TYPE_WITH_CODE (GBufferedInputStream,
                         g_buffered_input_stream,
                         G_TYPE_FILTER_INPUT_STREAM,
                         G_ADD_PRIVATE (GBufferedInputStream)
                         G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
                                                g_buffered_input_stream_seekable_iface_init))
136
137 static void
138 g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
139 {
140 GObjectClass *object_class;
141 GInputStreamClass *istream_class;
142 GBufferedInputStreamClass *bstream_class;
143
144 object_class = G_OBJECT_CLASS (klass);
145 object_class->get_property = g_buffered_input_stream_get_property;
146 object_class->set_property = g_buffered_input_stream_set_property;
147 object_class->finalize = g_buffered_input_stream_finalize;
148
149 istream_class = G_INPUT_STREAM_CLASS (klass);
150 istream_class->skip = g_buffered_input_stream_skip;
151 istream_class->skip_async = g_buffered_input_stream_skip_async;
152 istream_class->skip_finish = g_buffered_input_stream_skip_finish;
153 istream_class->read_fn = g_buffered_input_stream_read;
154
155 bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
156 bstream_class->fill = g_buffered_input_stream_real_fill;
157 bstream_class->fill_async = g_buffered_input_stream_real_fill_async;
158 bstream_class->fill_finish = g_buffered_input_stream_real_fill_finish;
159
160 /**
161 * GBufferedInputStream:buffer-size:
162 *
163 * The size of the backend buffer, in bytes.
164 */
165 g_object_class_install_property (object_class,
166 PROP_BUFSIZE,
167 g_param_spec_uint ("buffer-size", NULL, NULL,
168 1,
169 G_MAXUINT,
170 DEFAULT_BUFFER_SIZE,
171 G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
172 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
173
174
175 }
176
177 /**
178 * g_buffered_input_stream_get_buffer_size:
179 * @stream: a #GBufferedInputStream
180 *
181 * Gets the size of the input buffer.
182 *
183 * Returns: the current buffer size.
184 */
185 gsize
186 g_buffered_input_stream_get_buffer_size (GBufferedInputStream *stream)
187 {
188 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), 0);
189
190 return stream->priv->len;
191 }
192
193 /**
194 * g_buffered_input_stream_set_buffer_size:
195 * @stream: a #GBufferedInputStream
196 * @size: a #gsize
197 *
198 * Sets the size of the internal buffer of @stream to @size, or to the
199 * size of the contents of the buffer. The buffer can never be resized
200 * smaller than its current contents.
201 */
202 void
203 g_buffered_input_stream_set_buffer_size (GBufferedInputStream *stream,
204 gsize size)
205 {
206 GBufferedInputStreamPrivate *priv;
207 gsize in_buffer;
208 guint8 *buffer;
209
210 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
211
212 priv = stream->priv;
213
214 if (priv->len == size)
215 return;
216
217 if (priv->buffer)
218 {
219 in_buffer = priv->end - priv->pos;
220
221 /* Never resize smaller than current buffer contents */
222 size = MAX (size, in_buffer);
223
224 buffer = g_malloc (size);
225 memcpy (buffer, priv->buffer + priv->pos, in_buffer);
226 priv->len = size;
227 priv->pos = 0;
228 priv->end = in_buffer;
229 g_free (priv->buffer);
230 priv->buffer = buffer;
231 }
232 else
233 {
234 priv->len = size;
235 priv->pos = 0;
236 priv->end = 0;
237 priv->buffer = g_malloc (size);
238 }
239
240 g_object_notify (G_OBJECT (stream), "buffer-size");
241 }
242
243 static void
244 g_buffered_input_stream_set_property (GObject *object,
245 guint prop_id,
246 const GValue *value,
247 GParamSpec *pspec)
248 {
249 GBufferedInputStream *bstream;
250
251 bstream = G_BUFFERED_INPUT_STREAM (object);
252
253 switch (prop_id)
254 {
255 case PROP_BUFSIZE:
256 g_buffered_input_stream_set_buffer_size (bstream, g_value_get_uint (value));
257 break;
258
259 default:
260 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
261 break;
262 }
263 }
264
265 static void
266 g_buffered_input_stream_get_property (GObject *object,
267 guint prop_id,
268 GValue *value,
269 GParamSpec *pspec)
270 {
271 GBufferedInputStreamPrivate *priv;
272 GBufferedInputStream *bstream;
273
274 bstream = G_BUFFERED_INPUT_STREAM (object);
275 priv = bstream->priv;
276
277 switch (prop_id)
278 {
279 case PROP_BUFSIZE:
280 g_value_set_uint (value, priv->len);
281 break;
282
283 default:
284 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
285 break;
286 }
287 }
288
289 static void
290 g_buffered_input_stream_finalize (GObject *object)
291 {
292 GBufferedInputStreamPrivate *priv;
293 GBufferedInputStream *stream;
294
295 stream = G_BUFFERED_INPUT_STREAM (object);
296 priv = stream->priv;
297
298 g_free (priv->buffer);
299
300 G_OBJECT_CLASS (g_buffered_input_stream_parent_class)->finalize (object);
301 }
302
303 static void
304 g_buffered_input_stream_seekable_iface_init (GSeekableIface *iface)
305 {
306 iface->tell = g_buffered_input_stream_tell;
307 iface->can_seek = g_buffered_input_stream_can_seek;
308 iface->seek = g_buffered_input_stream_seek;
309 iface->can_truncate = g_buffered_input_stream_can_truncate;
310 iface->truncate_fn = g_buffered_input_stream_truncate;
311 }
312
313 static void
314 g_buffered_input_stream_init (GBufferedInputStream *stream)
315 {
316 stream->priv = g_buffered_input_stream_get_instance_private (stream);
317 }
318
319
320 /**
321 * g_buffered_input_stream_new:
322 * @base_stream: a #GInputStream
323 *
324 * Creates a new #GInputStream from the given @base_stream, with
325 * a buffer set to the default size (4 kilobytes).
326 *
327 * Returns: a #GInputStream for the given @base_stream.
328 */
329 GInputStream *
330 g_buffered_input_stream_new (GInputStream *base_stream)
331 {
332 GInputStream *stream;
333
334 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
335
336 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
337 "base-stream", base_stream,
338 NULL);
339
340 return stream;
341 }
342
343 /**
344 * g_buffered_input_stream_new_sized:
345 * @base_stream: a #GInputStream
346 * @size: a #gsize
347 *
348 * Creates a new #GBufferedInputStream from the given @base_stream,
349 * with a buffer set to @size.
350 *
351 * Returns: a #GInputStream.
352 */
353 GInputStream *
354 g_buffered_input_stream_new_sized (GInputStream *base_stream,
355 gsize size)
356 {
357 GInputStream *stream;
358
359 g_return_val_if_fail (G_IS_INPUT_STREAM (base_stream), NULL);
360
361 stream = g_object_new (G_TYPE_BUFFERED_INPUT_STREAM,
362 "base-stream", base_stream,
363 "buffer-size", (guint)size,
364 NULL);
365
366 return stream;
367 }
368
369 /**
370 * g_buffered_input_stream_fill:
371 * @stream: a #GBufferedInputStream
372 * @count: the number of bytes that will be read from the stream
373 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
374 * @error: location to store the error occurring, or %NULL to ignore
375 *
376 * Tries to read @count bytes from the stream into the buffer.
377 * Will block during this read.
378 *
379 * If @count is zero, returns zero and does nothing. A value of @count
380 * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
381 *
382 * On success, the number of bytes read into the buffer is returned.
383 * It is not an error if this is not the same as the requested size, as it
384 * can happen e.g. near the end of a file. Zero is returned on end of file
385 * (or if @count is zero), but never otherwise.
386 *
387 * If @count is -1 then the attempted read size is equal to the number of
388 * bytes that are required to fill the buffer.
389 *
390 * If @cancellable is not %NULL, then the operation can be cancelled by
391 * triggering the cancellable object from another thread. If the operation
392 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
393 * operation was partially finished when the operation was cancelled the
394 * partial result will be returned, without an error.
395 *
396 * On error -1 is returned and @error is set accordingly.
397 *
398 * For the asynchronous, non-blocking, version of this function, see
399 * g_buffered_input_stream_fill_async().
400 *
401 * Returns: the number of bytes read into @stream's buffer, up to @count,
402 * or -1 on error.
403 */
404 gssize
405 g_buffered_input_stream_fill (GBufferedInputStream *stream,
406 gssize count,
407 GCancellable *cancellable,
408 GError **error)
409 {
410 GBufferedInputStreamClass *class;
411 GInputStream *input_stream;
412 gssize res;
413
414 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
415
416 input_stream = G_INPUT_STREAM (stream);
417
418 if (count < -1)
419 {
420 g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
421 _("Too large count value passed to %s"), G_STRFUNC);
422 return -1;
423 }
424
425 if (!g_input_stream_set_pending (input_stream, error))
426 return -1;
427
428 if (cancellable)
429 g_cancellable_push_current (cancellable);
430
431 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
432 res = class->fill (stream, count, cancellable, error);
433
434 if (cancellable)
435 g_cancellable_pop_current (cancellable);
436
437 g_input_stream_clear_pending (input_stream);
438
439 return res;
440 }
441
442 static void
443 async_fill_callback_wrapper (GObject *source_object,
444 GAsyncResult *res,
445 gpointer user_data)
446 {
447 GBufferedInputStream *stream = G_BUFFERED_INPUT_STREAM (source_object);
448
449 g_input_stream_clear_pending (G_INPUT_STREAM (stream));
450 (*stream->priv->outstanding_callback) (source_object, res, user_data);
451 g_object_unref (stream);
452 }
453
454 /**
455 * g_buffered_input_stream_fill_async:
456 * @stream: a #GBufferedInputStream
457 * @count: the number of bytes that will be read from the stream
458 * @io_priority: the [I/O priority][io-priority] of the request
459 * @cancellable: (nullable): optional #GCancellable object
460 * @callback: (scope async) (closure user_data): a #GAsyncReadyCallback
461 * @user_data: a #gpointer
462 *
463 * Reads data into @stream's buffer asynchronously, up to @count size.
464 * @io_priority can be used to prioritize reads. For the synchronous
465 * version of this function, see g_buffered_input_stream_fill().
466 *
467 * If @count is -1 then the attempted read size is equal to the number
468 * of bytes that are required to fill the buffer.
469 */
470 void
471 g_buffered_input_stream_fill_async (GBufferedInputStream *stream,
472 gssize count,
473 int io_priority,
474 GCancellable *cancellable,
475 GAsyncReadyCallback callback,
476 gpointer user_data)
477 {
478 GBufferedInputStreamClass *class;
479 GError *error = NULL;
480
481 g_return_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream));
482
483 if (count == 0)
484 {
485 GTask *task;
486
487 task = g_task_new (stream, cancellable, callback, user_data);
488 g_task_set_source_tag (task, g_buffered_input_stream_fill_async);
489 g_task_return_int (task, 0);
490 g_object_unref (task);
491 return;
492 }
493
494 if (count < -1)
495 {
496 g_task_report_new_error (stream, callback, user_data,
497 g_buffered_input_stream_fill_async,
498 G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
499 _("Too large count value passed to %s"),
500 G_STRFUNC);
501 return;
502 }
503
504 if (!g_input_stream_set_pending (G_INPUT_STREAM (stream), &error))
505 {
506 g_task_report_error (stream, callback, user_data,
507 g_buffered_input_stream_fill_async,
508 error);
509 return;
510 }
511
512 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
513
514 stream->priv->outstanding_callback = callback;
515 g_object_ref (stream);
516 class->fill_async (stream, count, io_priority, cancellable,
517 async_fill_callback_wrapper, user_data);
518 }
519
520 /**
521 * g_buffered_input_stream_fill_finish:
522 * @stream: a #GBufferedInputStream
523 * @result: a #GAsyncResult
524 * @error: a #GError
525 *
526 * Finishes an asynchronous read.
527 *
528 * Returns: a #gssize of the read stream, or `-1` on an error.
529 */
530 gssize
531 g_buffered_input_stream_fill_finish (GBufferedInputStream *stream,
532 GAsyncResult *result,
533 GError **error)
534 {
535 GBufferedInputStreamClass *class;
536
537 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
538 g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
539
540 if (g_async_result_legacy_propagate_error (result, error))
541 return -1;
542 else if (g_async_result_is_tagged (result, g_buffered_input_stream_fill_async))
543 return g_task_propagate_int (G_TASK (result), error);
544
545 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
546 return class->fill_finish (stream, result, error);
547 }
548
549 /**
550 * g_buffered_input_stream_get_available:
551 * @stream: #GBufferedInputStream
552 *
553 * Gets the size of the available data within the stream.
554 *
555 * Returns: size of the available stream.
556 */
557 gsize
558 g_buffered_input_stream_get_available (GBufferedInputStream *stream)
559 {
560 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
561
562 return stream->priv->end - stream->priv->pos;
563 }
564
565 /**
566 * g_buffered_input_stream_peek:
567 * @stream: a #GBufferedInputStream
568 * @buffer: (array length=count) (element-type guint8): a pointer to
569 * an allocated chunk of memory
570 * @offset: a #gsize
571 * @count: a #gsize
572 *
573 * Peeks in the buffer, copying data of size @count into @buffer,
574 * offset @offset bytes.
575 *
576 * Returns: a #gsize of the number of bytes peeked, or -1 on error.
577 */
578 gsize
579 g_buffered_input_stream_peek (GBufferedInputStream *stream,
580 void *buffer,
581 gsize offset,
582 gsize count)
583 {
584 gsize available;
585 gsize end;
586
587 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
588 g_return_val_if_fail (buffer != NULL, -1);
589
590 available = g_buffered_input_stream_get_available (stream);
591
592 if (offset > available)
593 return 0;
594
595 end = MIN (offset + count, available);
596 count = end - offset;
597
598 memcpy (buffer, stream->priv->buffer + stream->priv->pos + offset, count);
599 return count;
600 }
601
602 /**
603 * g_buffered_input_stream_peek_buffer:
604 * @stream: a #GBufferedInputStream
605 * @count: (out): a #gsize to get the number of bytes available in the buffer
606 *
607 * Returns the buffer with the currently available bytes. The returned
608 * buffer must not be modified and will become invalid when reading from
609 * the stream or filling the buffer.
610 *
611 * Returns: (array length=count) (element-type guint8) (transfer none):
612 * read-only buffer
613 */
614 const void*
615 g_buffered_input_stream_peek_buffer (GBufferedInputStream *stream,
616 gsize *count)
617 {
618 GBufferedInputStreamPrivate *priv;
619
620 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), NULL);
621
622 priv = stream->priv;
623
624 if (count)
625 *count = priv->end - priv->pos;
626
627 return priv->buffer + priv->pos;
628 }
629
630 static void
631 compact_buffer (GBufferedInputStream *stream)
632 {
633 GBufferedInputStreamPrivate *priv;
634 gsize current_size;
635
636 priv = stream->priv;
637
638 current_size = priv->end - priv->pos;
639
640 memmove (priv->buffer, priv->buffer + priv->pos, current_size);
641
642 priv->pos = 0;
643 priv->end = current_size;
644 }
645
646 static gssize
647 g_buffered_input_stream_real_fill (GBufferedInputStream *stream,
648 gssize count,
649 GCancellable *cancellable,
650 GError **error)
651 {
652 GBufferedInputStreamPrivate *priv;
653 GInputStream *base_stream;
654 gssize nread;
655 gsize in_buffer;
656
657 priv = stream->priv;
658
659 if (count == -1)
660 count = priv->len;
661
662 in_buffer = priv->end - priv->pos;
663
664 /* Never fill more than can fit in the buffer */
665 count = MIN ((gsize) count, priv->len - in_buffer);
666
667 /* If requested length does not fit at end, compact */
668 if (priv->len - priv->end < (gsize) count)
669 compact_buffer (stream);
670
671 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
672 nread = g_input_stream_read (base_stream,
673 priv->buffer + priv->end,
674 count,
675 cancellable,
676 error);
677
678 if (nread > 0)
679 priv->end += nread;
680
681 return nread;
682 }
683
684 static gssize
685 g_buffered_input_stream_skip (GInputStream *stream,
686 gsize count,
687 GCancellable *cancellable,
688 GError **error)
689 {
690 GBufferedInputStream *bstream;
691 GBufferedInputStreamPrivate *priv;
692 GBufferedInputStreamClass *class;
693 GInputStream *base_stream;
694 gsize available, bytes_skipped;
695 gssize nread;
696
697 bstream = G_BUFFERED_INPUT_STREAM (stream);
698 priv = bstream->priv;
699
700 available = priv->end - priv->pos;
701
702 if (count <= available)
703 {
704 priv->pos += count;
705 return count;
706 }
707
708 /* Full request not available, skip all currently available and
709 * request refill for more
710 */
711
712 priv->pos = 0;
713 priv->end = 0;
714 bytes_skipped = available;
715 count -= available;
716
717 if (bytes_skipped > 0)
718 error = NULL; /* Ignore further errors if we already read some data */
719
720 if (count > priv->len)
721 {
722 /* Large request, shortcut buffer */
723
724 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
725
726 nread = g_input_stream_skip (base_stream,
727 count,
728 cancellable,
729 error);
730
731 if (nread < 0 && bytes_skipped == 0)
732 return -1;
733
734 if (nread > 0)
735 bytes_skipped += nread;
736
737 return bytes_skipped;
738 }
739
740 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
741 nread = class->fill (bstream, priv->len, cancellable, error);
742
743 if (nread < 0)
744 {
745 if (bytes_skipped == 0)
746 return -1;
747 else
748 return bytes_skipped;
749 }
750
751 available = priv->end - priv->pos;
752 count = MIN (count, available);
753
754 bytes_skipped += count;
755 priv->pos += count;
756
757 return bytes_skipped;
758 }
759
760 static gssize
761 g_buffered_input_stream_read (GInputStream *stream,
762 void *buffer,
763 gsize count,
764 GCancellable *cancellable,
765 GError **error)
766 {
767 GBufferedInputStream *bstream;
768 GBufferedInputStreamPrivate *priv;
769 GBufferedInputStreamClass *class;
770 GInputStream *base_stream;
771 gsize available, bytes_read;
772 gssize nread;
773
774 bstream = G_BUFFERED_INPUT_STREAM (stream);
775 priv = bstream->priv;
776
777 available = priv->end - priv->pos;
778
779 if (count <= available)
780 {
781 memcpy (buffer, priv->buffer + priv->pos, count);
782 priv->pos += count;
783 return count;
784 }
785
786 /* Full request not available, read all currently available and
787 * request refill for more
788 */
789
790 memcpy (buffer, priv->buffer + priv->pos, available);
791 priv->pos = 0;
792 priv->end = 0;
793 bytes_read = available;
794 count -= available;
795
796 if (bytes_read > 0)
797 error = NULL; /* Ignore further errors if we already read some data */
798
799 if (count > priv->len)
800 {
801 /* Large request, shortcut buffer */
802
803 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
804
805 nread = g_input_stream_read (base_stream,
806 (char *)buffer + bytes_read,
807 count,
808 cancellable,
809 error);
810
811 if (nread < 0 && bytes_read == 0)
812 return -1;
813
814 if (nread > 0)
815 bytes_read += nread;
816
817 return bytes_read;
818 }
819
820 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
821 nread = class->fill (bstream, priv->len, cancellable, error);
822 if (nread < 0)
823 {
824 if (bytes_read == 0)
825 return -1;
826 else
827 return bytes_read;
828 }
829
830 available = priv->end - priv->pos;
831 count = MIN (count, available);
832
833 memcpy ((char *)buffer + bytes_read, (char *)priv->buffer + priv->pos, count);
834 bytes_read += count;
835 priv->pos += count;
836
837 return bytes_read;
838 }
839
840 static goffset
841 g_buffered_input_stream_tell (GSeekable *seekable)
842 {
843 GBufferedInputStream *bstream;
844 GBufferedInputStreamPrivate *priv;
845 GInputStream *base_stream;
846 GSeekable *base_stream_seekable;
847 gsize available;
848 goffset base_offset;
849
850 bstream = G_BUFFERED_INPUT_STREAM (seekable);
851 priv = bstream->priv;
852
853 base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
854 if (!G_IS_SEEKABLE (base_stream))
855 return 0;
856 base_stream_seekable = G_SEEKABLE (base_stream);
857
858 available = priv->end - priv->pos;
859 base_offset = g_seekable_tell (base_stream_seekable);
860
861 return base_offset - available;
862 }
863
864 static gboolean
865 g_buffered_input_stream_can_seek (GSeekable *seekable)
866 {
867 GInputStream *base_stream;
868
869 base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
870 return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
871 }
872
873 static gboolean
874 g_buffered_input_stream_seek (GSeekable *seekable,
875 goffset offset,
876 GSeekType type,
877 GCancellable *cancellable,
878 GError **error)
879 {
880 GBufferedInputStream *bstream;
881 GBufferedInputStreamPrivate *priv;
882 GInputStream *base_stream;
883 GSeekable *base_stream_seekable;
884
885 bstream = G_BUFFERED_INPUT_STREAM (seekable);
886 priv = bstream->priv;
887
888 base_stream = G_FILTER_INPUT_STREAM (seekable)->base_stream;
889 if (!G_IS_SEEKABLE (base_stream))
890 {
891 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
892 _("Seek not supported on base stream"));
893 return FALSE;
894 }
895
896 base_stream_seekable = G_SEEKABLE (base_stream);
897
898 if (type == G_SEEK_CUR)
899 {
900 if (offset <= (goffset) (priv->end - priv->pos) &&
901 offset >= (goffset) -priv->pos)
902 {
903 priv->pos += offset;
904 return TRUE;
905 }
906 else
907 {
908 offset -= priv->end - priv->pos;
909 }
910 }
911
912 if (g_seekable_seek (base_stream_seekable, offset, type, cancellable, error))
913 {
914 priv->pos = 0;
915 priv->end = 0;
916 return TRUE;
917 }
918 else
919 {
920 return FALSE;
921 }
922 }
923
924 static gboolean
925 g_buffered_input_stream_can_truncate (GSeekable *seekable)
926 {
927 return FALSE;
928 }
929
930 static gboolean
931 g_buffered_input_stream_truncate (GSeekable *seekable,
932 goffset offset,
933 GCancellable *cancellable,
934 GError **error)
935 {
936 g_set_error_literal (error,
937 G_IO_ERROR,
938 G_IO_ERROR_NOT_SUPPORTED,
939 _("Cannot truncate GBufferedInputStream"));
940 return FALSE;
941 }
942
943 /**
944 * g_buffered_input_stream_read_byte:
945 * @stream: a #GBufferedInputStream
946 * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
947 * @error: location to store the error occurring, or %NULL to ignore
948 *
949 * Tries to read a single byte from the stream or the buffer. Will block
950 * during this read.
951 *
952 * On success, the byte read from the stream is returned. On end of stream
953 * -1 is returned but it's not an exceptional error and @error is not set.
954 *
955 * If @cancellable is not %NULL, then the operation can be cancelled by
956 * triggering the cancellable object from another thread. If the operation
957 * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
958 * operation was partially finished when the operation was cancelled the
959 * partial result will be returned, without an error.
960 *
961 * On error -1 is returned and @error is set accordingly.
962 *
963 * Returns: the byte read from the @stream, or -1 on end of stream or error.
964 */
965 int
966 g_buffered_input_stream_read_byte (GBufferedInputStream *stream,
967 GCancellable *cancellable,
968 GError **error)
969 {
970 GBufferedInputStreamPrivate *priv;
971 GBufferedInputStreamClass *class;
972 GInputStream *input_stream;
973 gsize available;
974 gssize nread;
975
976 g_return_val_if_fail (G_IS_BUFFERED_INPUT_STREAM (stream), -1);
977
978 priv = stream->priv;
979 input_stream = G_INPUT_STREAM (stream);
980
981 if (g_input_stream_is_closed (input_stream))
982 {
983 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
984 _("Stream is already closed"));
985 return -1;
986 }
987
988 if (!g_input_stream_set_pending (input_stream, error))
989 return -1;
990
991 available = priv->end - priv->pos;
992
993 if (available != 0)
994 {
995 g_input_stream_clear_pending (input_stream);
996 return priv->buffer[priv->pos++];
997 }
998
999 /* Byte not available, request refill for more */
1000
1001 if (cancellable)
1002 g_cancellable_push_current (cancellable);
1003
1004 priv->pos = 0;
1005 priv->end = 0;
1006
1007 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1008 nread = class->fill (stream, priv->len, cancellable, error);
1009
1010 if (cancellable)
1011 g_cancellable_pop_current (cancellable);
1012
1013 g_input_stream_clear_pending (input_stream);
1014
1015 if (nread <= 0)
1016 return -1; /* error or end of stream */
1017
1018 return priv->buffer[priv->pos++];
1019 }
1020
1021 /* ************************** */
1022 /* Async stuff implementation */
1023 /* ************************** */
1024
1025 static void
1026 fill_async_callback (GObject *source_object,
1027 GAsyncResult *result,
1028 gpointer user_data)
1029 {
1030 GError *error;
1031 gssize res;
1032 GTask *task = user_data;
1033
1034 error = NULL;
1035 res = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
1036 result, &error);
1037 if (res == -1)
1038 g_task_return_error (task, error);
1039 else
1040 {
1041 GBufferedInputStream *stream;
1042 GBufferedInputStreamPrivate *priv;
1043
1044 stream = g_task_get_source_object (task);
1045 priv = G_BUFFERED_INPUT_STREAM (stream)->priv;
1046
1047 g_assert_cmpint (priv->end + res, <=, priv->len);
1048 priv->end += res;
1049
1050 g_task_return_int (task, res);
1051 }
1052
1053 g_object_unref (task);
1054 }
1055
1056 static void
1057 g_buffered_input_stream_real_fill_async (GBufferedInputStream *stream,
1058 gssize count,
1059 int io_priority,
1060 GCancellable *cancellable,
1061 GAsyncReadyCallback callback,
1062 gpointer user_data)
1063 {
1064 GBufferedInputStreamPrivate *priv;
1065 GInputStream *base_stream;
1066 GTask *task;
1067 gsize in_buffer;
1068
1069 priv = stream->priv;
1070
1071 if (count == -1)
1072 count = priv->len;
1073
1074 in_buffer = priv->end - priv->pos;
1075
1076 /* Never fill more than can fit in the buffer */
1077 count = MIN ((gsize) count, priv->len - in_buffer);
1078
1079 /* If requested length does not fit at end, compact */
1080 if (priv->len - priv->end < (gsize) count)
1081 compact_buffer (stream);
1082
1083 task = g_task_new (stream, cancellable, callback, user_data);
1084 g_task_set_source_tag (task, g_buffered_input_stream_real_fill_async);
1085
1086 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1087 g_input_stream_read_async (base_stream,
1088 priv->buffer + priv->end,
1089 count,
1090 io_priority,
1091 cancellable,
1092 fill_async_callback,
1093 task);
1094 }
1095
1096 static gssize
1097 g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
1098 GAsyncResult *result,
1099 GError **error)
1100 {
1101 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1102
1103 return g_task_propagate_int (G_TASK (result), error);
1104 }
1105
/* State carried as task data through an asynchronous skip. */
typedef struct
{
  gsize bytes_skipped;  /* bytes skipped so far; starts as what was consumed from the buffer */
  gsize count;          /* bytes still to skip once the buffered data has been consumed */
} SkipAsyncData;
1111
1112 static void
1113 free_skip_async_data (gpointer _data)
1114 {
1115 SkipAsyncData *data = _data;
1116 g_slice_free (SkipAsyncData, data);
1117 }
1118
1119 static void
1120 large_skip_callback (GObject *source_object,
1121 GAsyncResult *result,
1122 gpointer user_data)
1123 {
1124 GTask *task = G_TASK (user_data);
1125 SkipAsyncData *data;
1126 GError *error;
1127 gssize nread;
1128
1129 data = g_task_get_task_data (task);
1130
1131 error = NULL;
1132 nread = g_input_stream_skip_finish (G_INPUT_STREAM (source_object),
1133 result, &error);
1134
1135 /* Only report the error if we've not already read some data */
1136 if (nread < 0 && data->bytes_skipped == 0)
1137 g_task_return_error (task, error);
1138 else
1139 {
1140 if (error)
1141 g_error_free (error);
1142
1143 if (nread > 0)
1144 data->bytes_skipped += nread;
1145
1146 g_task_return_int (task, data->bytes_skipped);
1147 }
1148
1149 g_object_unref (task);
1150 }
1151
1152 static void
1153 skip_fill_buffer_callback (GObject *source_object,
1154 GAsyncResult *result,
1155 gpointer user_data)
1156 {
1157 GTask *task = G_TASK (user_data);
1158 GBufferedInputStream *bstream;
1159 GBufferedInputStreamPrivate *priv;
1160 SkipAsyncData *data;
1161 GError *error;
1162 gssize nread;
1163 gsize available;
1164
1165 bstream = G_BUFFERED_INPUT_STREAM (source_object);
1166 priv = bstream->priv;
1167
1168 data = g_task_get_task_data (task);
1169
1170 error = NULL;
1171 nread = g_buffered_input_stream_fill_finish (bstream,
1172 result, &error);
1173
1174 if (nread < 0 && data->bytes_skipped == 0)
1175 g_task_return_error (task, error);
1176 else
1177 {
1178 if (error)
1179 g_error_free (error);
1180
1181 if (nread > 0)
1182 {
1183 available = priv->end - priv->pos;
1184 data->count = MIN (data->count, available);
1185
1186 data->bytes_skipped += data->count;
1187 priv->pos += data->count;
1188 }
1189
1190 g_assert (data->bytes_skipped <= G_MAXSSIZE);
1191 g_task_return_int (task, data->bytes_skipped);
1192 }
1193
1194 g_object_unref (task);
1195 }
1196
1197 static void
1198 g_buffered_input_stream_skip_async (GInputStream *stream,
1199 gsize count,
1200 int io_priority,
1201 GCancellable *cancellable,
1202 GAsyncReadyCallback callback,
1203 gpointer user_data)
1204 {
1205 GBufferedInputStream *bstream;
1206 GBufferedInputStreamPrivate *priv;
1207 GBufferedInputStreamClass *class;
1208 GInputStream *base_stream;
1209 gsize available;
1210 GTask *task;
1211 SkipAsyncData *data;
1212
1213 bstream = G_BUFFERED_INPUT_STREAM (stream);
1214 priv = bstream->priv;
1215
1216 data = g_slice_new (SkipAsyncData);
1217 data->bytes_skipped = 0;
1218 task = g_task_new (stream, cancellable, callback, user_data);
1219 g_task_set_source_tag (task, g_buffered_input_stream_skip_async);
1220 g_task_set_task_data (task, data, free_skip_async_data);
1221
1222 available = priv->end - priv->pos;
1223
1224 if (count <= available)
1225 {
1226 priv->pos += count;
1227
1228 g_task_return_int (task, count);
1229 g_object_unref (task);
1230 return;
1231 }
1232
1233 /* Full request not available, skip all currently available
1234 * and request refill for more
1235 */
1236
1237 priv->pos = 0;
1238 priv->end = 0;
1239
1240 count -= available;
1241
1242 data->bytes_skipped = available;
1243 data->count = count;
1244
1245 if (count > priv->len)
1246 {
1247 /* Large request, shortcut buffer */
1248 base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
1249
1250 /* If 'count > G_MAXSSIZE then 'g_input_stream_skip_async()'
1251 * will return an error anyway before calling this.
1252 * Assert that this is never called for too big `count` for clarity. */
1253 g_assert ((gssize) count >= 0);
1254 g_input_stream_skip_async (base_stream,
1255 count,
1256 io_priority, cancellable,
1257 large_skip_callback,
1258 task);
1259 }
1260 else
1261 {
1262 class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
1263 class->fill_async (bstream, priv->len, io_priority, cancellable,
1264 skip_fill_buffer_callback, task);
1265 }
1266 }
1267
1268 static gssize
1269 g_buffered_input_stream_skip_finish (GInputStream *stream,
1270 GAsyncResult *result,
1271 GError **error)
1272 {
1273 g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1274
1275 return g_task_propagate_int (G_TASK (result), error);
1276 }