(root)/
glib-2.79.0/
glib/
tests/
thread-pool-slow.c
       1  #include "config.h"
       2  
       3  #include <glib.h>
       4  
/* Per-task delay. Every use in this file is `g_usleep (WAIT * 1000)`, and
 * g_usleep() takes microseconds, so the effective unit is milliseconds. */
#define WAIT                5    /* milliseconds, see above */
/* Thread limit used for the unsorted and idle-time pools, and the value the
 * stop-unused test restores as the global unused-thread limit. */
#define MAX_THREADS         10

/* Value passed to g_thread_pool_set_max_unused_threads():
 *  - if > 0, the test will run continuously (since the test ends when the
 *    thread count is 0);
 *  - if -1, there is no limit to the number of unused threads;
 *  - if 0, no unused threads are possible. */
#define MAX_UNUSED_THREADS -1

/* Guards every update of the three counters below. */
G_LOCK_DEFINE_STATIC (thread_counter_pools);

/* Incremented once each time test_threadpool_pools_entry_func() starts. */
static gulong abs_thread_counter = 0;
/* Incremented when the entry function starts, decremented when it ends. */
static gulong running_thread_counter = 0;
/* Incremented for every pushed task and decremented for every finished one. */
static gulong leftover_task_counter = 0;

/* Pool created by test_threadpool_idle_time() and also used by its timeout
 * callback, test_threadpool_idle_timeout(). */
static GThreadPool *idle_pool = NULL;
      20  
      21  static void
      22  test_threadpool_functions (void)
      23  {
      24    gint max_unused_threads;
      25    guint max_idle_time;
      26  
      27    /* This function attempts to call functions which don't need a
      28     * threadpool to operate to make sure no uninitialised pointers
      29     * accessed and no errors occur.
      30     */
      31  
      32    max_unused_threads = 3;
      33  
      34    g_thread_pool_set_max_unused_threads (max_unused_threads);
      35  
      36    g_assert_cmpint (g_thread_pool_get_max_unused_threads (), ==,
      37                     max_unused_threads);
      38  
      39    g_assert_cmpint (g_thread_pool_get_num_unused_threads (), ==, 0);
      40  
      41    g_thread_pool_stop_unused_threads ();
      42  
      43    max_idle_time = 10 * G_USEC_PER_SEC;
      44  
      45    g_thread_pool_set_max_idle_time (max_idle_time);
      46  
      47    g_assert_cmpint (g_thread_pool_get_max_idle_time (), ==, max_idle_time);
      48  
      49    g_thread_pool_set_max_idle_time (0);
      50  
      51    g_assert_cmpint (g_thread_pool_get_max_idle_time (), ==, 0);
      52  }
      53  
      54  static void
      55  test_threadpool_stop_unused (void)
      56  {
      57    GThreadPool *pool;
      58    guint i;
      59    guint limit = 100;
      60  
      61    /* Spawn a few threads. */
      62    g_thread_pool_set_max_unused_threads (-1);
      63    pool = g_thread_pool_new ((GFunc) g_usleep, NULL, -1, FALSE, NULL);
      64  
      65    for (i = 0; i < limit; i++)
      66      g_thread_pool_push (pool, GUINT_TO_POINTER (1000), NULL);
      67  
      68    /* Wait for the threads to migrate. */
      69    while (g_thread_pool_get_num_threads (pool) != 0)
      70      g_usleep (100);
      71  
      72    g_assert_cmpuint (g_thread_pool_get_num_threads (pool), ==, 0);
      73    g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), >, 0);
      74  
      75    /* Wait for threads to die. */
      76    g_thread_pool_stop_unused_threads ();
      77  
      78    while (g_thread_pool_get_num_unused_threads () != 0)
      79      g_usleep (100);
      80  
      81    g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), ==, 0);
      82  
      83    g_thread_pool_set_max_unused_threads (MAX_THREADS);
      84  
      85    g_assert_cmpuint (g_thread_pool_get_num_threads (pool), ==, 0);
      86    g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), ==, 0);
      87  
      88    g_thread_pool_free (pool, FALSE, TRUE);
      89  }
      90  
      91  static void
      92  test_threadpool_stop_unused_multiple (void)
      93  {
      94    GThreadPool *pools[10];
      95    guint i, j;
      96    const guint limit = 10;
      97    gboolean all_stopped;
      98  
      99    /* Spawn a few threads. */
     100    g_thread_pool_set_max_unused_threads (-1);
     101  
     102    for (i = 0; i < G_N_ELEMENTS (pools); i++)
     103      {
     104        pools[i] = g_thread_pool_new ((GFunc) g_usleep, NULL, -1, FALSE, NULL);
     105  
     106        for (j = 0; j < limit; j++)
     107          g_thread_pool_push (pools[i], GUINT_TO_POINTER (100), NULL);
     108      }
     109  
     110    all_stopped = FALSE;
     111    while (!all_stopped)
     112      {
     113        all_stopped = TRUE;
     114        for (i = 0; i < G_N_ELEMENTS (pools); i++)
     115          all_stopped &= (g_thread_pool_get_num_threads (pools[i]) == 0);
     116      }
     117  
     118    for (i = 0; i < G_N_ELEMENTS (pools); i++)
     119      {
     120        g_assert_cmpuint (g_thread_pool_get_num_threads (pools[i]), ==, 0);
     121        g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), >, 0);
     122      }
     123  
     124    /* Wait for threads to die. */
     125    g_thread_pool_stop_unused_threads ();
     126  
     127    while (g_thread_pool_get_num_unused_threads () != 0)
     128      g_usleep (100);
     129  
     130    g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), ==, 0);
     131  
     132    for (i = 0; i < G_N_ELEMENTS (pools); i++)
     133      g_thread_pool_free (pools[i], FALSE, TRUE);
     134  }
     135  
     136  static void
     137  test_threadpool_pools_entry_func (gpointer data, gpointer user_data)
     138  {
     139    G_LOCK (thread_counter_pools);
     140    abs_thread_counter++;
     141    running_thread_counter++;
     142    G_UNLOCK (thread_counter_pools);
     143  
     144    g_usleep (g_random_int_range (0, 4000));
     145  
     146    G_LOCK (thread_counter_pools);
     147    running_thread_counter--;
     148    leftover_task_counter--;
     149  
     150    G_UNLOCK (thread_counter_pools);
     151  }
     152  
     153  static void
     154  test_threadpool_pools (void)
     155  {
     156    GThreadPool *pool1, *pool2, *pool3;
     157    guint runs;
     158    guint i;
     159  
     160    pool1 = g_thread_pool_new ((GFunc) test_threadpool_pools_entry_func, NULL, 3, FALSE, NULL);
     161    pool2 = g_thread_pool_new ((GFunc) test_threadpool_pools_entry_func, NULL, 5, TRUE, NULL);
     162    pool3 = g_thread_pool_new ((GFunc) test_threadpool_pools_entry_func, NULL, 7, TRUE, NULL);
     163  
     164    runs = 300;
     165    for (i = 0; i < runs; i++)
     166      {
     167        g_thread_pool_push (pool1, GUINT_TO_POINTER (i + 1), NULL);
     168        g_thread_pool_push (pool2, GUINT_TO_POINTER (i + 1), NULL);
     169        g_thread_pool_push (pool3, GUINT_TO_POINTER (i + 1), NULL);
     170  
     171        G_LOCK (thread_counter_pools);
     172        leftover_task_counter += 3;
     173        G_UNLOCK (thread_counter_pools);
     174      }
     175  
     176    g_thread_pool_free (pool1, TRUE, TRUE);
     177    g_thread_pool_free (pool2, FALSE, TRUE);
     178    g_thread_pool_free (pool3, FALSE, TRUE);
     179  
     180    g_assert_cmpint (runs * 3, ==, abs_thread_counter + leftover_task_counter);
     181    g_assert_cmpint (running_thread_counter, ==, 0);
     182  }
     183  
     184  static gint
     185  test_threadpool_sort_compare_func (gconstpointer a, gconstpointer b, gpointer user_data)
     186  {
     187    guint32 id1, id2;
     188  
     189    id1 = GPOINTER_TO_UINT (a);
     190    id2 = GPOINTER_TO_UINT (b);
     191  
     192    return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
     193  }
     194  
     195  static void
     196  test_threadpool_sort_entry_func (gpointer data, gpointer user_data)
     197  {
     198    guint thread_id;
     199    gboolean is_sorted;
     200    static GMutex last_thread_mutex;
     201    static guint last_thread_id = 0;
     202  
     203    g_mutex_lock (&last_thread_mutex);
     204  
     205    thread_id = GPOINTER_TO_UINT (data);
     206    is_sorted = GPOINTER_TO_INT (user_data);
     207  
     208    if (is_sorted)
     209      {
     210        if (last_thread_id != 0)
     211          g_assert_cmpint (last_thread_id, <=, thread_id);
     212  
     213        last_thread_id = thread_id;
     214      }
     215  
     216    g_mutex_unlock (&last_thread_mutex);
     217  
     218    g_usleep (WAIT * 1000);
     219  }
     220  
     221  static void
     222  test_threadpool_sort (gconstpointer data)
     223  {
     224    gboolean sort = GPOINTER_TO_UINT (data);
     225    GThreadPool *pool;
     226    guint limit;
     227    guint max_threads;
     228    guint i;
     229    GError *local_error = NULL;
     230  
     231    limit = MAX_THREADS * 10;
     232  
     233    if (sort) {
     234      max_threads = 1;
     235    } else {
     236      max_threads = MAX_THREADS;
     237    }
     238  
     239    /* It is important that we only have a maximum of 1 thread for this
     240     * test since the results can not be guaranteed to be sorted if > 1.
     241     *
     242     * Threads are scheduled by the operating system and are executed at
     243     * random. It cannot be assumed that threads are executed in the
     244     * order they are created. This was discussed in bug #334943.
     245     *
     246     * However, if testing sorting, we start with max-threads=0 so that all the
     247     * work can be enqueued before starting the pool. This prevents a race between
     248     * new work being enqueued out of sorted order, and work being pulled off the
     249     * queue.
     250     */
     251  
     252    pool = g_thread_pool_new (test_threadpool_sort_entry_func,
     253  			    GINT_TO_POINTER (sort),
     254  			    sort ? 0 : max_threads,
     255  			    FALSE,
     256  			    NULL);
     257  
     258    g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS);
     259  
     260    if (sort) {
     261      g_thread_pool_set_sort_function (pool,
     262  				     test_threadpool_sort_compare_func,
     263  				     NULL);
     264    }
     265  
     266    for (i = 0; i < limit; i++) {
     267      guint id;
     268  
     269      id = g_random_int_range (1, limit) + 1;
     270      g_thread_pool_push (pool, GUINT_TO_POINTER (id), NULL);
     271      g_test_message ("%s ===> pushed new thread with id:%d, number "
     272                      "of threads:%d, unprocessed:%d",
     273                      sort ? "[  sorted]" : "[unsorted]",
     274                      id,
     275                      g_thread_pool_get_num_threads (pool),
     276                      g_thread_pool_unprocessed (pool));
     277    }
     278  
     279    if (sort)
     280      {
     281        g_test_message ("Starting thread pool processing");
     282        g_thread_pool_set_max_threads (pool, max_threads, &local_error);
     283        g_assert_no_error (local_error);
     284      }
     285  
     286    g_assert_cmpint (g_thread_pool_get_max_threads (pool), ==, (gint) max_threads);
     287    g_assert_cmpuint (g_thread_pool_get_num_threads (pool), <=,
     288                      (guint) g_thread_pool_get_max_threads (pool));
     289    g_thread_pool_free (pool, TRUE, TRUE);
     290  }
     291  
     292  static void
     293  test_threadpool_idle_time_entry_func (gpointer data, gpointer user_data)
     294  {
     295    g_usleep (WAIT * 1000);
     296  }
     297  
     298  static gboolean
     299  test_threadpool_idle_timeout (gpointer data)
     300  {
     301    gboolean *idle_timeout_called = data;
     302    gint i;
     303  
     304    *idle_timeout_called = TRUE;
     305  
     306    for (i = 0; i < 2; i++) {
     307      g_thread_pool_push (idle_pool, GUINT_TO_POINTER (100 + i), NULL);
     308    }
     309  
     310    g_main_context_wakeup (NULL);
     311  
     312    return G_SOURCE_REMOVE;
     313  }
     314  
     315  static gboolean
     316  poll_cb (gpointer data)
     317  {
     318    g_main_context_wakeup (NULL);
     319    return G_SOURCE_CONTINUE;
     320  }
     321  
     322  static void
     323  test_threadpool_idle_time (void)
     324  {
     325    guint limit = 50;
     326    guint interval = 10000;
     327    guint i;
     328    guint idle;
     329    gboolean idle_timeout_called = FALSE;
     330    GSource *timeout_source = NULL;
     331    GSource *poll_source = NULL;
     332  
     333    idle_pool = g_thread_pool_new (test_threadpool_idle_time_entry_func,
     334  				 NULL,
     335  				 0,
     336  				 FALSE,
     337  				 NULL);
     338  
     339    g_thread_pool_set_max_threads (idle_pool, MAX_THREADS, NULL);
     340    g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS);
     341    g_thread_pool_set_max_idle_time (interval);
     342  
     343    g_assert_cmpint (g_thread_pool_get_max_threads (idle_pool), ==,
     344                     MAX_THREADS);
     345    g_assert_cmpint (g_thread_pool_get_max_unused_threads (), ==,
     346                     MAX_UNUSED_THREADS);
     347    g_assert_cmpint (g_thread_pool_get_max_idle_time (), ==, interval);
     348  
     349    for (i = 0; i < limit; i++) {
     350      g_thread_pool_push (idle_pool, GUINT_TO_POINTER (i + 1), NULL);
     351    }
     352  
     353    g_assert_cmpint (g_thread_pool_unprocessed (idle_pool), <=, limit);
     354  
     355    timeout_source = g_timeout_source_new (interval - 1000);
     356    g_source_set_callback (timeout_source, test_threadpool_idle_timeout, &idle_timeout_called, NULL);
     357    g_source_attach (timeout_source, NULL);
     358  
     359    /* Wait until the idle timeout has been called at least once and there are no
     360     * unused threads. We need a second timeout for this, to periodically wake
     361     * the main context up, as there’s no way to be notified of changes to `idle`.
     362     */
     363    poll_source = g_timeout_source_new (500);
     364    g_source_set_callback (poll_source, poll_cb, NULL, NULL);
     365    g_source_attach (poll_source, NULL);
     366  
     367    idle = g_thread_pool_get_num_unused_threads ();
     368    while (!idle_timeout_called || idle > 0)
     369      {
     370        g_test_message ("Pool idle thread count: %d, unprocessed jobs: %d",
     371                        idle, g_thread_pool_unprocessed (idle_pool));
     372        g_main_context_iteration (NULL, TRUE);
     373        idle = g_thread_pool_get_num_unused_threads ();
     374      }
     375  
     376    g_thread_pool_free (idle_pool, FALSE, TRUE);
     377    g_source_destroy (poll_source);
     378    g_source_unref (poll_source);
     379    g_source_destroy (timeout_source);
     380    g_source_unref (timeout_source);
     381  }
     382  
     383  int
     384  main (int argc, char *argv[])
     385  {
     386    g_test_init (&argc, &argv, NULL);
     387  
     388    g_test_add_func ("/threadpool/functions", test_threadpool_functions);
     389    g_test_add_func ("/threadpool/stop-unused", test_threadpool_stop_unused);
     390    g_test_add_func ("/threadpool/pools", test_threadpool_pools);
     391    g_test_add_data_func ("/threadpool/no-sort", GUINT_TO_POINTER (FALSE), test_threadpool_sort);
     392    g_test_add_data_func ("/threadpool/sort", GUINT_TO_POINTER (TRUE), test_threadpool_sort);
     393    g_test_add_func ("/threadpool/stop-unused-multiple", test_threadpool_stop_unused_multiple);
     394    g_test_add_func ("/threadpool/idle-time", test_threadpool_idle_time);
     395  
     396    return g_test_run ();
     397  }