1 #include "config.h"
2
3 #include <glib.h>
4
/* Base delay for the worker callbacks. The callbacks in this file pass
 * WAIT * 1000 to g_usleep(), which takes microseconds, so the effective
 * delay is WAIT milliseconds (not seconds). */
#define WAIT 5 /* milliseconds once multiplied by 1000 for g_usleep() */
/* Thread limit used for the unsorted sort test and the idle-time test. */
#define MAX_THREADS 10

/* Value handed to g_thread_pool_set_max_unused_threads() by the sort and
 * idle-time tests:
 *  - if > 0, the test will run continuously (since the test ends when the
 *    thread count is 0);
 *  - if -1, there is no limit to the number of unused threads;
 *  - if 0, no unused threads are possible. */
#define MAX_UNUSED_THREADS -1

/* Protects the three counters below, which are updated from pool threads. */
G_LOCK_DEFINE_STATIC (thread_counter_pools);

/* Number of tasks that have started running in test_threadpool_pools(). */
static gulong abs_thread_counter = 0;
/* Number of tasks currently between their start and end bookkeeping. */
static gulong running_thread_counter = 0;
/* Tasks pushed minus tasks finished: raised by 3 per push round, lowered by
 * one each time a task completes. */
static gulong leftover_task_counter = 0;

/* Pool created by test_threadpool_idle_time() and fed additional tasks by
 * the test_threadpool_idle_timeout() callback. */
static GThreadPool *idle_pool = NULL;
20
21 static void
22 test_threadpool_functions (void)
23 {
24 gint max_unused_threads;
25 guint max_idle_time;
26
27 /* This function attempts to call functions which don't need a
28 * threadpool to operate to make sure no uninitialised pointers
29 * accessed and no errors occur.
30 */
31
32 max_unused_threads = 3;
33
34 g_thread_pool_set_max_unused_threads (max_unused_threads);
35
36 g_assert_cmpint (g_thread_pool_get_max_unused_threads (), ==,
37 max_unused_threads);
38
39 g_assert_cmpint (g_thread_pool_get_num_unused_threads (), ==, 0);
40
41 g_thread_pool_stop_unused_threads ();
42
43 max_idle_time = 10 * G_USEC_PER_SEC;
44
45 g_thread_pool_set_max_idle_time (max_idle_time);
46
47 g_assert_cmpint (g_thread_pool_get_max_idle_time (), ==, max_idle_time);
48
49 g_thread_pool_set_max_idle_time (0);
50
51 g_assert_cmpint (g_thread_pool_get_max_idle_time (), ==, 0);
52 }
53
54 static void
55 test_threadpool_stop_unused (void)
56 {
57 GThreadPool *pool;
58 guint i;
59 guint limit = 100;
60
61 /* Spawn a few threads. */
62 g_thread_pool_set_max_unused_threads (-1);
63 pool = g_thread_pool_new ((GFunc) g_usleep, NULL, -1, FALSE, NULL);
64
65 for (i = 0; i < limit; i++)
66 g_thread_pool_push (pool, GUINT_TO_POINTER (1000), NULL);
67
68 /* Wait for the threads to migrate. */
69 while (g_thread_pool_get_num_threads (pool) != 0)
70 g_usleep (100);
71
72 g_assert_cmpuint (g_thread_pool_get_num_threads (pool), ==, 0);
73 g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), >, 0);
74
75 /* Wait for threads to die. */
76 g_thread_pool_stop_unused_threads ();
77
78 while (g_thread_pool_get_num_unused_threads () != 0)
79 g_usleep (100);
80
81 g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), ==, 0);
82
83 g_thread_pool_set_max_unused_threads (MAX_THREADS);
84
85 g_assert_cmpuint (g_thread_pool_get_num_threads (pool), ==, 0);
86 g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), ==, 0);
87
88 g_thread_pool_free (pool, FALSE, TRUE);
89 }
90
91 static void
92 test_threadpool_stop_unused_multiple (void)
93 {
94 GThreadPool *pools[10];
95 guint i, j;
96 const guint limit = 10;
97 gboolean all_stopped;
98
99 /* Spawn a few threads. */
100 g_thread_pool_set_max_unused_threads (-1);
101
102 for (i = 0; i < G_N_ELEMENTS (pools); i++)
103 {
104 pools[i] = g_thread_pool_new ((GFunc) g_usleep, NULL, -1, FALSE, NULL);
105
106 for (j = 0; j < limit; j++)
107 g_thread_pool_push (pools[i], GUINT_TO_POINTER (100), NULL);
108 }
109
110 all_stopped = FALSE;
111 while (!all_stopped)
112 {
113 all_stopped = TRUE;
114 for (i = 0; i < G_N_ELEMENTS (pools); i++)
115 all_stopped &= (g_thread_pool_get_num_threads (pools[i]) == 0);
116 }
117
118 for (i = 0; i < G_N_ELEMENTS (pools); i++)
119 {
120 g_assert_cmpuint (g_thread_pool_get_num_threads (pools[i]), ==, 0);
121 g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), >, 0);
122 }
123
124 /* Wait for threads to die. */
125 g_thread_pool_stop_unused_threads ();
126
127 while (g_thread_pool_get_num_unused_threads () != 0)
128 g_usleep (100);
129
130 g_assert_cmpuint (g_thread_pool_get_num_unused_threads (), ==, 0);
131
132 for (i = 0; i < G_N_ELEMENTS (pools); i++)
133 g_thread_pool_free (pools[i], FALSE, TRUE);
134 }
135
136 static void
137 test_threadpool_pools_entry_func (gpointer data, gpointer user_data)
138 {
139 G_LOCK (thread_counter_pools);
140 abs_thread_counter++;
141 running_thread_counter++;
142 G_UNLOCK (thread_counter_pools);
143
144 g_usleep (g_random_int_range (0, 4000));
145
146 G_LOCK (thread_counter_pools);
147 running_thread_counter--;
148 leftover_task_counter--;
149
150 G_UNLOCK (thread_counter_pools);
151 }
152
153 static void
154 test_threadpool_pools (void)
155 {
156 GThreadPool *pool1, *pool2, *pool3;
157 guint runs;
158 guint i;
159
160 pool1 = g_thread_pool_new ((GFunc) test_threadpool_pools_entry_func, NULL, 3, FALSE, NULL);
161 pool2 = g_thread_pool_new ((GFunc) test_threadpool_pools_entry_func, NULL, 5, TRUE, NULL);
162 pool3 = g_thread_pool_new ((GFunc) test_threadpool_pools_entry_func, NULL, 7, TRUE, NULL);
163
164 runs = 300;
165 for (i = 0; i < runs; i++)
166 {
167 g_thread_pool_push (pool1, GUINT_TO_POINTER (i + 1), NULL);
168 g_thread_pool_push (pool2, GUINT_TO_POINTER (i + 1), NULL);
169 g_thread_pool_push (pool3, GUINT_TO_POINTER (i + 1), NULL);
170
171 G_LOCK (thread_counter_pools);
172 leftover_task_counter += 3;
173 G_UNLOCK (thread_counter_pools);
174 }
175
176 g_thread_pool_free (pool1, TRUE, TRUE);
177 g_thread_pool_free (pool2, FALSE, TRUE);
178 g_thread_pool_free (pool3, FALSE, TRUE);
179
180 g_assert_cmpint (runs * 3, ==, abs_thread_counter + leftover_task_counter);
181 g_assert_cmpint (running_thread_counter, ==, 0);
182 }
183
184 static gint
185 test_threadpool_sort_compare_func (gconstpointer a, gconstpointer b, gpointer user_data)
186 {
187 guint32 id1, id2;
188
189 id1 = GPOINTER_TO_UINT (a);
190 id2 = GPOINTER_TO_UINT (b);
191
192 return (id1 > id2 ? +1 : id1 == id2 ? 0 : -1);
193 }
194
195 static void
196 test_threadpool_sort_entry_func (gpointer data, gpointer user_data)
197 {
198 guint thread_id;
199 gboolean is_sorted;
200 static GMutex last_thread_mutex;
201 static guint last_thread_id = 0;
202
203 g_mutex_lock (&last_thread_mutex);
204
205 thread_id = GPOINTER_TO_UINT (data);
206 is_sorted = GPOINTER_TO_INT (user_data);
207
208 if (is_sorted)
209 {
210 if (last_thread_id != 0)
211 g_assert_cmpint (last_thread_id, <=, thread_id);
212
213 last_thread_id = thread_id;
214 }
215
216 g_mutex_unlock (&last_thread_mutex);
217
218 g_usleep (WAIT * 1000);
219 }
220
221 static void
222 test_threadpool_sort (gconstpointer data)
223 {
224 gboolean sort = GPOINTER_TO_UINT (data);
225 GThreadPool *pool;
226 guint limit;
227 guint max_threads;
228 guint i;
229 GError *local_error = NULL;
230
231 limit = MAX_THREADS * 10;
232
233 if (sort) {
234 max_threads = 1;
235 } else {
236 max_threads = MAX_THREADS;
237 }
238
239 /* It is important that we only have a maximum of 1 thread for this
240 * test since the results can not be guaranteed to be sorted if > 1.
241 *
242 * Threads are scheduled by the operating system and are executed at
243 * random. It cannot be assumed that threads are executed in the
244 * order they are created. This was discussed in bug #334943.
245 *
246 * However, if testing sorting, we start with max-threads=0 so that all the
247 * work can be enqueued before starting the pool. This prevents a race between
248 * new work being enqueued out of sorted order, and work being pulled off the
249 * queue.
250 */
251
252 pool = g_thread_pool_new (test_threadpool_sort_entry_func,
253 GINT_TO_POINTER (sort),
254 sort ? 0 : max_threads,
255 FALSE,
256 NULL);
257
258 g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS);
259
260 if (sort) {
261 g_thread_pool_set_sort_function (pool,
262 test_threadpool_sort_compare_func,
263 NULL);
264 }
265
266 for (i = 0; i < limit; i++) {
267 guint id;
268
269 id = g_random_int_range (1, limit) + 1;
270 g_thread_pool_push (pool, GUINT_TO_POINTER (id), NULL);
271 g_test_message ("%s ===> pushed new thread with id:%d, number "
272 "of threads:%d, unprocessed:%d",
273 sort ? "[ sorted]" : "[unsorted]",
274 id,
275 g_thread_pool_get_num_threads (pool),
276 g_thread_pool_unprocessed (pool));
277 }
278
279 if (sort)
280 {
281 g_test_message ("Starting thread pool processing");
282 g_thread_pool_set_max_threads (pool, max_threads, &local_error);
283 g_assert_no_error (local_error);
284 }
285
286 g_assert_cmpint (g_thread_pool_get_max_threads (pool), ==, (gint) max_threads);
287 g_assert_cmpuint (g_thread_pool_get_num_threads (pool), <=,
288 (guint) g_thread_pool_get_max_threads (pool));
289 g_thread_pool_free (pool, TRUE, TRUE);
290 }
291
292 static void
293 test_threadpool_idle_time_entry_func (gpointer data, gpointer user_data)
294 {
295 g_usleep (WAIT * 1000);
296 }
297
298 static gboolean
299 test_threadpool_idle_timeout (gpointer data)
300 {
301 gboolean *idle_timeout_called = data;
302 gint i;
303
304 *idle_timeout_called = TRUE;
305
306 for (i = 0; i < 2; i++) {
307 g_thread_pool_push (idle_pool, GUINT_TO_POINTER (100 + i), NULL);
308 }
309
310 g_main_context_wakeup (NULL);
311
312 return G_SOURCE_REMOVE;
313 }
314
315 static gboolean
316 poll_cb (gpointer data)
317 {
318 g_main_context_wakeup (NULL);
319 return G_SOURCE_CONTINUE;
320 }
321
322 static void
323 test_threadpool_idle_time (void)
324 {
325 guint limit = 50;
326 guint interval = 10000;
327 guint i;
328 guint idle;
329 gboolean idle_timeout_called = FALSE;
330 GSource *timeout_source = NULL;
331 GSource *poll_source = NULL;
332
333 idle_pool = g_thread_pool_new (test_threadpool_idle_time_entry_func,
334 NULL,
335 0,
336 FALSE,
337 NULL);
338
339 g_thread_pool_set_max_threads (idle_pool, MAX_THREADS, NULL);
340 g_thread_pool_set_max_unused_threads (MAX_UNUSED_THREADS);
341 g_thread_pool_set_max_idle_time (interval);
342
343 g_assert_cmpint (g_thread_pool_get_max_threads (idle_pool), ==,
344 MAX_THREADS);
345 g_assert_cmpint (g_thread_pool_get_max_unused_threads (), ==,
346 MAX_UNUSED_THREADS);
347 g_assert_cmpint (g_thread_pool_get_max_idle_time (), ==, interval);
348
349 for (i = 0; i < limit; i++) {
350 g_thread_pool_push (idle_pool, GUINT_TO_POINTER (i + 1), NULL);
351 }
352
353 g_assert_cmpint (g_thread_pool_unprocessed (idle_pool), <=, limit);
354
355 timeout_source = g_timeout_source_new (interval - 1000);
356 g_source_set_callback (timeout_source, test_threadpool_idle_timeout, &idle_timeout_called, NULL);
357 g_source_attach (timeout_source, NULL);
358
359 /* Wait until the idle timeout has been called at least once and there are no
360 * unused threads. We need a second timeout for this, to periodically wake
361 * the main context up, as there’s no way to be notified of changes to `idle`.
362 */
363 poll_source = g_timeout_source_new (500);
364 g_source_set_callback (poll_source, poll_cb, NULL, NULL);
365 g_source_attach (poll_source, NULL);
366
367 idle = g_thread_pool_get_num_unused_threads ();
368 while (!idle_timeout_called || idle > 0)
369 {
370 g_test_message ("Pool idle thread count: %d, unprocessed jobs: %d",
371 idle, g_thread_pool_unprocessed (idle_pool));
372 g_main_context_iteration (NULL, TRUE);
373 idle = g_thread_pool_get_num_unused_threads ();
374 }
375
376 g_thread_pool_free (idle_pool, FALSE, TRUE);
377 g_source_destroy (poll_source);
378 g_source_unref (poll_source);
379 g_source_destroy (timeout_source);
380 g_source_unref (timeout_source);
381 }
382
383 int
384 main (int argc, char *argv[])
385 {
386 g_test_init (&argc, &argv, NULL);
387
388 g_test_add_func ("/threadpool/functions", test_threadpool_functions);
389 g_test_add_func ("/threadpool/stop-unused", test_threadpool_stop_unused);
390 g_test_add_func ("/threadpool/pools", test_threadpool_pools);
391 g_test_add_data_func ("/threadpool/no-sort", GUINT_TO_POINTER (FALSE), test_threadpool_sort);
392 g_test_add_data_func ("/threadpool/sort", GUINT_TO_POINTER (TRUE), test_threadpool_sort);
393 g_test_add_func ("/threadpool/stop-unused-multiple", test_threadpool_stop_unused_multiple);
394 g_test_add_func ("/threadpool/idle-time", test_threadpool_idle_time);
395
396 return g_test_run ();
397 }