• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "gcontextspecificgroup.c"
2 #include <gio/gio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 
6 #define N_THREADS 10
7 
8 static gchar *test_file;
9 
10 char *test_file_buffer;
11 gsize test_file_size;
12 static char async_read_buffer[8192];
13 
14 static void
read_data(GObject * source,GAsyncResult * result,gpointer loop)15 read_data (GObject *source, GAsyncResult *result, gpointer loop)
16 {
17   GInputStream *in = G_INPUT_STREAM (source);
18   GError *error = NULL;
19   gssize nread;
20 
21   nread = g_input_stream_read_finish (in, result, &error);
22   g_assert_no_error (error);
23 
24   g_assert_cmpint (nread, >, 0);
25   g_assert_cmpint (nread, <=, MIN(sizeof (async_read_buffer), test_file_size));
26   g_assert (memcmp (async_read_buffer, test_file_buffer, nread) == 0);
27 
28   g_main_loop_quit (loop);
29 }
30 
31 static void
opened_for_read(GObject * source,GAsyncResult * result,gpointer loop)32 opened_for_read (GObject *source, GAsyncResult *result, gpointer loop)
33 {
34   GFile *file = G_FILE (source);
35   GFileInputStream *in;
36   GError *error = NULL;
37 
38   in = g_file_read_finish (file, result, &error);
39   g_assert_no_error (error);
40 
41   memset (async_read_buffer, 0, sizeof (async_read_buffer));
42   g_input_stream_read_async (G_INPUT_STREAM (in),
43 			     async_read_buffer, sizeof (async_read_buffer),
44 			     G_PRIORITY_DEFAULT, NULL,
45 			     read_data, loop);
46 
47   g_object_unref (in);
48 }
49 
50 /* Test 1: Async I/O started in a thread with a thread-default context
51  * will stick to that thread, and will complete even if the default
52  * main loop is blocked. (NB: the last part would not be true if we
53  * were testing GFileMonitor!)
54  */
55 
56 static gboolean idle_start_test1_thread (gpointer loop);
57 static gpointer test1_thread (gpointer user_data);
58 
59 static gboolean test1_done;
60 static GCond test1_cond;
61 static GMutex test1_mutex;
62 
63 static void
test_thread_independence(void)64 test_thread_independence (void)
65 {
66   GMainLoop *loop;
67 
68   loop = g_main_loop_new (NULL, FALSE);
69   g_idle_add (idle_start_test1_thread, loop);
70   g_main_loop_run (loop);
71   g_main_loop_unref (loop);
72 }
73 
74 static gboolean
idle_start_test1_thread(gpointer loop)75 idle_start_test1_thread (gpointer loop)
76 {
77   gint64 time;
78   GThread *thread;
79   gboolean io_completed;
80 
81   g_mutex_lock (&test1_mutex);
82   thread = g_thread_new ("test1", test1_thread, NULL);
83 
84   time = g_get_monotonic_time () + 2 * G_TIME_SPAN_SECOND;
85   while (!test1_done)
86     {
87       io_completed = g_cond_wait_until (&test1_cond, &test1_mutex, time);
88       g_assert (io_completed);
89     }
90   g_thread_join (thread);
91 
92   g_mutex_unlock (&test1_mutex);
93   g_main_loop_quit (loop);
94   return G_SOURCE_REMOVE;
95 }
96 
97 static gpointer
test1_thread(gpointer user_data)98 test1_thread (gpointer user_data)
99 {
100   GMainContext *context;
101   GMainLoop *loop;
102   GFile *file;
103 
104   /* Wait for main thread to be waiting on test1_cond */
105   g_mutex_lock (&test1_mutex);
106 
107   context = g_main_context_new ();
108   g_assert (g_main_context_get_thread_default () == NULL);
109   g_main_context_push_thread_default (context);
110   g_assert (g_main_context_get_thread_default () == context);
111 
112   file = g_file_new_for_path (test_file);
113   g_assert (g_file_supports_thread_contexts (file));
114 
115   loop = g_main_loop_new (context, FALSE);
116   g_file_read_async (file, G_PRIORITY_DEFAULT, NULL,
117 		     opened_for_read, loop);
118   g_object_unref (file);
119   g_main_loop_run (loop);
120   g_main_loop_unref (loop);
121 
122   test1_done = TRUE;
123   g_cond_signal (&test1_cond);
124   g_mutex_unlock (&test1_mutex);
125 
126   g_main_context_pop_thread_default (context);
127   g_main_context_unref (context);
128 
129   return NULL;
130 }
131 
132 /* Test 2: If we push a thread-default context in the main thread, we
133  * can run async ops in that context without running the default
134  * context.
135  */
136 
137 static gboolean test2_fail (gpointer user_data);
138 
139 static void
test_context_independence(void)140 test_context_independence (void)
141 {
142   GMainContext *context;
143   GMainLoop *loop;
144   GFile *file;
145   guint default_timeout;
146   GSource *thread_default_timeout;
147 
148   context = g_main_context_new ();
149   g_assert (g_main_context_get_thread_default () == NULL);
150   g_main_context_push_thread_default (context);
151   g_assert (g_main_context_get_thread_default () == context);
152 
153   file = g_file_new_for_path (test_file);
154   g_assert (g_file_supports_thread_contexts (file));
155 
156   /* Add a timeout to the main loop, to fail immediately if it gets run */
157   default_timeout = g_timeout_add_full (G_PRIORITY_HIGH, 0,
158 					test2_fail, NULL, NULL);
159   /* Add a timeout to the alternate loop, to fail if the I/O *doesn't* run */
160   thread_default_timeout = g_timeout_source_new_seconds (2);
161   g_source_set_callback (thread_default_timeout, test2_fail, NULL, NULL);
162   g_source_attach (thread_default_timeout, context);
163 
164   loop = g_main_loop_new (context, FALSE);
165   g_file_read_async (file, G_PRIORITY_DEFAULT, NULL,
166 		     opened_for_read, loop);
167   g_object_unref (file);
168   g_main_loop_run (loop);
169   g_main_loop_unref (loop);
170 
171   g_source_remove (default_timeout);
172   g_source_destroy (thread_default_timeout);
173   g_source_unref (thread_default_timeout);
174 
175   g_main_context_pop_thread_default (context);
176   g_main_context_unref (context);
177 }
178 
179 static gboolean
test2_fail(gpointer user_data)180 test2_fail (gpointer user_data)
181 {
182   g_assert_not_reached ();
183   return FALSE;
184 }
185 
186 
187 typedef struct
188 {
189   GObject parent_instance;
190 
191   GMainContext *context;
192 } PerThreadThing;
193 
194 typedef GObjectClass PerThreadThingClass;
195 
196 static GType per_thread_thing_get_type (void);
197 
198 G_DEFINE_TYPE (PerThreadThing, per_thread_thing, G_TYPE_OBJECT)
199 
200 static GContextSpecificGroup group;
201 static gpointer instances[N_THREADS];
202 static gint is_running;
203 static gint current_value;
204 static gint observed_values[N_THREADS];
205 
206 static void
start_func(void)207 start_func (void)
208 {
209   g_assert (!is_running);
210   g_atomic_int_set (&is_running, TRUE);
211 }
212 
213 static void
stop_func(void)214 stop_func (void)
215 {
216   g_assert (is_running);
217   g_atomic_int_set (&is_running, FALSE);
218 }
219 
220 static void
per_thread_thing_finalize(GObject * object)221 per_thread_thing_finalize (GObject *object)
222 {
223   PerThreadThing *thing = (PerThreadThing *) object;
224 
225   g_context_specific_group_remove (&group, thing->context, thing, stop_func);
226 
227   G_OBJECT_CLASS (per_thread_thing_parent_class)->finalize (object);
228 }
229 
230 static void
per_thread_thing_init(PerThreadThing * thing)231 per_thread_thing_init (PerThreadThing *thing)
232 {
233 }
234 
235 static void
per_thread_thing_class_init(PerThreadThingClass * class)236 per_thread_thing_class_init (PerThreadThingClass *class)
237 {
238   class->finalize = per_thread_thing_finalize;
239 
240   g_signal_new ("changed", per_thread_thing_get_type (), G_SIGNAL_RUN_FIRST, 0,
241                 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
242 }
243 
244 static gpointer
per_thread_thing_get(void)245 per_thread_thing_get (void)
246 {
247   return g_context_specific_group_get (&group, per_thread_thing_get_type (),
248                                        G_STRUCT_OFFSET (PerThreadThing, context),
249                                        start_func);
250 }
251 
252 static gpointer
test_identity_thread(gpointer user_data)253 test_identity_thread (gpointer user_data)
254 {
255   guint thread_nr = GPOINTER_TO_UINT (user_data);
256   GMainContext *my_context;
257   guint i, j;
258 
259   my_context = g_main_context_new ();
260   g_main_context_push_thread_default (my_context);
261 
262   g_assert (!instances[thread_nr]);
263   instances[thread_nr] = per_thread_thing_get ();
264   g_assert (g_atomic_int_get (&is_running));
265 
266   for (i = 0; i < 100; i++)
267     {
268       gpointer instance = per_thread_thing_get ();
269 
270       for (j = 0; j < N_THREADS; j++)
271         g_assert ((instance == instances[j]) == (thread_nr == j));
272 
273       g_assert (g_atomic_int_get (&is_running));
274 
275       g_thread_yield ();
276 
277       g_assert (g_atomic_int_get (&is_running));
278     }
279 
280   for (i = 0; i < 100; i++)
281     {
282       g_object_unref (instances[thread_nr]);
283 
284       for (j = 0; j < N_THREADS; j++)
285         g_assert ((instances[thread_nr] == instances[j]) == (thread_nr == j));
286 
287       g_assert (g_atomic_int_get (&is_running));
288 
289       g_thread_yield ();
290     }
291 
292   /* drop the last ref */
293   g_object_unref (instances[thread_nr]);
294   instances[thread_nr] = NULL;
295 
296   g_main_context_pop_thread_default (my_context);
297   g_main_context_unref (my_context);
298 
299   /* at least one thread should see this cleared on exit */
300   return GUINT_TO_POINTER (!group.requested_state);
301 }
302 
303 static void
test_context_specific_identity(void)304 test_context_specific_identity (void)
305 {
306   GThread *threads[N_THREADS];
307   gboolean exited = FALSE;
308   guint i;
309 
310   g_assert (!g_atomic_int_get (&is_running));
311   for (i = 0; i < N_THREADS; i++)
312     threads[i] = g_thread_new ("test", test_identity_thread, GUINT_TO_POINTER (i));
313   for (i = 0; i < N_THREADS; i++)
314     exited |= GPOINTER_TO_UINT (g_thread_join (threads[i]));
315   g_assert (exited);
316   g_assert (!group.requested_state);
317 }
318 
319 static void
changed_emitted(PerThreadThing * thing,gpointer user_data)320 changed_emitted (PerThreadThing *thing,
321                  gpointer        user_data)
322 {
323   gint *observed_value = user_data;
324 
325   g_atomic_int_set (observed_value, g_atomic_int_get (&current_value));
326 }
327 
328 static gpointer
test_emit_thread(gpointer user_data)329 test_emit_thread (gpointer user_data)
330 {
331   gint *observed_value = user_data;
332   GMainContext *my_context;
333   gpointer instance;
334 
335   my_context = g_main_context_new ();
336   g_main_context_push_thread_default (my_context);
337 
338   instance = per_thread_thing_get ();
339   g_assert (g_atomic_int_get (&is_running));
340 
341   g_signal_connect (instance, "changed", G_CALLBACK (changed_emitted), observed_value);
342 
343   /* observe after connection */
344   g_atomic_int_set (observed_value, g_atomic_int_get (&current_value));
345 
346   while (g_atomic_int_get (&current_value) != -1)
347     g_main_context_iteration (my_context, TRUE);
348 
349   g_object_unref (instance);
350 
351   g_main_context_pop_thread_default (my_context);
352   g_main_context_unref (my_context);
353 
354   /* at least one thread should see this cleared on exit */
355   return GUINT_TO_POINTER (!group.requested_state);
356 }
357 
358 static void
test_context_specific_emit(void)359 test_context_specific_emit (void)
360 {
361   GThread *threads[N_THREADS];
362   gboolean exited = FALSE;
363   guint i, n;
364 
365   for (i = 0; i < N_THREADS; i++)
366     threads[i] = g_thread_new ("test", test_emit_thread, &observed_values[i]);
367 
368   /* make changes and ensure that they are observed */
369   for (n = 0; n < 1000; n++)
370     {
371       guint64 expiry;
372 
373       /* don't burn CPU forever */
374       expiry = g_get_monotonic_time () + 10 * G_TIME_SPAN_SECOND;
375 
376       g_atomic_int_set (&current_value, n);
377 
378       /* wake them to notice */
379       for (i = 0; i < g_test_rand_int_range (1, 5); i++)
380         g_context_specific_group_emit (&group, g_signal_lookup ("changed", per_thread_thing_get_type ()));
381 
382       for (i = 0; i < N_THREADS; i++)
383         while (g_atomic_int_get (&observed_values[i]) != n)
384           {
385             g_thread_yield ();
386 
387             if (g_get_monotonic_time () > expiry)
388               g_error ("timed out");
389           }
390     }
391 
392   /* tell them to quit */
393   g_atomic_int_set (&current_value, -1);
394   g_context_specific_group_emit (&group, g_signal_lookup ("notify", G_TYPE_OBJECT));
395 
396   for (i = 0; i < N_THREADS; i++)
397     exited |= GPOINTER_TO_UINT (g_thread_join (threads[i]));
398   g_assert (exited);
399   g_assert (!group.requested_state);
400 }
401 
402 static void
test_context_specific_emit_and_unref(void)403 test_context_specific_emit_and_unref (void)
404 {
405   gpointer obj;
406 
407   obj = per_thread_thing_get ();
408   g_context_specific_group_emit (&group, g_signal_lookup ("changed", per_thread_thing_get_type ()));
409   g_object_unref (obj);
410 
411   while (g_main_context_iteration (NULL, 0))
412     ;
413 }
414 
415 int
main(int argc,char ** argv)416 main (int argc, char **argv)
417 {
418   GError *error = NULL;
419   int ret;
420 
421   g_test_init (&argc, &argv, NULL);
422 
423   test_file = g_test_build_filename (G_TEST_DIST, "contexts.c", NULL);
424   g_file_get_contents (test_file, &test_file_buffer,
425 		       &test_file_size, &error);
426   g_assert_no_error (error);
427 
428   g_test_add_func ("/gio/contexts/thread-independence", test_thread_independence);
429   g_test_add_func ("/gio/contexts/context-independence", test_context_independence);
430   g_test_add_func ("/gio/contexts/context-specific/identity", test_context_specific_identity);
431   g_test_add_func ("/gio/contexts/context-specific/emit", test_context_specific_emit);
432   g_test_add_func ("/gio/contexts/context-specific/emit-and-unref", test_context_specific_emit_and_unref);
433 
434   ret = g_test_run();
435 
436   g_free (test_file_buffer);
437   g_free (test_file);
438 
439   return ret;
440 }
441