1 /* GLIB - Library of useful routines for C programming
2 * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
3 *
4 * GThreadPool: thread pool implementation.
5 * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
6 *
7 * SPDX-License-Identifier: LGPL-2.1-or-later
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License as published by the Free Software Foundation; either
12 * version 2.1 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Lesser General Public License for more details.
18 *
19 * You should have received a copy of the GNU Lesser General Public
20 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /*
24 * MT safe
25 */
26
27 #include "config.h"
28
29 #include "gthreadpool.h"
30
31 #include "gasyncqueue.h"
32 #include "gasyncqueueprivate.h"
33 #include "glib-private.h"
34 #include "gmain.h"
35 #include "gtestutils.h"
36 #include "gthreadprivate.h"
37 #include "gtimer.h"
38 #include "gutils.h"
39
40 #define DEBUG_MSG(x)
41 /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n"); */
42
43 typedef struct _GRealThreadPool GRealThreadPool;
44
45 /**
46 * GThreadPool:
47 * @func: the function to execute in the threads of this pool
48 * @user_data: the user data for the threads of this pool
49 * @exclusive: are all threads exclusive to this pool
50 *
51 * The `GThreadPool` struct represents a thread pool.
52 *
53 * A thread pool is useful when you wish to asynchronously fork out the execution of work
54 * and continue working in your own thread. If that will happen often, the overhead of starting
55 * and destroying a thread each time might be too high. In such cases reusing already started
56 * threads seems like a good idea. And it indeed is, but implementing this can be tedious
57 * and error-prone.
58 *
59 * Therefore GLib provides thread pools for your convenience. An added advantage is, that the
60 * threads can be shared between the different subsystems of your program, when they are using GLib.
61 *
62 * To create a new thread pool, you use [func@GLib.ThreadPool.new].
63 * It is destroyed by [method@GLib.ThreadPool.free].
64 *
65 * If you want to execute a certain task within a thread pool, use [method@GLib.ThreadPool.push].
66 *
67 * To get the current number of running threads you call [method@GLib.ThreadPool.get_num_threads].
68 * To get the number of still unprocessed tasks you call [method@GLib.ThreadPool.unprocessed].
69 * To control the maximum number of threads for a thread pool, you use
70 * [method@GLib.ThreadPool.get_max_threads]. and [method@GLib.ThreadPool.set_max_threads].
71 *
72 * Finally you can control the number of unused threads, that are kept alive by GLib for future use.
73 * The current number can be fetched with [func@GLib.ThreadPool.get_num_unused_threads].
74 * The maximum number can be controlled by [func@GLib.ThreadPool.get_max_unused_threads] and
75 * [func@GLib.ThreadPool.set_max_unused_threads]. All currently unused threads
76 * can be stopped by calling [func@GLib.ThreadPool.stop_unused_threads].
77 */
78 struct _GRealThreadPool
79 {
80 GThreadPool pool;
81 GAsyncQueue *queue;
82 GCond cond;
83 gint max_threads;
84 guint num_threads;
85 gboolean running;
86 gboolean immediate;
87 gboolean waiting;
88 GCompareDataFunc sort_func;
89 gpointer sort_user_data;
90 };
91
92 /* The following is just an address to mark the wakeup order for a
93 * thread, it could be any address (as long, as it isn't a valid
94 * GThreadPool address)
95 */
96 static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
97 static gint wakeup_thread_serial = 0;
98
99 /* Here all unused threads are waiting */
100 static GAsyncQueue *unused_thread_queue = NULL;
101 static gint unused_threads = 0;
102 static gint max_unused_threads = 2;
103 static gint kill_unused_threads = 0;
104 static guint max_idle_time = 15 * 1000;
105
106 typedef struct
107 {
108 /* Either thread or error are set in the end. Both transfer-full. */
109 GThreadPool *pool;
110 GThread *thread;
111 GError *error;
112 } SpawnThreadData;
113
114 static GCond spawn_thread_cond;
115 static GAsyncQueue *spawn_thread_queue;
116
117 static void g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
118 gpointer data);
119 static void g_thread_pool_free_internal (GRealThreadPool *pool);
120 static gpointer g_thread_pool_thread_proxy (gpointer data);
121 static gboolean g_thread_pool_start_thread (GRealThreadPool *pool,
122 GError **error);
123 static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool);
124 static GRealThreadPool* g_thread_pool_wait_for_new_pool (void);
125 static gpointer g_thread_pool_wait_for_new_task (GRealThreadPool *pool);
126
127 static void
128 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
129 gpointer data)
130 {
131 if (pool->sort_func)
132 g_async_queue_push_sorted_unlocked (pool->queue,
133 data,
134 pool->sort_func,
135 pool->sort_user_data);
136 else
137 g_async_queue_push_unlocked (pool->queue, data);
138 }
139
140 static GRealThreadPool*
141 g_thread_pool_wait_for_new_pool (void)
142 {
143 GRealThreadPool *pool;
144 gint local_wakeup_thread_serial;
145 guint local_max_unused_threads;
146 gint local_max_idle_time;
147 gint last_wakeup_thread_serial;
148 gboolean have_relayed_thread_marker = FALSE;
149
150 local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
151 local_max_idle_time = g_atomic_int_get (&max_idle_time);
152 last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
153
154 do
155 {
156 if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
157 {
158 /* If this is a superfluous thread, stop it. */
159 pool = NULL;
160 }
161 else if (local_max_idle_time > 0)
162 {
163 /* If a maximal idle time is given, wait for the given time. */
164 DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
165 g_thread_self (), local_max_idle_time / 1000.0));
166
167 pool = g_async_queue_timeout_pop (unused_thread_queue,
168 local_max_idle_time * 1000);
169 }
170 else
171 {
172 /* If no maximal idle time is given, wait indefinitely. */
173 DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
174 pool = g_async_queue_pop (unused_thread_queue);
175 }
176
177 if (pool == wakeup_thread_marker)
178 {
179 local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
180 if (last_wakeup_thread_serial == local_wakeup_thread_serial)
181 {
182 if (!have_relayed_thread_marker)
183 {
184 /* If this wakeup marker has been received for
185 * the second time, relay it.
186 */
187 DEBUG_MSG (("thread %p relaying wakeup message to "
188 "waiting thread with lower serial.",
189 g_thread_self ()));
190
191 g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
192 have_relayed_thread_marker = TRUE;
193
194 /* If a wakeup marker has been relayed, this thread
195 * will get out of the way for 100 microseconds to
196 * avoid receiving this marker again.
197 */
198 g_usleep (100);
199 }
200 }
201 else
202 {
203 if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
204 {
205 pool = NULL;
206 break;
207 }
208
209 DEBUG_MSG (("thread %p updating to new limits.",
210 g_thread_self ()));
211
212 local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
213 local_max_idle_time = g_atomic_int_get (&max_idle_time);
214 last_wakeup_thread_serial = local_wakeup_thread_serial;
215
216 have_relayed_thread_marker = FALSE;
217 }
218 }
219 }
220 while (pool == wakeup_thread_marker);
221
222 return pool;
223 }
224
225 static gpointer
226 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
227 {
228 gpointer task = NULL;
229
230 if (pool->running || (!pool->immediate &&
231 g_async_queue_length_unlocked (pool->queue) > 0))
232 {
233 /* This thread pool is still active. */
234 if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
235 {
236 /* This is a superfluous thread, so it goes to the global pool. */
237 DEBUG_MSG (("superfluous thread %p in pool %p.",
238 g_thread_self (), pool));
239 }
240 else if (pool->pool.exclusive)
241 {
242 /* Exclusive threads stay attached to the pool. */
243 task = g_async_queue_pop_unlocked (pool->queue);
244
245 DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
246 "(%d running, %d unprocessed).",
247 g_thread_self (), pool, pool->num_threads,
248 g_async_queue_length_unlocked (pool->queue)));
249 }
250 else
251 {
252 /* A thread will wait for new tasks for at most 1/2
253 * second before going to the global pool.
254 */
255 DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
256 "(%d running, %d unprocessed).",
257 g_thread_self (), pool, pool->num_threads,
258 g_async_queue_length_unlocked (pool->queue)));
259
260 task = g_async_queue_timeout_pop_unlocked (pool->queue,
261 G_USEC_PER_SEC / 2);
262 }
263 }
264 else
265 {
266 /* This thread pool is inactive, it will no longer process tasks. */
267 DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
268 "(running: %s, immediate: %s, len: %d).",
269 pool, g_thread_self (),
270 pool->running ? "true" : "false",
271 pool->immediate ? "true" : "false",
272 g_async_queue_length_unlocked (pool->queue)));
273 }
274
275 return task;
276 }
277
278 static gpointer
279 g_thread_pool_spawn_thread (gpointer data)
280 {
281 while (TRUE)
282 {
283 SpawnThreadData *spawn_thread_data;
284 GThread *thread = NULL;
285 GError *error = NULL;
286 const gchar *prgname = g_get_prgname ();
287 gchar name[16] = "pool";
288
289 if (prgname)
290 g_snprintf (name, sizeof (name), "pool-%s", prgname);
291
292 g_async_queue_lock (spawn_thread_queue);
293 /* Spawn a new thread for the given pool and wake the requesting thread
294 * up again with the result. This new thread will have the scheduler
295 * settings inherited from this thread and in extension of the thread
296 * that created the first non-exclusive thread-pool. */
297 spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
298 thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
299
300 spawn_thread_data->thread = g_steal_pointer (&thread);
301 spawn_thread_data->error = g_steal_pointer (&error);
302
303 g_cond_broadcast (&spawn_thread_cond);
304 g_async_queue_unlock (spawn_thread_queue);
305 }
306
307 return NULL;
308 }
309
310 static gpointer
311 g_thread_pool_thread_proxy (gpointer data)
312 {
313 GRealThreadPool *pool;
314
315 pool = data;
316
317 DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
318
319 g_async_queue_lock (pool->queue);
320
321 while (TRUE)
322 {
323 gpointer task;
324
325 task = g_thread_pool_wait_for_new_task (pool);
326 if (task)
327 {
328 if (pool->running || !pool->immediate)
329 {
330 /* A task was received and the thread pool is active,
331 * so execute the function.
332 */
333 g_async_queue_unlock (pool->queue);
334 DEBUG_MSG (("thread %p in pool %p calling func.",
335 g_thread_self (), pool));
336 pool->pool.func (task, pool->pool.user_data);
337 g_async_queue_lock (pool->queue);
338 }
339 }
340 else
341 {
342 /* No task was received, so this thread goes to the global pool. */
343 gboolean free_pool = FALSE;
344
345 DEBUG_MSG (("thread %p leaving pool %p for global pool.",
346 g_thread_self (), pool));
347 pool->num_threads--;
348
349 if (!pool->running)
350 {
351 if (!pool->waiting)
352 {
353 if (pool->num_threads == 0)
354 {
355 /* If the pool is not running and no other
356 * thread is waiting for this thread pool to
357 * finish and this is the last thread of this
358 * pool, free the pool.
359 */
360 free_pool = TRUE;
361 }
362 else
363 {
364 /* If the pool is not running and no other
365 * thread is waiting for this thread pool to
366 * finish and this is not the last thread of
367 * this pool and there are no tasks left in the
368 * queue, wakeup the remaining threads.
369 */
370 if (g_async_queue_length_unlocked (pool->queue) ==
371 (gint) -pool->num_threads)
372 g_thread_pool_wakeup_and_stop_all (pool);
373 }
374 }
375 else if (pool->immediate ||
376 g_async_queue_length_unlocked (pool->queue) <= 0)
377 {
378 /* If the pool is not running and another thread is
379 * waiting for this thread pool to finish and there
380 * are either no tasks left or the pool shall stop
381 * immediately, inform the waiting thread of a change
382 * of the thread pool state.
383 */
384 g_cond_broadcast (&pool->cond);
385 }
386 }
387
388 g_atomic_int_inc (&unused_threads);
389 g_async_queue_unlock (pool->queue);
390
391 if (free_pool)
392 g_thread_pool_free_internal (pool);
393
394 pool = g_thread_pool_wait_for_new_pool ();
395 g_atomic_int_add (&unused_threads, -1);
396
397 if (pool == NULL)
398 break;
399
400 g_async_queue_lock (pool->queue);
401
402 DEBUG_MSG (("thread %p entering pool %p from global pool.",
403 g_thread_self (), pool));
404
405 /* pool->num_threads++ is not done here, but in
406 * g_thread_pool_start_thread to make the new started
407 * thread known to the pool before itself can do it.
408 */
409 }
410 }
411
412 return NULL;
413 }
414
415 static gboolean
416 g_thread_pool_start_thread (GRealThreadPool *pool,
417 GError **error)
418 {
419 gboolean success = FALSE;
420
421 if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
422 /* Enough threads are already running */
423 return TRUE;
424
425 g_async_queue_lock (unused_thread_queue);
426
427 if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
428 {
429 g_async_queue_push_unlocked (unused_thread_queue, pool);
430 success = TRUE;
431 }
432
433 g_async_queue_unlock (unused_thread_queue);
434
435 if (!success)
436 {
437 const gchar *prgname = g_get_prgname ();
438 gchar name[16] = "pool";
439 GThread *thread;
440
441 if (prgname)
442 g_snprintf (name, sizeof (name), "pool-%s", prgname);
443
444 /* No thread was found, we have to start a new one */
445 if (pool->pool.exclusive)
446 {
447 /* For exclusive thread-pools this is directly called from new() and
448 * we simply start new threads that inherit the scheduler settings
449 * from the current thread.
450 */
451 thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
452 }
453 else
454 {
455 /* For non-exclusive thread-pools this can be called at any time
456 * when a new thread is needed. We make sure to create a new thread
457 * here with the correct scheduler settings by going via our helper
458 * thread.
459 */
460 SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
461
462 g_async_queue_lock (spawn_thread_queue);
463
464 g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
465
466 while (!spawn_thread_data.thread && !spawn_thread_data.error)
467 g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
468
469 thread = spawn_thread_data.thread;
470 if (!thread)
471 g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
472 g_async_queue_unlock (spawn_thread_queue);
473 }
474
475 if (thread == NULL)
476 return FALSE;
477
478 g_thread_unref (thread);
479 }
480
481 /* See comment in g_thread_pool_thread_proxy as to why this is done
482 * here and not there
483 */
484 pool->num_threads++;
485
486 return TRUE;
487 }
488
489 /**
490 * g_thread_pool_new:
491 * @func: a function to execute in the threads of the new thread pool
492 * @user_data: user data that is handed over to @func every time it
493 * is called
494 * @max_threads: the maximal number of threads to execute concurrently
495 * in the new thread pool, -1 means no limit
496 * @exclusive: should this thread pool be exclusive?
497 * @error: return location for error, or %NULL
498 *
499 * This function creates a new thread pool.
500 *
501 * Whenever you call g_thread_pool_push(), either a new thread is
502 * created or an unused one is reused. At most @max_threads threads
503 * are running concurrently for this thread pool. @max_threads = -1
504 * allows unlimited threads to be created for this thread pool. The
505 * newly created or reused thread now executes the function @func
506 * with the two arguments. The first one is the parameter to
507 * g_thread_pool_push() and the second one is @user_data.
508 *
509 * Pass g_get_num_processors() to @max_threads to create as many threads as
510 * there are logical processors on the system. This will not pin each thread to
511 * a specific processor.
512 *
513 * The parameter @exclusive determines whether the thread pool owns
514 * all threads exclusive or shares them with other thread pools.
515 * If @exclusive is %TRUE, @max_threads threads are started
516 * immediately and they will run exclusively for this thread pool
517 * until it is destroyed by g_thread_pool_free(). If @exclusive is
518 * %FALSE, threads are created when needed and shared between all
519 * non-exclusive thread pools. This implies that @max_threads may
520 * not be -1 for exclusive thread pools. Besides, exclusive thread
521 * pools are not affected by g_thread_pool_set_max_idle_time()
522 * since their threads are never considered idle and returned to the
523 * global pool.
524 *
525 * Note that the threads used by exclusive thread pools will all inherit the
526 * scheduler settings of the current thread while the threads used by
527 * non-exclusive thread pools will inherit the scheduler settings from the
528 * first thread that created such a thread pool.
529 *
530 * At least one thread will be spawned when this function is called, either to
531 * create the @max_threads exclusive threads, or to preserve the scheduler
532 * settings of the current thread for future spawns.
533 *
534 * @error can be %NULL to ignore errors, or non-%NULL to report
535 * errors. An error can only occur when @exclusive is set to %TRUE
536 * and not all @max_threads threads could be created.
537 * See #GThreadError for possible errors that may occur.
538 * Note, even in case of error a valid #GThreadPool is returned.
539 *
540 * Returns: the new #GThreadPool
541 */
542 GThreadPool *
543 g_thread_pool_new (GFunc func,
544 gpointer user_data,
545 gint max_threads,
546 gboolean exclusive,
547 GError **error)
548 {
549 return g_thread_pool_new_full (func, user_data, NULL, max_threads, exclusive, error);
550 }
551
552 /**
553 * g_thread_pool_new_full:
554 * @func: a function to execute in the threads of the new thread pool
555 * @user_data: user data that is handed over to @func every time it
556 * is called
557 * @item_free_func: (nullable): used to pass as a free function to
558 * g_async_queue_new_full()
559 * @max_threads: the maximal number of threads to execute concurrently
560 * in the new thread pool, `-1` means no limit
561 * @exclusive: should this thread pool be exclusive?
562 * @error: return location for error, or %NULL
563 *
564 * This function creates a new thread pool similar to g_thread_pool_new()
565 * but allowing @item_free_func to be specified to free the data passed
566 * to g_thread_pool_push() in the case that the #GThreadPool is stopped
567 * and freed before all tasks have been executed.
568 *
569 * @item_free_func will *not* be called on items successfully passed to @func.
570 * @func is responsible for freeing the items passed to it.
571 *
572 * Returns: (transfer full): the new #GThreadPool
573 *
574 * Since: 2.70
575 */
576 GThreadPool *
577 g_thread_pool_new_full (GFunc func,
578 gpointer user_data,
579 GDestroyNotify item_free_func,
580 gint max_threads,
581 gboolean exclusive,
582 GError **error)
583 {
584 GRealThreadPool *retval;
585 G_LOCK_DEFINE_STATIC (init);
586
587 g_return_val_if_fail (func, NULL);
588 g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
589 g_return_val_if_fail (max_threads >= -1, NULL);
590
591 retval = g_new (GRealThreadPool, 1);
592
593 retval->pool.func = func;
594 retval->pool.user_data = user_data;
595 retval->pool.exclusive = exclusive;
596 retval->queue = g_async_queue_new_full (item_free_func);
597 g_cond_init (&retval->cond);
598 retval->max_threads = max_threads;
599 retval->num_threads = 0;
600 retval->running = TRUE;
601 retval->immediate = FALSE;
602 retval->waiting = FALSE;
603 retval->sort_func = NULL;
604 retval->sort_user_data = NULL;
605
606 G_LOCK (init);
607 if (!unused_thread_queue)
608 unused_thread_queue = g_async_queue_new ();
609
610 /*
611 * Spawn a helper thread that is only responsible for spawning new threads
612 * with the scheduler settings of the current thread.
613 *
614 * This is then used for making sure that all threads created on the
615 * non-exclusive thread-pool have the same scheduler settings, and more
616 * importantly don't just inherit them from the thread that just happened to
617 * push a new task and caused a new thread to be created.
618 *
619 * Not doing so could cause real-time priority threads or otherwise
620 * threads with problematic scheduler settings to be part of the
621 * non-exclusive thread-pools.
622 *
623 * For exclusive thread-pools this is not required as all threads are
624 * created immediately below and are running forever, so they will
625 * automatically inherit the scheduler settings from this very thread.
626 */
627 if (!exclusive && !spawn_thread_queue)
628 {
629 GThread *pool_spawner = NULL;
630
631 spawn_thread_queue = g_async_queue_new ();
632 g_cond_init (&spawn_thread_cond);
633 pool_spawner = g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
634 g_ignore_leak (pool_spawner);
635 }
636 G_UNLOCK (init);
637
638 if (retval->pool.exclusive)
639 {
640 g_async_queue_lock (retval->queue);
641
642 while (retval->num_threads < (guint) retval->max_threads)
643 {
644 GError *local_error = NULL;
645
646 if (!g_thread_pool_start_thread (retval, &local_error))
647 {
648 g_propagate_error (error, local_error);
649 break;
650 }
651 }
652
653 g_async_queue_unlock (retval->queue);
654 }
655
656 return (GThreadPool*) retval;
657 }
658
659 /**
660 * g_thread_pool_push:
661 * @pool: a #GThreadPool
662 * @data: a new task for @pool
663 * @error: return location for error, or %NULL
664 *
665 * Inserts @data into the list of tasks to be executed by @pool.
666 *
667 * When the number of currently running threads is lower than the
668 * maximal allowed number of threads, a new thread is started (or
669 * reused) with the properties given to g_thread_pool_new().
670 * Otherwise, @data stays in the queue until a thread in this pool
671 * finishes its previous task and processes @data.
672 *
673 * @error can be %NULL to ignore errors, or non-%NULL to report
674 * errors. An error can only occur when a new thread couldn't be
675 * created. In that case @data is simply appended to the queue of
676 * work to do.
677 *
678 * Before version 2.32, this function did not return a success status.
679 *
680 * Returns: %TRUE on success, %FALSE if an error occurred
681 */
682 gboolean
683 g_thread_pool_push (GThreadPool *pool,
684 gpointer data,
685 GError **error)
686 {
687 GRealThreadPool *real;
688 gboolean result;
689
690 real = (GRealThreadPool*) pool;
691
692 g_return_val_if_fail (real, FALSE);
693 g_return_val_if_fail (real->running, FALSE);
694
695 result = TRUE;
696
697 g_async_queue_lock (real->queue);
698
699 if (g_async_queue_length_unlocked (real->queue) >= 0)
700 {
701 /* No thread is waiting in the queue */
702 GError *local_error = NULL;
703
704 if (!g_thread_pool_start_thread (real, &local_error))
705 {
706 g_propagate_error (error, local_error);
707 result = FALSE;
708 }
709 }
710
711 g_thread_pool_queue_push_unlocked (real, data);
712 g_async_queue_unlock (real->queue);
713
714 return result;
715 }
716
717 /**
718 * g_thread_pool_set_max_threads:
719 * @pool: a #GThreadPool
720 * @max_threads: a new maximal number of threads for @pool,
721 * or -1 for unlimited
722 * @error: return location for error, or %NULL
723 *
724 * Sets the maximal allowed number of threads for @pool.
725 * A value of -1 means that the maximal number of threads
726 * is unlimited. If @pool is an exclusive thread pool, setting
727 * the maximal number of threads to -1 is not allowed.
728 *
729 * Setting @max_threads to 0 means stopping all work for @pool.
730 * It is effectively frozen until @max_threads is set to a non-zero
731 * value again.
732 *
733 * A thread is never terminated while calling @func, as supplied by
734 * g_thread_pool_new(). Instead the maximal number of threads only
735 * has effect for the allocation of new threads in g_thread_pool_push().
736 * A new thread is allocated, whenever the number of currently
737 * running threads in @pool is smaller than the maximal number.
738 *
739 * @error can be %NULL to ignore errors, or non-%NULL to report
740 * errors. An error can only occur when a new thread couldn't be
741 * created.
742 *
743 * Before version 2.32, this function did not return a success status.
744 *
745 * Returns: %TRUE on success, %FALSE if an error occurred
746 */
747 gboolean
748 g_thread_pool_set_max_threads (GThreadPool *pool,
749 gint max_threads,
750 GError **error)
751 {
752 GRealThreadPool *real;
753 gint to_start;
754 gboolean result;
755
756 real = (GRealThreadPool*) pool;
757
758 g_return_val_if_fail (real, FALSE);
759 g_return_val_if_fail (real->running, FALSE);
760 g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
761 g_return_val_if_fail (max_threads >= -1, FALSE);
762
763 result = TRUE;
764
765 g_async_queue_lock (real->queue);
766
767 real->max_threads = max_threads;
768
769 if (pool->exclusive)
770 to_start = real->max_threads - real->num_threads;
771 else
772 to_start = g_async_queue_length_unlocked (real->queue);
773
774 for ( ; to_start > 0; to_start--)
775 {
776 GError *local_error = NULL;
777
778 if (!g_thread_pool_start_thread (real, &local_error))
779 {
780 g_propagate_error (error, local_error);
781 result = FALSE;
782 break;
783 }
784 }
785
786 g_async_queue_unlock (real->queue);
787
788 return result;
789 }
790
791 /**
792 * g_thread_pool_get_max_threads:
793 * @pool: a #GThreadPool
794 *
795 * Returns the maximal number of threads for @pool.
796 *
797 * Returns: the maximal number of threads
798 */
799 gint
800 g_thread_pool_get_max_threads (GThreadPool *pool)
801 {
802 GRealThreadPool *real;
803 gint retval;
804
805 real = (GRealThreadPool*) pool;
806
807 g_return_val_if_fail (real, 0);
808 g_return_val_if_fail (real->running, 0);
809
810 g_async_queue_lock (real->queue);
811 retval = real->max_threads;
812 g_async_queue_unlock (real->queue);
813
814 return retval;
815 }
816
817 /**
818 * g_thread_pool_get_num_threads:
819 * @pool: a #GThreadPool
820 *
821 * Returns the number of threads currently running in @pool.
822 *
823 * Returns: the number of threads currently running
824 */
825 guint
826 g_thread_pool_get_num_threads (GThreadPool *pool)
827 {
828 GRealThreadPool *real;
829 guint retval;
830
831 real = (GRealThreadPool*) pool;
832
833 g_return_val_if_fail (real, 0);
834 g_return_val_if_fail (real->running, 0);
835
836 g_async_queue_lock (real->queue);
837 retval = real->num_threads;
838 g_async_queue_unlock (real->queue);
839
840 return retval;
841 }
842
843 /**
844 * g_thread_pool_unprocessed:
845 * @pool: a #GThreadPool
846 *
847 * Returns the number of tasks still unprocessed in @pool.
848 *
849 * Returns: the number of unprocessed tasks
850 */
851 guint
852 g_thread_pool_unprocessed (GThreadPool *pool)
853 {
854 GRealThreadPool *real;
855 gint unprocessed;
856
857 real = (GRealThreadPool*) pool;
858
859 g_return_val_if_fail (real, 0);
860 g_return_val_if_fail (real->running, 0);
861
862 unprocessed = g_async_queue_length (real->queue);
863
864 return MAX (unprocessed, 0);
865 }
866
867 /**
868 * g_thread_pool_free:
869 * @pool: a #GThreadPool
870 * @immediate: should @pool shut down immediately?
871 * @wait_: should the function wait for all tasks to be finished?
872 *
873 * Frees all resources allocated for @pool.
874 *
875 * If @immediate is %TRUE, no new task is processed for @pool.
876 * Otherwise @pool is not freed before the last task is processed.
877 * Note however, that no thread of this pool is interrupted while
878 * processing a task. Instead at least all still running threads
879 * can finish their tasks before the @pool is freed.
880 *
881 * If @wait_ is %TRUE, this function does not return before all
882 * tasks to be processed (dependent on @immediate, whether all
883 * or only the currently running) are ready.
884 * Otherwise this function returns immediately.
885 *
886 * After calling this function @pool must not be used anymore.
887 */
888 void
889 g_thread_pool_free (GThreadPool *pool,
890 gboolean immediate,
891 gboolean wait_)
892 {
893 GRealThreadPool *real;
894
895 real = (GRealThreadPool*) pool;
896
897 g_return_if_fail (real);
898 g_return_if_fail (real->running);
899
900 /* If there's no thread allowed here, there is not much sense in
901 * not stopping this pool immediately, when it's not empty
902 */
903 g_return_if_fail (immediate ||
904 real->max_threads != 0 ||
905 g_async_queue_length (real->queue) == 0);
906
907 g_async_queue_lock (real->queue);
908
909 real->running = FALSE;
910 real->immediate = immediate;
911 real->waiting = wait_;
912
913 if (wait_)
914 {
915 while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
916 !(immediate && real->num_threads == 0))
917 g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
918 }
919
920 if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
921 {
922 /* No thread is currently doing something (and nothing is left
923 * to process in the queue)
924 */
925 if (real->num_threads == 0)
926 {
927 /* No threads left, we clean up */
928 g_async_queue_unlock (real->queue);
929 g_thread_pool_free_internal (real);
930 return;
931 }
932
933 g_thread_pool_wakeup_and_stop_all (real);
934 }
935
936 /* The last thread should cleanup the pool */
937 real->waiting = FALSE;
938 g_async_queue_unlock (real->queue);
939 }
940
941 static void
942 g_thread_pool_free_internal (GRealThreadPool* pool)
943 {
944 g_return_if_fail (pool);
945 g_return_if_fail (pool->running == FALSE);
946 g_return_if_fail (pool->num_threads == 0);
947
948 /* Ensure the dummy item pushed on by g_thread_pool_wakeup_and_stop_all() is
949 * removed, before it’s potentially passed to the user-provided
950 * @item_free_func. */
951 g_async_queue_remove (pool->queue, GUINT_TO_POINTER (1));
952
953 g_async_queue_unref (pool->queue);
954 g_cond_clear (&pool->cond);
955
956 g_free (pool);
957 }
958
959 static void
960 g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
961 {
962 guint i;
963
964 g_return_if_fail (pool);
965 g_return_if_fail (pool->running == FALSE);
966 g_return_if_fail (pool->num_threads != 0);
967
968 pool->immediate = TRUE;
969
970 /*
971 * So here we're sending bogus data to the pool threads, which
972 * should cause them each to wake up, and check the above
973 * pool->immediate condition. However we don't want that
974 * data to be sorted (since it'll crash the sorter).
975 */
976 for (i = 0; i < pool->num_threads; i++)
977 g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
978 }
979
980 /**
981 * g_thread_pool_set_max_unused_threads:
982 * @max_threads: maximal number of unused threads
983 *
984 * Sets the maximal number of unused threads to @max_threads.
985 * If @max_threads is -1, no limit is imposed on the number
986 * of unused threads.
987 *
988 * The default value is 2.
989 */
990 void
991 g_thread_pool_set_max_unused_threads (gint max_threads)
992 {
993 g_return_if_fail (max_threads >= -1);
994
995 g_atomic_int_set (&max_unused_threads, max_threads);
996
997 if (max_threads != -1)
998 {
999 max_threads -= g_atomic_int_get (&unused_threads);
1000 if (max_threads < 0)
1001 {
1002 g_atomic_int_set (&kill_unused_threads, -max_threads);
1003 g_atomic_int_inc (&wakeup_thread_serial);
1004
1005 g_async_queue_lock (unused_thread_queue);
1006
1007 do
1008 {
1009 g_async_queue_push_unlocked (unused_thread_queue,
1010 wakeup_thread_marker);
1011 }
1012 while (++max_threads);
1013
1014 g_async_queue_unlock (unused_thread_queue);
1015 }
1016 }
1017 }
1018
1019 /**
1020 * g_thread_pool_get_max_unused_threads:
1021 *
1022 * Returns the maximal allowed number of unused threads.
1023 *
1024 * Returns: the maximal number of unused threads
1025 */
1026 gint
1027 g_thread_pool_get_max_unused_threads (void)
1028 {
1029 return g_atomic_int_get (&max_unused_threads);
1030 }
1031
1032 /**
1033 * g_thread_pool_get_num_unused_threads:
1034 *
1035 * Returns the number of currently unused threads.
1036 *
1037 * Returns: the number of currently unused threads
1038 */
1039 guint
1040 g_thread_pool_get_num_unused_threads (void)
1041 {
1042 return (guint) g_atomic_int_get (&unused_threads);
1043 }
1044
1045 /**
1046 * g_thread_pool_stop_unused_threads:
1047 *
1048 * Stops all currently unused threads. This does not change the
1049 * maximal number of unused threads. This function can be used to
1050 * regularly stop all unused threads e.g. from g_timeout_add().
1051 */
1052 void
1053 g_thread_pool_stop_unused_threads (void)
1054 {
1055 guint oldval;
1056
1057 oldval = g_thread_pool_get_max_unused_threads ();
1058
1059 g_thread_pool_set_max_unused_threads (0);
1060 g_thread_pool_set_max_unused_threads (oldval);
1061 }
1062
1063 /**
1064 * g_thread_pool_set_sort_function:
1065 * @pool: a #GThreadPool
1066 * @func: the #GCompareDataFunc used to sort the list of tasks.
1067 * This function is passed two tasks. It should return
1068 * 0 if the order in which they are handled does not matter,
1069 * a negative value if the first task should be processed before
1070 * the second or a positive value if the second task should be
1071 * processed first.
1072 * @user_data: user data passed to @func
1073 *
1074 * Sets the function used to sort the list of tasks. This allows the
1075 * tasks to be processed by a priority determined by @func, and not
1076 * just in the order in which they were added to the pool.
1077 *
1078 * Note, if the maximum number of threads is more than 1, the order
1079 * that threads are executed cannot be guaranteed 100%. Threads are
1080 * scheduled by the operating system and are executed at random. It
1081 * cannot be assumed that threads are executed in the order they are
1082 * created.
1083 *
1084 * Since: 2.10
1085 */
1086 void
1087 g_thread_pool_set_sort_function (GThreadPool *pool,
1088 GCompareDataFunc func,
1089 gpointer user_data)
1090 {
1091 GRealThreadPool *real;
1092
1093 real = (GRealThreadPool*) pool;
1094
1095 g_return_if_fail (real);
1096 g_return_if_fail (real->running);
1097
1098 g_async_queue_lock (real->queue);
1099
1100 real->sort_func = func;
1101 real->sort_user_data = user_data;
1102
1103 if (func)
1104 g_async_queue_sort_unlocked (real->queue,
1105 real->sort_func,
1106 real->sort_user_data);
1107
1108 g_async_queue_unlock (real->queue);
1109 }
1110
1111 /**
1112 * g_thread_pool_move_to_front:
1113 * @pool: a #GThreadPool
1114 * @data: an unprocessed item in the pool
1115 *
1116 * Moves the item to the front of the queue of unprocessed
1117 * items, so that it will be processed next.
1118 *
1119 * Returns: %TRUE if the item was found and moved
1120 *
1121 * Since: 2.46
1122 */
1123 gboolean
1124 g_thread_pool_move_to_front (GThreadPool *pool,
1125 gpointer data)
1126 {
1127 GRealThreadPool *real = (GRealThreadPool*) pool;
1128 gboolean found;
1129
1130 g_async_queue_lock (real->queue);
1131
1132 found = g_async_queue_remove_unlocked (real->queue, data);
1133 if (found)
1134 g_async_queue_push_front_unlocked (real->queue, data);
1135
1136 g_async_queue_unlock (real->queue);
1137
1138 return found;
1139 }
1140
1141 /**
1142 * g_thread_pool_set_max_idle_time:
1143 * @interval: the maximum @interval (in milliseconds)
1144 * a thread can be idle
1145 *
1146 * This function will set the maximum @interval that a thread
1147 * waiting in the pool for new tasks can be idle for before
1148 * being stopped. This function is similar to calling
1149 * g_thread_pool_stop_unused_threads() on a regular timeout,
1150 * except this is done on a per thread basis.
1151 *
1152 * By setting @interval to 0, idle threads will not be stopped.
1153 *
1154 * The default value is 15000 (15 seconds).
1155 *
1156 * Since: 2.10
1157 */
1158 void
1159 g_thread_pool_set_max_idle_time (guint interval)
1160 {
1161 guint i;
1162
1163 g_atomic_int_set (&max_idle_time, interval);
1164
1165 i = (guint) g_atomic_int_get (&unused_threads);
1166 if (i > 0)
1167 {
1168 g_atomic_int_inc (&wakeup_thread_serial);
1169 g_async_queue_lock (unused_thread_queue);
1170
1171 do
1172 {
1173 g_async_queue_push_unlocked (unused_thread_queue,
1174 wakeup_thread_marker);
1175 }
1176 while (--i);
1177
1178 g_async_queue_unlock (unused_thread_queue);
1179 }
1180 }
1181
1182 /**
1183 * g_thread_pool_get_max_idle_time:
1184 *
1185 * This function will return the maximum @interval that a
1186 * thread will wait in the thread pool for new tasks before
1187 * being stopped.
1188 *
1189 * If this function returns 0, threads waiting in the thread
1190 * pool for new work are not stopped.
1191 *
1192 * Returns: the maximum @interval (milliseconds) to wait
1193 * for new tasks in the thread pool before stopping the
1194 * thread
1195 *
1196 * Since: 2.10
1197 */
1198 guint
1199 g_thread_pool_get_max_idle_time (void)
1200 {
1201 return (guint) g_atomic_int_get (&max_idle_time);
1202 }