(root)/
glib-2.79.0/
glib/
gthreadpool.c
       1  /* GLIB - Library of useful routines for C programming
       2   * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
       3   *
       4   * GThreadPool: thread pool implementation.
       5   * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
       6   *
       7   * SPDX-License-Identifier: LGPL-2.1-or-later
       8   *
       9   * This library is free software; you can redistribute it and/or
      10   * modify it under the terms of the GNU Lesser General Public
      11   * License as published by the Free Software Foundation; either
      12   * version 2.1 of the License, or (at your option) any later version.
      13   *
      14   * This library is distributed in the hope that it will be useful,
      15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
      16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      17   * Lesser General Public License for more details.
      18   *
      19   * You should have received a copy of the GNU Lesser General Public
      20   * License along with this library; if not, see <http://www.gnu.org/licenses/>.
      21   */
      22  
      23  /*
      24   * MT safe
      25   */
      26  
      27  #include "config.h"
      28  
      29  #include "gthreadpool.h"
      30  
      31  #include "gasyncqueue.h"
      32  #include "gasyncqueueprivate.h"
      33  #include "glib-private.h"
      34  #include "gmain.h"
      35  #include "gtestutils.h"
      36  #include "gthreadprivate.h"
      37  #include "gtimer.h"
      38  #include "gutils.h"
      39  
      40  #define DEBUG_MSG(x)
      41  /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");    */
      42  
      43  typedef struct _GRealThreadPool GRealThreadPool;
      44  
      45  /**
      46   * GThreadPool:
      47   * @func: the function to execute in the threads of this pool
      48   * @user_data: the user data for the threads of this pool
      49   * @exclusive: are all threads exclusive to this pool
      50   *
      51   * The `GThreadPool` struct represents a thread pool.
      52   *
      53   * A thread pool is useful when you wish to asynchronously fork out the execution of work
      54   * and continue working in your own thread. If that will happen often, the overhead of starting
      55   * and destroying a thread each time might be too high. In such cases reusing already started
      56   * threads seems like a good idea. And it indeed is, but implementing this can be tedious
      57   * and error-prone.
      58   *
      59   * Therefore GLib provides thread pools for your convenience. An added advantage is that the
      60   * threads can be shared between the different subsystems of your program, when they are using GLib.
      61   *
      62   * To create a new thread pool, you use [func@GLib.ThreadPool.new].
      63   * It is destroyed by [method@GLib.ThreadPool.free].
      64   *
      65   * If you want to execute a certain task within a thread pool, use [method@GLib.ThreadPool.push].
      66   *
      67   * To get the current number of running threads you call [method@GLib.ThreadPool.get_num_threads].
      68   * To get the number of still unprocessed tasks you call [method@GLib.ThreadPool.unprocessed].
      69   * To control the maximum number of threads for a thread pool, you use
      70   * [method@GLib.ThreadPool.get_max_threads] and [method@GLib.ThreadPool.set_max_threads].
      71   *
      72   * Finally you can control the number of unused threads that are kept alive by GLib for future use.
      73   * The current number can be fetched with [func@GLib.ThreadPool.get_num_unused_threads].
      74   * The maximum number can be controlled by [func@GLib.ThreadPool.get_max_unused_threads] and
      75   * [func@GLib.ThreadPool.set_max_unused_threads]. All currently unused threads
      76   * can be stopped by calling [func@GLib.ThreadPool.stop_unused_threads].
      77   */
      78  struct _GRealThreadPool
      79  {
      80    GThreadPool pool;             /* public part (func, user_data, exclusive); this struct is cast to and from GThreadPool * */
      81    GAsyncQueue *queue;           /* pending tasks; its lock is held around the accesses to the fields below */
      82    GCond cond;                   /* broadcast by a departing thread when another thread waits for the pool to finish */
      83    gint max_threads;             /* upper bound on num_threads; -1 disables the limit */
      84    guint num_threads;            /* threads attached to this pool; raised in g_thread_pool_start_thread(), lowered when a thread leaves */
      85    gboolean running;             /* TRUE from creation; g_thread_pool_push() rejects pools where this is FALSE */
      86    gboolean immediate;           /* once not running: if TRUE, queued tasks are no longer executed */
      87    gboolean waiting;             /* TRUE while another thread waits for this pool to finish */
      88    GCompareDataFunc sort_func;   /* when non-NULL, tasks are pushed in sorted order with this comparator */
      89    gpointer sort_user_data;      /* user data handed to sort_func */
      90  };
      91  
      92  /* The following is just an address to mark the wakeup order for a
      93   * thread. It could be any address, as long as it isn't a valid
      94   * GThreadPool address.
      95   */
      96  static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new;
      97  static gint wakeup_thread_serial = 0;   /* idle threads re-read the limits when this differs from their cached copy */
      98  
      99  /* Here all unused threads are waiting  */
     100  static GAsyncQueue *unused_thread_queue = NULL;
     101  static gint unused_threads = 0;         /* number of threads currently parked in the global pool */
     102  static gint max_unused_threads = 2;     /* a parked thread exits once unused_threads has reached this value */
     103  static gint kill_unused_threads = 0;    /* a woken parked thread exits if this was > 0 before it decremented it */
     104  static guint max_idle_time = 15 * 1000; /* milliseconds a parked thread waits for a pool; 0 means wait indefinitely */
     105  
     106  typedef struct
     107  {
     108    /* Either thread or error are set in the end. Both transfer-full. */
     109    GThreadPool *pool;     /* pool the new thread is started for; filled in by the requester */
     110    GThread *thread;       /* result: the created thread, written by the spawner thread */
     111    GError *error;         /* result: the failure reason, written by the spawner thread */
     112  } SpawnThreadData;
     113  
     114  static GCond spawn_thread_cond;            /* broadcast by the spawner after each request; waited on with spawn_thread_queue's mutex */
     115  static GAsyncQueue *spawn_thread_queue;    /* SpawnThreadData requests: pushed by g_thread_pool_start_thread(), popped by the spawner */
     116  
     117  static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
     118                                                             gpointer          data);   /* plain or sorted push, depending on pool->sort_func */
     119  static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);   /* called by the last thread leaving a stopped pool */
     120  static gpointer         g_thread_pool_thread_proxy        (gpointer          data);   /* thread function of every pool thread; data is the pool */
     121  static gboolean         g_thread_pool_start_thread        (GRealThreadPool  *pool,
     122                                                             GError          **error);  /* reuses a parked thread or creates a new one */
     123  static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
     124  static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);                     /* NULL result tells the calling thread to exit */
     125  static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);   /* NULL result sends the thread to the global pool */
     126  
     127  static void
     128  g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
     129                                     gpointer         data)
     130  {
     131    if (pool->sort_func)
     132      g_async_queue_push_sorted_unlocked (pool->queue,
     133                                          data,
     134                                          pool->sort_func,
     135                                          pool->sort_user_data);
     136    else
     137      g_async_queue_push_unlocked (pool->queue, data);
     138  }
     139  
     140  static GRealThreadPool*
     141  g_thread_pool_wait_for_new_pool (void)
     142  {
     143    GRealThreadPool *pool;
     144    gint local_wakeup_thread_serial;
     145    guint local_max_unused_threads;
     146    gint local_max_idle_time;
     147    gint last_wakeup_thread_serial;
     148    gboolean have_relayed_thread_marker = FALSE;
     149  
     150    local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
     151    local_max_idle_time = g_atomic_int_get (&max_idle_time);
     152    last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
     153  
     154    do
     155      {
     156        if ((guint) g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
     157          {
     158            /* If this is a superfluous thread, stop it. */
     159            pool = NULL;
     160          }
     161        else if (local_max_idle_time > 0)
     162          {
     163            /* If a maximal idle time is given, wait for the given time. */
     164            DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
     165                        g_thread_self (), local_max_idle_time / 1000.0));
     166  
     167            pool = g_async_queue_timeout_pop (unused_thread_queue,
     168  					    local_max_idle_time * 1000);
     169          }
     170        else
     171          {
     172            /* If no maximal idle time is given, wait indefinitely. */
     173            DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ()));
     174            pool = g_async_queue_pop (unused_thread_queue);
     175          }
     176  
     177        if (pool == wakeup_thread_marker)
     178          {
     179            local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
     180            if (last_wakeup_thread_serial == local_wakeup_thread_serial)
     181              {
     182                if (!have_relayed_thread_marker)
     183                {
     184                  /* If this wakeup marker has been received for
     185                   * the second time, relay it.
     186                   */
     187                  DEBUG_MSG (("thread %p relaying wakeup message to "
     188                              "waiting thread with lower serial.",
     189                              g_thread_self ()));
     190  
     191                  g_async_queue_push (unused_thread_queue, wakeup_thread_marker);
     192                  have_relayed_thread_marker = TRUE;
     193  
     194                  /* If a wakeup marker has been relayed, this thread
     195                   * will get out of the way for 100 microseconds to
     196                   * avoid receiving this marker again.
     197                   */
     198                  g_usleep (100);
     199                }
     200              }
     201            else
     202              {
     203                if (g_atomic_int_add (&kill_unused_threads, -1) > 0)
     204                  {
     205                    pool = NULL;
     206                    break;
     207                  }
     208  
     209                DEBUG_MSG (("thread %p updating to new limits.",
     210                            g_thread_self ()));
     211  
     212                local_max_unused_threads = (guint) g_atomic_int_get (&max_unused_threads);
     213                local_max_idle_time = g_atomic_int_get (&max_idle_time);
     214                last_wakeup_thread_serial = local_wakeup_thread_serial;
     215  
     216                have_relayed_thread_marker = FALSE;
     217              }
     218          }
     219      }
     220    while (pool == wakeup_thread_marker);
     221  
     222    return pool;
     223  }
     224  
     225  static gpointer
     226  g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
     227  {
     228    gpointer task = NULL;
     229  
     230    if (pool->running || (!pool->immediate &&
     231                          g_async_queue_length_unlocked (pool->queue) > 0))
     232      {
     233        /* This thread pool is still active. */
     234        if (pool->max_threads != -1 && pool->num_threads > (guint) pool->max_threads)
     235          {
     236            /* This is a superfluous thread, so it goes to the global pool. */
     237            DEBUG_MSG (("superfluous thread %p in pool %p.",
     238                        g_thread_self (), pool));
     239          }
     240        else if (pool->pool.exclusive)
     241          {
     242            /* Exclusive threads stay attached to the pool. */
     243            task = g_async_queue_pop_unlocked (pool->queue);
     244  
     245            DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
     246                        "(%d running, %d unprocessed).",
     247                        g_thread_self (), pool, pool->num_threads,
     248                        g_async_queue_length_unlocked (pool->queue)));
     249          }
     250        else
     251          {
     252            /* A thread will wait for new tasks for at most 1/2
     253             * second before going to the global pool.
     254             */
     255            DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
     256                        "(%d running, %d unprocessed).",
     257                        g_thread_self (), pool, pool->num_threads,
     258                        g_async_queue_length_unlocked (pool->queue)));
     259  
     260            task = g_async_queue_timeout_pop_unlocked (pool->queue,
     261  						     G_USEC_PER_SEC / 2);
     262          }
     263      }
     264    else
     265      {
     266        /* This thread pool is inactive, it will no longer process tasks. */
     267        DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
     268                    "(running: %s, immediate: %s, len: %d).",
     269                    pool, g_thread_self (),
     270                    pool->running ? "true" : "false",
     271                    pool->immediate ? "true" : "false",
     272                    g_async_queue_length_unlocked (pool->queue)));
     273      }
     274  
     275    return task;
     276  }
     277  
     278  static gpointer
     279  g_thread_pool_spawn_thread (gpointer data)
     280  {
     281    while (TRUE)
     282      {
     283        SpawnThreadData *spawn_thread_data;
     284        GThread *thread = NULL;
     285        GError *error = NULL;
     286        const gchar *prgname = g_get_prgname ();
     287        gchar name[16] = "pool";
     288  
     289        if (prgname)
     290          g_snprintf (name, sizeof (name), "pool-%s", prgname);
     291  
     292        g_async_queue_lock (spawn_thread_queue);
     293        /* Spawn a new thread for the given pool and wake the requesting thread
     294         * up again with the result. This new thread will have the scheduler
     295         * settings inherited from this thread and in extension of the thread
     296         * that created the first non-exclusive thread-pool. */
     297        spawn_thread_data = g_async_queue_pop_unlocked (spawn_thread_queue);
     298        thread = g_thread_try_new (name, g_thread_pool_thread_proxy, spawn_thread_data->pool, &error);
     299  
     300        spawn_thread_data->thread = g_steal_pointer (&thread);
     301        spawn_thread_data->error = g_steal_pointer (&error);
     302  
     303        g_cond_broadcast (&spawn_thread_cond);
     304        g_async_queue_unlock (spawn_thread_queue);
     305      }
     306  
     307    return NULL;
     308  }
     309  
     310  static gpointer
     311  g_thread_pool_thread_proxy (gpointer data)
     312  {
     313    GRealThreadPool *pool;
     314  
     315    pool = data;
     316  
     317    DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool));
     318  
     319    g_async_queue_lock (pool->queue);
     320  
     321    while (TRUE)
     322      {
     323        gpointer task;
     324  
     325        task = g_thread_pool_wait_for_new_task (pool);
     326        if (task)
     327          {
     328            if (pool->running || !pool->immediate)
     329              {
     330                /* A task was received and the thread pool is active,
     331                 * so execute the function.
     332                 */
     333                g_async_queue_unlock (pool->queue);
     334                DEBUG_MSG (("thread %p in pool %p calling func.",
     335                            g_thread_self (), pool));
     336                pool->pool.func (task, pool->pool.user_data);
     337                g_async_queue_lock (pool->queue);
     338              }
     339          }
     340        else
     341          {
     342            /* No task was received, so this thread goes to the global pool. */
     343            gboolean free_pool = FALSE;
     344  
     345            DEBUG_MSG (("thread %p leaving pool %p for global pool.",
     346                        g_thread_self (), pool));
     347            pool->num_threads--;
     348  
     349            if (!pool->running)
     350              {
     351                if (!pool->waiting)
     352                  {
     353                    if (pool->num_threads == 0)
     354                      {
     355                        /* If the pool is not running and no other
     356                         * thread is waiting for this thread pool to
     357                         * finish and this is the last thread of this
     358                         * pool, free the pool.
     359                         */
     360                        free_pool = TRUE;
     361                      }
     362                    else
     363                      {
     364                        /* If the pool is not running and no other
     365                         * thread is waiting for this thread pool to
     366                         * finish and this is not the last thread of
     367                         * this pool and there are no tasks left in the
     368                         * queue, wakeup the remaining threads.
     369                         */
     370                        if (g_async_queue_length_unlocked (pool->queue) ==
     371                            (gint) -pool->num_threads)
     372                          g_thread_pool_wakeup_and_stop_all (pool);
     373                      }
     374                  }
     375                else if (pool->immediate ||
     376                         g_async_queue_length_unlocked (pool->queue) <= 0)
     377                  {
     378                    /* If the pool is not running and another thread is
     379                     * waiting for this thread pool to finish and there
     380                     * are either no tasks left or the pool shall stop
     381                     * immediately, inform the waiting thread of a change
     382                     * of the thread pool state.
     383                     */
     384                    g_cond_broadcast (&pool->cond);
     385                  }
     386              }
     387  
     388            g_atomic_int_inc (&unused_threads);
     389            g_async_queue_unlock (pool->queue);
     390  
     391            if (free_pool)
     392              g_thread_pool_free_internal (pool);
     393  
     394            pool = g_thread_pool_wait_for_new_pool ();
     395            g_atomic_int_add (&unused_threads, -1);
     396  
     397            if (pool == NULL)
     398              break;
     399  
     400            g_async_queue_lock (pool->queue);
     401  
     402            DEBUG_MSG (("thread %p entering pool %p from global pool.",
     403                        g_thread_self (), pool));
     404  
     405            /* pool->num_threads++ is not done here, but in
     406             * g_thread_pool_start_thread to make the new started
     407             * thread known to the pool before itself can do it.
     408             */
     409          }
     410      }
     411  
     412    return NULL;
     413  }
     414  
     415  static gboolean
     416  g_thread_pool_start_thread (GRealThreadPool  *pool,
     417                              GError          **error)
     418  {
     419    gboolean success = FALSE;
     420  
     421    if (pool->max_threads != -1 && pool->num_threads >= (guint) pool->max_threads)
     422      /* Enough threads are already running */
     423      return TRUE;
     424  
     425    g_async_queue_lock (unused_thread_queue);
     426  
     427    if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
     428      {
     429        g_async_queue_push_unlocked (unused_thread_queue, pool);
     430        success = TRUE;
     431      }
     432  
     433    g_async_queue_unlock (unused_thread_queue);
     434  
     435    if (!success)
     436      {
     437        const gchar *prgname = g_get_prgname ();
     438        gchar name[16] = "pool";
     439        GThread *thread;
     440  
     441        if (prgname)
     442          g_snprintf (name, sizeof (name), "pool-%s", prgname);
     443  
     444        /* No thread was found, we have to start a new one */
     445        if (pool->pool.exclusive)
     446          {
     447            /* For exclusive thread-pools this is directly called from new() and
     448             * we simply start new threads that inherit the scheduler settings
     449             * from the current thread.
     450             */
     451            thread = g_thread_try_new (name, g_thread_pool_thread_proxy, pool, error);
     452          }
     453        else
     454          {
     455            /* For non-exclusive thread-pools this can be called at any time
     456             * when a new thread is needed. We make sure to create a new thread
     457             * here with the correct scheduler settings by going via our helper
     458             * thread.
     459             */
     460            SpawnThreadData spawn_thread_data = { (GThreadPool *) pool, NULL, NULL };
     461  
     462            g_async_queue_lock (spawn_thread_queue);
     463  
     464            g_async_queue_push_unlocked (spawn_thread_queue, &spawn_thread_data);
     465  
     466            while (!spawn_thread_data.thread && !spawn_thread_data.error)
     467              g_cond_wait (&spawn_thread_cond, _g_async_queue_get_mutex (spawn_thread_queue));
     468  
     469            thread = spawn_thread_data.thread;
     470            if (!thread)
     471              g_propagate_error (error, g_steal_pointer (&spawn_thread_data.error));
     472            g_async_queue_unlock (spawn_thread_queue);
     473          }
     474  
     475        if (thread == NULL)
     476          return FALSE;
     477  
     478        g_thread_unref (thread);
     479      }
     480  
     481    /* See comment in g_thread_pool_thread_proxy as to why this is done
     482     * here and not there
     483     */
     484    pool->num_threads++;
     485  
     486    return TRUE;
     487  }
     488  
     489  /**
     490   * g_thread_pool_new:
     491   * @func: a function to execute in the threads of the new thread pool
     492   * @user_data: user data that is handed over to @func every time it
     493   *     is called
     494   * @max_threads: the maximal number of threads to execute concurrently
     495   *     in the new thread pool, -1 means no limit
     496   * @exclusive: should this thread pool be exclusive?
     497   * @error: return location for error, or %NULL
     498   *
     499   * This function creates a new thread pool.
     500   *
     501   * Whenever you call g_thread_pool_push(), either a new thread is
     502   * created or an unused one is reused. At most @max_threads threads
     503   * are running concurrently for this thread pool. @max_threads = -1
     504   * allows unlimited threads to be created for this thread pool. The
     505   * newly created or reused thread now executes the function @func
     506   * with the two arguments. The first one is the parameter to
     507   * g_thread_pool_push() and the second one is @user_data.
     508   *
     509   * Pass g_get_num_processors() to @max_threads to create as many threads as
     510   * there are logical processors on the system. This will not pin each thread to
     511   * a specific processor.
     512   *
     513   * The parameter @exclusive determines whether the thread pool owns
     514   * all threads exclusively or shares them with other thread pools.
     515   * If @exclusive is %TRUE, @max_threads threads are started
     516   * immediately and they will run exclusively for this thread pool
     517   * until it is destroyed by g_thread_pool_free(). If @exclusive is
     518   * %FALSE, threads are created when needed and shared between all
     519   * non-exclusive thread pools. This implies that @max_threads may
     520   * not be -1 for exclusive thread pools. Besides, exclusive thread
     521   * pools are not affected by g_thread_pool_set_max_idle_time()
     522   * since their threads are never considered idle and returned to the
     523   * global pool.
     524   *
     525   * Note that the threads used by exclusive thread pools will all inherit the
     526   * scheduler settings of the current thread while the threads used by
     527   * non-exclusive thread pools will inherit the scheduler settings from the
     528   * first thread that created such a thread pool.
     529   *
     530   * At least one thread will be spawned when this function is called, either to
     531   * create the @max_threads exclusive threads, or to preserve the scheduler
     532   * settings of the current thread for future spawns.
     533   *
     534   * @error can be %NULL to ignore errors, or non-%NULL to report
     535   * errors. An error can only occur when @exclusive is set to %TRUE
     536   * and not all @max_threads threads could be created.
     537   * See #GThreadError for possible errors that may occur.
     538   * Note, even in case of error a valid #GThreadPool is returned.
     539   *
     540   * Returns: the new #GThreadPool
     541   */
     542  GThreadPool *
     543  g_thread_pool_new (GFunc      func,
     544                     gpointer   user_data,
     545                     gint       max_threads,
     546                     gboolean   exclusive,
     547                     GError   **error)
     548  {
     549    return g_thread_pool_new_full (func, user_data, NULL, max_threads, exclusive, error);
     550  }
     551  
     552  /**
     553   * g_thread_pool_new_full:
     554   * @func: a function to execute in the threads of the new thread pool
     555   * @user_data: user data that is handed over to @func every time it
     556   *     is called
     557   * @item_free_func: (nullable): used to pass as a free function to
     558   *     g_async_queue_new_full()
     559   * @max_threads: the maximal number of threads to execute concurrently
     560   *     in the new thread pool, `-1` means no limit
     561   * @exclusive: should this thread pool be exclusive?
     562   * @error: return location for error, or %NULL
     563   *
     564   * This function creates a new thread pool similar to g_thread_pool_new()
     565   * but allowing @item_free_func to be specified to free the data passed
     566   * to g_thread_pool_push() in the case that the #GThreadPool is stopped
     567   * and freed before all tasks have been executed.
     568   *
     569   * @item_free_func will *not* be called on items successfully passed to @func.
     570   * @func is responsible for freeing the items passed to it.
     571   *
     572   * Returns: (transfer full): the new #GThreadPool
     573   *
     574   * Since: 2.70
     575   */
     576  GThreadPool *
     577  g_thread_pool_new_full (GFunc           func,
     578                          gpointer        user_data,
     579                          GDestroyNotify  item_free_func,
     580                          gint            max_threads,
     581                          gboolean        exclusive,
     582                          GError        **error)
     583  {
     584    GRealThreadPool *retval;
     585    G_LOCK_DEFINE_STATIC (init);
     586  
     587    g_return_val_if_fail (func, NULL);
     588    g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
     589    g_return_val_if_fail (max_threads >= -1, NULL);
     590  
     591    retval = g_new (GRealThreadPool, 1);
     592  
     593    retval->pool.func = func;
     594    retval->pool.user_data = user_data;
     595    retval->pool.exclusive = exclusive;
     596    retval->queue = g_async_queue_new_full (item_free_func);
     597    g_cond_init (&retval->cond);
     598    retval->max_threads = max_threads;
     599    retval->num_threads = 0;
     600    retval->running = TRUE;
     601    retval->immediate = FALSE;
     602    retval->waiting = FALSE;
     603    retval->sort_func = NULL;
     604    retval->sort_user_data = NULL;
     605  
     606    G_LOCK (init);
     607    if (!unused_thread_queue)
     608        unused_thread_queue = g_async_queue_new ();
     609  
     610    /*
     611     * Spawn a helper thread that is only responsible for spawning new threads
     612     * with the scheduler settings of the current thread.
     613     *
     614     * This is then used for making sure that all threads created on the
     615     * non-exclusive thread-pool have the same scheduler settings, and more
     616     * importantly don't just inherit them from the thread that just happened to
     617     * push a new task and caused a new thread to be created.
     618     *
     619     * Not doing so could cause real-time priority threads or otherwise
     620     * threads with problematic scheduler settings to be part of the
     621     * non-exclusive thread-pools.
     622     *
     623     * For exclusive thread-pools this is not required as all threads are
     624     * created immediately below and are running forever, so they will
     625     * automatically inherit the scheduler settings from this very thread.
     626     */
     627    if (!exclusive && !spawn_thread_queue)
     628      {
     629        GThread *pool_spawner = NULL;
     630  
     631        spawn_thread_queue = g_async_queue_new ();
     632        g_cond_init (&spawn_thread_cond);
     633        pool_spawner = g_thread_new ("pool-spawner", g_thread_pool_spawn_thread, NULL);
     634        g_ignore_leak (pool_spawner);
     635      }
     636    G_UNLOCK (init);
     637  
     638    if (retval->pool.exclusive)
     639      {
     640        g_async_queue_lock (retval->queue);
     641  
     642        while (retval->num_threads < (guint) retval->max_threads)
     643          {
     644            GError *local_error = NULL;
     645  
     646            if (!g_thread_pool_start_thread (retval, &local_error))
     647              {
     648                g_propagate_error (error, local_error);
     649                break;
     650              }
     651          }
     652  
     653        g_async_queue_unlock (retval->queue);
     654      }
     655  
     656    return (GThreadPool*) retval;
     657  }
     658  
     659  /**
     660   * g_thread_pool_push:
     661   * @pool: a #GThreadPool
     662   * @data: a new task for @pool
     663   * @error: return location for error, or %NULL
     664   *
     665   * Inserts @data into the list of tasks to be executed by @pool.
     666   *
     667   * When the number of currently running threads is lower than the
     668   * maximal allowed number of threads, a new thread is started (or
     669   * reused) with the properties given to g_thread_pool_new().
     670   * Otherwise, @data stays in the queue until a thread in this pool
     671   * finishes its previous task and processes @data.
     672   *
     673   * @error can be %NULL to ignore errors, or non-%NULL to report
     674   * errors. An error can only occur when a new thread couldn't be
     675   * created. In that case @data is simply appended to the queue of
     676   * work to do.
     677   *
     678   * Before version 2.32, this function did not return a success status.
     679   *
     680   * Returns: %TRUE on success, %FALSE if an error occurred
     681   */
     682  gboolean
     683  g_thread_pool_push (GThreadPool  *pool,
     684                      gpointer      data,
     685                      GError      **error)
     686  {
     687    GRealThreadPool *real;
     688    gboolean result;
     689  
     690    real = (GRealThreadPool*) pool;
     691  
     692    g_return_val_if_fail (real, FALSE);
     693    g_return_val_if_fail (real->running, FALSE);
     694  
     695    result = TRUE;
     696  
     697    g_async_queue_lock (real->queue);
     698  
     699    if (g_async_queue_length_unlocked (real->queue) >= 0)
     700      {
     701        /* No thread is waiting in the queue */
     702        GError *local_error = NULL;
     703  
     704        if (!g_thread_pool_start_thread (real, &local_error))
     705          {
     706            g_propagate_error (error, local_error);
     707            result = FALSE;
     708          }
     709      }
     710  
     711    g_thread_pool_queue_push_unlocked (real, data);
     712    g_async_queue_unlock (real->queue);
     713  
     714    return result;
     715  }
     716  
     717  /**
     718   * g_thread_pool_set_max_threads:
     719   * @pool: a #GThreadPool
     720   * @max_threads: a new maximal number of threads for @pool,
     721   *     or -1 for unlimited
     722   * @error: return location for error, or %NULL
     723   *
     724   * Sets the maximal allowed number of threads for @pool.
     725   * A value of -1 means that the maximal number of threads
     726   * is unlimited. If @pool is an exclusive thread pool, setting
     727   * the maximal number of threads to -1 is not allowed.
     728   *
     729   * Setting @max_threads to 0 means stopping all work for @pool.
     730   * It is effectively frozen until @max_threads is set to a non-zero
     731   * value again.
     732   *
     733   * A thread is never terminated while calling @func, as supplied by
     734   * g_thread_pool_new(). Instead the maximal number of threads only
     735   * has effect for the allocation of new threads in g_thread_pool_push().
     736   * A new thread is allocated, whenever the number of currently
     737   * running threads in @pool is smaller than the maximal number.
     738   *
     739   * @error can be %NULL to ignore errors, or non-%NULL to report
     740   * errors. An error can only occur when a new thread couldn't be
     741   * created.
     742   *
     743   * Before version 2.32, this function did not return a success status.
     744   *
     745   * Returns: %TRUE on success, %FALSE if an error occurred
     746   */
     747  gboolean
     748  g_thread_pool_set_max_threads (GThreadPool  *pool,
     749                                 gint          max_threads,
     750                                 GError      **error)
     751  {
     752    GRealThreadPool *real;
     753    gint to_start;
     754    gboolean result;
     755  
     756    real = (GRealThreadPool*) pool;
     757  
     758    g_return_val_if_fail (real, FALSE);
     759    g_return_val_if_fail (real->running, FALSE);
     760    g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE);
     761    g_return_val_if_fail (max_threads >= -1, FALSE);
     762  
     763    result = TRUE;
     764  
     765    g_async_queue_lock (real->queue);
     766  
     767    real->max_threads = max_threads;
     768  
     769    if (pool->exclusive)
     770      to_start = real->max_threads - real->num_threads;
     771    else
     772      to_start = g_async_queue_length_unlocked (real->queue);
     773  
     774    for ( ; to_start > 0; to_start--)
     775      {
     776        GError *local_error = NULL;
     777  
     778        if (!g_thread_pool_start_thread (real, &local_error))
     779          {
     780            g_propagate_error (error, local_error);
     781            result = FALSE;
     782            break;
     783          }
     784      }
     785  
     786    g_async_queue_unlock (real->queue);
     787  
     788    return result;
     789  }
     790  
     791  /**
     792   * g_thread_pool_get_max_threads:
     793   * @pool: a #GThreadPool
     794   *
     795   * Returns the maximal number of threads for @pool.
     796   *
     797   * Returns: the maximal number of threads
     798   */
     799  gint
     800  g_thread_pool_get_max_threads (GThreadPool *pool)
     801  {
     802    GRealThreadPool *real;
     803    gint retval;
     804  
     805    real = (GRealThreadPool*) pool;
     806  
     807    g_return_val_if_fail (real, 0);
     808    g_return_val_if_fail (real->running, 0);
     809  
     810    g_async_queue_lock (real->queue);
     811    retval = real->max_threads;
     812    g_async_queue_unlock (real->queue);
     813  
     814    return retval;
     815  }
     816  
     817  /**
     818   * g_thread_pool_get_num_threads:
     819   * @pool: a #GThreadPool
     820   *
     821   * Returns the number of threads currently running in @pool.
     822   *
     823   * Returns: the number of threads currently running
     824   */
     825  guint
     826  g_thread_pool_get_num_threads (GThreadPool *pool)
     827  {
     828    GRealThreadPool *real;
     829    guint retval;
     830  
     831    real = (GRealThreadPool*) pool;
     832  
     833    g_return_val_if_fail (real, 0);
     834    g_return_val_if_fail (real->running, 0);
     835  
     836    g_async_queue_lock (real->queue);
     837    retval = real->num_threads;
     838    g_async_queue_unlock (real->queue);
     839  
     840    return retval;
     841  }
     842  
     843  /**
     844   * g_thread_pool_unprocessed:
     845   * @pool: a #GThreadPool
     846   *
     847   * Returns the number of tasks still unprocessed in @pool.
     848   *
     849   * Returns: the number of unprocessed tasks
     850   */
     851  guint
     852  g_thread_pool_unprocessed (GThreadPool *pool)
     853  {
     854    GRealThreadPool *real;
     855    gint unprocessed;
     856  
     857    real = (GRealThreadPool*) pool;
     858  
     859    g_return_val_if_fail (real, 0);
     860    g_return_val_if_fail (real->running, 0);
     861  
     862    unprocessed = g_async_queue_length (real->queue);
     863  
     864    return MAX (unprocessed, 0);
     865  }
     866  
/**
 * g_thread_pool_free:
 * @pool: a #GThreadPool
 * @immediate: should @pool shut down immediately?
 * @wait_: should the function wait for all tasks to be finished?
 *
 * Frees all resources allocated for @pool.
 *
 * If @immediate is %TRUE, no new task is processed for @pool.
 * Otherwise @pool is not freed before the last task is processed.
 * Note however, that no thread of this pool is interrupted while
 * processing a task. Instead at least all still running threads
 * can finish their tasks before the @pool is freed.
 *
 * If @wait_ is %TRUE, this function does not return before all
 * tasks to be processed (dependent on @immediate, whether all
 * or only the currently running) have finished.
 * Otherwise this function returns immediately.
 *
 * Calling this function with @immediate set to %FALSE on a pool
 * whose maximal number of threads is 0 and whose queue is not empty
 * is rejected by a precondition check, and the pool is left untouched.
 *
 * After calling this function @pool must not be used anymore.
 */
void
g_thread_pool_free (GThreadPool *pool,
                    gboolean     immediate,
                    gboolean     wait_)
{
  GRealThreadPool *real;

  real = (GRealThreadPool*) pool;

  g_return_if_fail (real);
  g_return_if_fail (real->running);

  /* If there's no thread allowed here, there is not much sense in
   * not stopping this pool immediately, when it's not empty
   */
  g_return_if_fail (immediate ||
                    real->max_threads != 0 ||
                    g_async_queue_length (real->queue) == 0);

  g_async_queue_lock (real->queue);

  /* Mark the pool as stopped and record the requested shutdown mode,
   * all while holding the queue lock.
   */
  real->running = FALSE;
  real->immediate = immediate;
  real->waiting = wait_;

  if (wait_)
    {
      /* Block on the pool's condition, using the queue's own mutex,
       * until the queue length equals minus the number of threads or,
       * for an immediate shutdown, until no threads are left.
       * NOTE(review): real->cond is presumably signalled by the pool's
       * threads; that code is outside this function - confirm.
       */
      while (g_async_queue_length_unlocked (real->queue) != (gint) -real->num_threads &&
             !(immediate && real->num_threads == 0))
        g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue));
    }

  if (immediate || g_async_queue_length_unlocked (real->queue) == (gint) -real->num_threads)
    {
      /* No thread is currently doing something (and nothing is left
       * to process in the queue)
       */
      if (real->num_threads == 0)
        {
          /* No threads left, we clean up */
          g_async_queue_unlock (real->queue);
          g_thread_pool_free_internal (real);
          return;
        }

      /* Threads still exist: switch the pool to immediate mode and
       * push one dummy item per thread onto the queue.
       */
      g_thread_pool_wakeup_and_stop_all (real);
    }

  /* The last thread should cleanup the pool */
  real->waiting = FALSE;
  g_async_queue_unlock (real->queue);
}
     940  
/* Releases everything owned by @pool: its reference on the task queue,
 * its condition variable and finally the pool structure itself.
 *
 * The precondition checks require the pool to be stopped already
 * (running is FALSE) and to have no threads left.
 */
static void
g_thread_pool_free_internal (GRealThreadPool* pool)
{
  g_return_if_fail (pool);
  g_return_if_fail (pool->running == FALSE);
  g_return_if_fail (pool->num_threads == 0);

  /* Ensure the dummy item pushed on by g_thread_pool_wakeup_and_stop_all() is
   * removed, before it’s potentially passed to the user-provided
   * @item_free_func. */
  g_async_queue_remove (pool->queue, GUINT_TO_POINTER (1));

  /* Drop the queue reference and the condition before freeing the
   * structure that holds them.
   */
  g_async_queue_unref (pool->queue);
  g_cond_clear (&pool->cond);

  g_free (pool);
}
     958  
     959  static void
     960  g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool)
     961  {
     962    guint i;
     963  
     964    g_return_if_fail (pool);
     965    g_return_if_fail (pool->running == FALSE);
     966    g_return_if_fail (pool->num_threads != 0);
     967  
     968    pool->immediate = TRUE;
     969  
     970    /*
     971     * So here we're sending bogus data to the pool threads, which
     972     * should cause them each to wake up, and check the above
     973     * pool->immediate condition. However we don't want that
     974     * data to be sorted (since it'll crash the sorter).
     975     */
     976    for (i = 0; i < pool->num_threads; i++)
     977      g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1));
     978  }
     979  
     980  /**
     981   * g_thread_pool_set_max_unused_threads:
     982   * @max_threads: maximal number of unused threads
     983   *
     984   * Sets the maximal number of unused threads to @max_threads.
     985   * If @max_threads is -1, no limit is imposed on the number
     986   * of unused threads.
     987   *
     988   * The default value is 2.
     989   */
     990  void
     991  g_thread_pool_set_max_unused_threads (gint max_threads)
     992  {
     993    g_return_if_fail (max_threads >= -1);
     994  
     995    g_atomic_int_set (&max_unused_threads, max_threads);
     996  
     997    if (max_threads != -1)
     998      {
     999        max_threads -= g_atomic_int_get (&unused_threads);
    1000        if (max_threads < 0)
    1001          {
    1002            g_atomic_int_set (&kill_unused_threads, -max_threads);
    1003            g_atomic_int_inc (&wakeup_thread_serial);
    1004  
    1005            g_async_queue_lock (unused_thread_queue);
    1006  
    1007            do
    1008              {
    1009                g_async_queue_push_unlocked (unused_thread_queue,
    1010                                             wakeup_thread_marker);
    1011              }
    1012            while (++max_threads);
    1013  
    1014            g_async_queue_unlock (unused_thread_queue);
    1015          }
    1016      }
    1017  }
    1018  
    1019  /**
    1020   * g_thread_pool_get_max_unused_threads:
    1021   *
    1022   * Returns the maximal allowed number of unused threads.
    1023   *
    1024   * Returns: the maximal number of unused threads
    1025   */
    1026  gint
    1027  g_thread_pool_get_max_unused_threads (void)
    1028  {
    1029    return g_atomic_int_get (&max_unused_threads);
    1030  }
    1031  
    1032  /**
    1033   * g_thread_pool_get_num_unused_threads:
    1034   *
    1035   * Returns the number of currently unused threads.
    1036   *
    1037   * Returns: the number of currently unused threads
    1038   */
    1039  guint
    1040  g_thread_pool_get_num_unused_threads (void)
    1041  {
    1042    return (guint) g_atomic_int_get (&unused_threads);
    1043  }
    1044  
    1045  /**
    1046   * g_thread_pool_stop_unused_threads:
    1047   *
    1048   * Stops all currently unused threads. This does not change the
    1049   * maximal number of unused threads. This function can be used to
    1050   * regularly stop all unused threads e.g. from g_timeout_add().
    1051   */
    1052  void
    1053  g_thread_pool_stop_unused_threads (void)
    1054  {
    1055    guint oldval;
    1056  
    1057    oldval = g_thread_pool_get_max_unused_threads ();
    1058  
    1059    g_thread_pool_set_max_unused_threads (0);
    1060    g_thread_pool_set_max_unused_threads (oldval);
    1061  }
    1062  
    1063  /**
    1064   * g_thread_pool_set_sort_function:
    1065   * @pool: a #GThreadPool
    1066   * @func: the #GCompareDataFunc used to sort the list of tasks.
    1067   *     This function is passed two tasks. It should return
    1068   *     0 if the order in which they are handled does not matter,
    1069   *     a negative value if the first task should be processed before
    1070   *     the second or a positive value if the second task should be
    1071   *     processed first.
    1072   * @user_data: user data passed to @func
    1073   *
    1074   * Sets the function used to sort the list of tasks. This allows the
    1075   * tasks to be processed by a priority determined by @func, and not
    1076   * just in the order in which they were added to the pool.
    1077   *
    1078   * Note, if the maximum number of threads is more than 1, the order
    1079   * that threads are executed cannot be guaranteed 100%. Threads are
    1080   * scheduled by the operating system and are executed at random. It
    1081   * cannot be assumed that threads are executed in the order they are
    1082   * created.
    1083   *
    1084   * Since: 2.10
    1085   */
    1086  void
    1087  g_thread_pool_set_sort_function (GThreadPool      *pool,
    1088                                   GCompareDataFunc  func,
    1089                                   gpointer          user_data)
    1090  {
    1091    GRealThreadPool *real;
    1092  
    1093    real = (GRealThreadPool*) pool;
    1094  
    1095    g_return_if_fail (real);
    1096    g_return_if_fail (real->running);
    1097  
    1098    g_async_queue_lock (real->queue);
    1099  
    1100    real->sort_func = func;
    1101    real->sort_user_data = user_data;
    1102  
    1103    if (func)
    1104      g_async_queue_sort_unlocked (real->queue,
    1105                                   real->sort_func,
    1106                                   real->sort_user_data);
    1107  
    1108    g_async_queue_unlock (real->queue);
    1109  }
    1110  
    1111  /**
    1112   * g_thread_pool_move_to_front:
    1113   * @pool: a #GThreadPool
    1114   * @data: an unprocessed item in the pool
    1115   *
    1116   * Moves the item to the front of the queue of unprocessed
    1117   * items, so that it will be processed next.
    1118   *
    1119   * Returns: %TRUE if the item was found and moved
    1120   *
    1121   * Since: 2.46
    1122   */
    1123  gboolean
    1124  g_thread_pool_move_to_front (GThreadPool *pool,
    1125                               gpointer     data)
    1126  {
    1127    GRealThreadPool *real = (GRealThreadPool*) pool;
    1128    gboolean found;
    1129  
    1130    g_async_queue_lock (real->queue);
    1131  
    1132    found = g_async_queue_remove_unlocked (real->queue, data);
    1133    if (found)
    1134      g_async_queue_push_front_unlocked (real->queue, data);
    1135  
    1136    g_async_queue_unlock (real->queue);
    1137  
    1138    return found;
    1139  }
    1140  
    1141  /**
    1142   * g_thread_pool_set_max_idle_time:
    1143   * @interval: the maximum @interval (in milliseconds)
    1144   *     a thread can be idle
    1145   *
    1146   * This function will set the maximum @interval that a thread
    1147   * waiting in the pool for new tasks can be idle for before
    1148   * being stopped. This function is similar to calling
    1149   * g_thread_pool_stop_unused_threads() on a regular timeout,
    1150   * except this is done on a per thread basis.
    1151   *
    1152   * By setting @interval to 0, idle threads will not be stopped.
    1153   *
    1154   * The default value is 15000 (15 seconds).
    1155   *
    1156   * Since: 2.10
    1157   */
    1158  void
    1159  g_thread_pool_set_max_idle_time (guint interval)
    1160  {
    1161    guint i;
    1162  
    1163    g_atomic_int_set (&max_idle_time, interval);
    1164  
    1165    i = (guint) g_atomic_int_get (&unused_threads);
    1166    if (i > 0)
    1167      {
    1168        g_atomic_int_inc (&wakeup_thread_serial);
    1169        g_async_queue_lock (unused_thread_queue);
    1170  
    1171        do
    1172          {
    1173            g_async_queue_push_unlocked (unused_thread_queue,
    1174                                         wakeup_thread_marker);
    1175          }
    1176        while (--i);
    1177  
    1178        g_async_queue_unlock (unused_thread_queue);
    1179      }
    1180  }
    1181  
    1182  /**
    1183   * g_thread_pool_get_max_idle_time:
    1184   *
    1185   * This function will return the maximum @interval that a
    1186   * thread will wait in the thread pool for new tasks before
    1187   * being stopped.
    1188   *
    1189   * If this function returns 0, threads waiting in the thread
    1190   * pool for new work are not stopped.
    1191   *
    1192   * Returns: the maximum @interval (milliseconds) to wait
    1193   *     for new tasks in the thread pool before stopping the
    1194   *     thread
    1195   *
    1196   * Since: 2.10
    1197   */
    1198  guint
    1199  g_thread_pool_get_max_idle_time (void)
    1200  {
    1201    return (guint) g_atomic_int_get (&max_idle_time);
    1202  }