1 #include <glib.h>
2 #include <glib/gwakeup.h>
3
4 static gboolean
5 check_signaled (GWakeup *wakeup)
6 {
7 GPollFD fd;
8
9 g_wakeup_get_pollfd (wakeup, &fd);
10 return g_poll (&fd, 1, 0);
11 }
12
13 static void
14 wait_for_signaled (GWakeup *wakeup)
15 {
16 GPollFD fd;
17
18 g_wakeup_get_pollfd (wakeup, &fd);
19 g_poll (&fd, 1, -1);
20 }
21
22 static void
23 test_semantics (void)
24 {
25 GWakeup *wakeup;
26 gint i;
27
28 wakeup = g_wakeup_new ();
29 g_assert (!check_signaled (wakeup));
30
31 g_wakeup_signal (wakeup);
32 g_assert (check_signaled (wakeup));
33
34 g_wakeup_acknowledge (wakeup);
35 g_assert (!check_signaled (wakeup));
36
37 g_wakeup_free (wakeup);
38
39 /* free unused */
40 wakeup = g_wakeup_new ();
41 g_wakeup_free (wakeup);
42
43 /* free while signaled */
44 wakeup = g_wakeup_new ();
45 g_wakeup_signal (wakeup);
46 g_wakeup_free (wakeup);
47
48 /* ensure excessive signalling doesn't deadlock */
49 wakeup = g_wakeup_new ();
50 for (i = 0; i < 1000000; i++)
51 g_wakeup_signal (wakeup);
52 g_assert (check_signaled (wakeup));
53
54 /* ensure a single acknowledgement is sufficient */
55 g_wakeup_acknowledge (wakeup);
56 g_assert (!check_signaled (wakeup));
57
58 g_wakeup_free (wakeup);
59 }
60
/* A unit of work that is handed from context to context in the
 * threaded test until its time-to-live is used up.
 */
struct token
{
  /* the context this token was last dispatched to (a struct context *);
   * set by dispatch_token() and checked by assertions on push and pop */
  gpointer owner;
  /* remaining dispatches; decremented by dispatch_token(), which frees
   * the token once this is no longer positive */
  gint ttl;
};
66
/* Per-thread state for the threaded test: a queue of tokens plus the
 * GWakeup used to tell the owning thread that something happened.
 */
struct context
{
  /* list of struct token * waiting to be processed; accessed with
   * 'lock' held */
  GSList *pending_tokens;
  /* guards pending_tokens */
  GMutex lock;
  /* signaled after a token is pushed and when quit is requested */
  GWakeup *wakeup;
  /* set atomically by context_quit(), read atomically by thread_func() */
  gboolean quit;
};
74
/* number of contexts, and of worker threads running them */
#define NUM_THREADS 50
/* number of tokens created by test_threaded() */
#define NUM_TOKENS 5
/* initial time-to-live of each token; every dispatch to a context
 * decrements it */
#define TOKEN_TTL 100000

/* contexts[i] is run by threads[i] */
static struct context contexts[NUM_THREADS];
static GThread *threads[NUM_THREADS];
/* signaled by dispatch_token() when tokens_alive drops to zero */
static GWakeup *last_token_wakeup;
/* count of tokens created and not yet freed */
static gint tokens_alive; /* (atomic) */
83
84 static void
85 context_init (struct context *ctx)
86 {
87 ctx->pending_tokens = NULL;
88 g_mutex_init (&ctx->lock);
89 ctx->wakeup = g_wakeup_new ();
90 ctx->quit = FALSE;
91 }
92
93 static void
94 context_clear (struct context *ctx)
95 {
96 g_assert (ctx->pending_tokens == NULL);
97 g_assert (ctx->quit);
98
99 g_mutex_clear (&ctx->lock);
100 g_wakeup_free (ctx->wakeup);
101 }
102
103 static void
104 context_quit (struct context *ctx)
105 {
106 g_atomic_int_set (&ctx->quit, TRUE);
107 g_wakeup_signal (ctx->wakeup);
108 }
109
110 static struct token *
111 context_try_pop_token (struct context *ctx)
112 {
113 struct token *token = NULL;
114
115 g_mutex_lock (&ctx->lock);
116 if (ctx->pending_tokens != NULL)
117 {
118 token = ctx->pending_tokens->data;
119 ctx->pending_tokens = g_slist_delete_link (ctx->pending_tokens,
120 ctx->pending_tokens);
121 }
122 g_mutex_unlock (&ctx->lock);
123
124 return token;
125 }
126
127 static void
128 context_push_token (struct context *ctx,
129 struct token *token)
130 {
131 g_assert (token->owner == ctx);
132
133 g_mutex_lock (&ctx->lock);
134 ctx->pending_tokens = g_slist_prepend (ctx->pending_tokens, token);
135 g_mutex_unlock (&ctx->lock);
136
137 g_wakeup_signal (ctx->wakeup);
138 }
139
140 static void
141 dispatch_token (struct token *token)
142 {
143 if (token->ttl > 0)
144 {
145 struct context *ctx;
146 gint next_ctx;
147
148 next_ctx = g_test_rand_int_range (0, NUM_THREADS);
149 ctx = &contexts[next_ctx];
150 token->owner = ctx;
151 token->ttl--;
152
153 context_push_token (ctx, token);
154 }
155 else
156 {
157 g_slice_free (struct token, token);
158
159 if (g_atomic_int_dec_and_test (&tokens_alive))
160 g_wakeup_signal (last_token_wakeup);
161 }
162 }
163
164 static struct token *
165 token_new (int ttl)
166 {
167 struct token *token;
168
169 token = g_slice_new (struct token);
170 token->ttl = ttl;
171
172 g_atomic_int_inc (&tokens_alive);
173
174 return token;
175 }
176
177 static gpointer
178 thread_func (gpointer data)
179 {
180 struct context *ctx = data;
181 struct token *token;
182
183 while (!g_atomic_int_get (&ctx->quit))
184 {
185 wait_for_signaled (ctx->wakeup);
186 g_wakeup_acknowledge (ctx->wakeup);
187
188 while ((token = context_try_pop_token (ctx)) != NULL)
189 {
190 g_assert (token->owner == ctx);
191 dispatch_token (token);
192 }
193 }
194
195 return NULL;
196 }
197
198 static void
199 test_threaded (void)
200 {
201 gint i;
202
203 /* simple mainloop test based on GWakeup.
204 *
205 * create a bunch of contexts and a thread to 'run' each one. create
206 * some tokens and randomly pass them between the threads, until the
207 * TTL on each token is zero.
208 *
209 * when no tokens are left, signal that we are done. the mainthread
210 * will then signal each worker thread to exit and join them to make
211 * sure that works.
212 */
213
214 last_token_wakeup = g_wakeup_new ();
215
216 /* create contexts, assign to threads */
217 for (i = 0; i < NUM_THREADS; i++)
218 {
219 context_init (&contexts[i]);
220 threads[i] = g_thread_new ("test", thread_func, &contexts[i]);
221 }
222
223 /* dispatch tokens */
224 for (i = 0; i < NUM_TOKENS; i++)
225 dispatch_token (token_new (TOKEN_TTL));
226
227 /* wait until all tokens are gone */
228 wait_for_signaled (last_token_wakeup);
229
230 /* ask threads to quit, join them, cleanup */
231 for (i = 0; i < NUM_THREADS; i++)
232 {
233 context_quit (&contexts[i]);
234 g_thread_join (threads[i]);
235 context_clear (&contexts[i]);
236 }
237
238 g_wakeup_free (last_token_wakeup);
239 }
240
241 int
242 main (int argc, char **argv)
243 {
244 g_test_init (&argc, &argv, NULL);
245
246 #ifdef TEST_EVENTFD_FALLBACK
247 #define TESTNAME_SUFFIX "-fallback"
248 #else
249 #define TESTNAME_SUFFIX
250 #endif
251
252
253 g_test_add_func ("/gwakeup/semantics" TESTNAME_SUFFIX, test_semantics);
254 g_test_add_func ("/gwakeup/threaded" TESTNAME_SUFFIX, test_threaded);
255
256 return g_test_run ();
257 }