1 /* GDBus - GLib D-Bus Library
2 *
3 * Copyright (C) 2008-2010 Red Hat, Inc.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 *
18 * Author: David Zeuthen <davidz@redhat.com>
19 */
20
21 #include "config.h"
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "giotypes.h"
27 #include "gioenumtypes.h"
28 #include "gsocket.h"
29 #include "gdbusauthobserver.h"
30 #include "gdbusprivate.h"
31 #include "gdbusmessage.h"
32 #include "gdbusconnection.h"
33 #include "gdbusproxy.h"
34 #include "gdbuserror.h"
35 #include "gdbusintrospection.h"
36 #include "gdbusdaemon.h"
37 #include "giomodule-priv.h"
38 #include "gtask.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "glib/gstdio.h"
43 #include "gsocketaddress.h"
44 #include "gsocketcontrolmessage.h"
45 #include "gsocketconnection.h"
46 #include "gsocketoutputstream.h"
47
48 #ifdef G_OS_UNIX
49 #include "gunixfdmessage.h"
50 #include "gunixconnection.h"
51 #include "gunixcredentialsmessage.h"
52 #endif
53
54 #ifdef G_OS_WIN32
55 #include <windows.h>
56 #include <io.h>
57 #include <conio.h>
58 #endif
59
60 #include "glibintl.h"
61
62 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
63 static void schedule_pending_close (GDBusWorker *worker);
64
65 /* ---------------------------------------------------------------------------------------------------- */
66
67 gchar *
_g_dbus_hexdump(const gchar * data,gsize len,guint indent)68 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
69 {
70 guint n, m;
71 GString *ret;
72
73 ret = g_string_new (NULL);
74
75 for (n = 0; n < len; n += 16)
76 {
77 g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
78
79 for (m = n; m < n + 16; m++)
80 {
81 if (m > n && (m%4) == 0)
82 g_string_append_c (ret, ' ');
83 if (m < len)
84 g_string_append_printf (ret, "%02x ", (guchar) data[m]);
85 else
86 g_string_append (ret, " ");
87 }
88
89 g_string_append (ret, " ");
90
91 for (m = n; m < len && m < n + 16; m++)
92 g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
93
94 g_string_append_c (ret, '\n');
95 }
96
97 return g_string_free (ret, FALSE);
98 }
99
100 /* ---------------------------------------------------------------------------------------------------- */
101
102 /* Unfortunately ancillary messages are discarded when reading from a
103 * socket using the GSocketInputStream abstraction. So we provide a
104 * very GInputStream-ish API that uses GSocket in this case (very
105 * similar to GSocketInputStream).
106 */
107
108 typedef struct
109 {
110 void *buffer;
111 gsize count;
112
113 GSocketControlMessage ***messages;
114 gint *num_messages;
115 } ReadWithControlData;
116
117 static void
read_with_control_data_free(ReadWithControlData * data)118 read_with_control_data_free (ReadWithControlData *data)
119 {
120 g_slice_free (ReadWithControlData, data);
121 }
122
123 static gboolean
_g_socket_read_with_control_messages_ready(GSocket * socket,GIOCondition condition,gpointer user_data)124 _g_socket_read_with_control_messages_ready (GSocket *socket,
125 GIOCondition condition,
126 gpointer user_data)
127 {
128 GTask *task = user_data;
129 ReadWithControlData *data = g_task_get_task_data (task);
130 GError *error;
131 gssize result;
132 GInputVector vector;
133
134 error = NULL;
135 vector.buffer = data->buffer;
136 vector.size = data->count;
137 result = g_socket_receive_message (socket,
138 NULL, /* address */
139 &vector,
140 1,
141 data->messages,
142 data->num_messages,
143 NULL,
144 g_task_get_cancellable (task),
145 &error);
146
147 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
148 {
149 g_error_free (error);
150 return TRUE;
151 }
152
153 g_assert (result >= 0 || error != NULL);
154 if (result >= 0)
155 g_task_return_int (task, result);
156 else
157 g_task_return_error (task, error);
158 g_object_unref (task);
159
160 return FALSE;
161 }
162
163 static void
_g_socket_read_with_control_messages(GSocket * socket,void * buffer,gsize count,GSocketControlMessage *** messages,gint * num_messages,gint io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)164 _g_socket_read_with_control_messages (GSocket *socket,
165 void *buffer,
166 gsize count,
167 GSocketControlMessage ***messages,
168 gint *num_messages,
169 gint io_priority,
170 GCancellable *cancellable,
171 GAsyncReadyCallback callback,
172 gpointer user_data)
173 {
174 GTask *task;
175 ReadWithControlData *data;
176 GSource *source;
177
178 data = g_slice_new0 (ReadWithControlData);
179 data->buffer = buffer;
180 data->count = count;
181 data->messages = messages;
182 data->num_messages = num_messages;
183
184 task = g_task_new (socket, cancellable, callback, user_data);
185 g_task_set_source_tag (task, _g_socket_read_with_control_messages);
186 g_task_set_task_data (task, data, (GDestroyNotify) read_with_control_data_free);
187
188 if (g_socket_condition_check (socket, G_IO_IN))
189 {
190 if (!_g_socket_read_with_control_messages_ready (socket, G_IO_IN, task))
191 return;
192 }
193
194 source = g_socket_create_source (socket,
195 G_IO_IN | G_IO_HUP | G_IO_ERR,
196 cancellable);
197 g_task_attach_source (task, source, (GSourceFunc) _g_socket_read_with_control_messages_ready);
198 g_source_unref (source);
199 }
200
201 static gssize
_g_socket_read_with_control_messages_finish(GSocket * socket,GAsyncResult * result,GError ** error)202 _g_socket_read_with_control_messages_finish (GSocket *socket,
203 GAsyncResult *result,
204 GError **error)
205 {
206 g_return_val_if_fail (G_IS_SOCKET (socket), -1);
207 g_return_val_if_fail (g_task_is_valid (result, socket), -1);
208
209 return g_task_propagate_int (G_TASK (result), error);
210 }
211
212 /* ---------------------------------------------------------------------------------------------------- */
213
214 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=674885
215 and see also the original https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
216
217 static GPtrArray *ensured_classes = NULL;
218
219 static void
ensure_type(GType gtype)220 ensure_type (GType gtype)
221 {
222 g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
223 }
224
225 static void
release_required_types(void)226 release_required_types (void)
227 {
228 g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
229 g_ptr_array_unref (ensured_classes);
230 ensured_classes = NULL;
231 }
232
233 static void
ensure_required_types(void)234 ensure_required_types (void)
235 {
236 g_assert (ensured_classes == NULL);
237 ensured_classes = g_ptr_array_new ();
238 /* Generally in this list, you should initialize types which are used as
239 * properties first, then the class which has them. For example, GDBusProxy
240 * has a type of GDBusConnection, so we initialize GDBusConnection first.
241 * And because GDBusConnection has a property of type GDBusConnectionFlags,
242 * we initialize that first.
243 *
244 * Similarly, GSocket has a type of GSocketAddress.
245 *
246 * We don't fill out the whole dependency tree right now because in practice
247 * it tends to be just types that GDBus use that cause pain, and there
248 * is work on a more general approach in https://bugzilla.gnome.org/show_bug.cgi?id=674885
249 */
250 ensure_type (G_TYPE_TASK);
251 ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
252 ensure_type (G_TYPE_DBUS_CONNECTION_FLAGS);
253 ensure_type (G_TYPE_DBUS_CAPABILITY_FLAGS);
254 ensure_type (G_TYPE_DBUS_AUTH_OBSERVER);
255 ensure_type (G_TYPE_DBUS_CONNECTION);
256 ensure_type (G_TYPE_DBUS_PROXY);
257 ensure_type (G_TYPE_SOCKET_FAMILY);
258 ensure_type (G_TYPE_SOCKET_TYPE);
259 ensure_type (G_TYPE_SOCKET_PROTOCOL);
260 ensure_type (G_TYPE_SOCKET_ADDRESS);
261 ensure_type (G_TYPE_SOCKET);
262 }
263 /* ---------------------------------------------------------------------------------------------------- */
264
265 typedef struct
266 {
267 volatile gint refcount;
268 GThread *thread;
269 GMainContext *context;
270 GMainLoop *loop;
271 } SharedThreadData;
272
273 static gpointer
gdbus_shared_thread_func(gpointer user_data)274 gdbus_shared_thread_func (gpointer user_data)
275 {
276 SharedThreadData *data = user_data;
277
278 g_main_context_push_thread_default (data->context);
279 g_main_loop_run (data->loop);
280 g_main_context_pop_thread_default (data->context);
281
282 release_required_types ();
283
284 return NULL;
285 }
286
287 /* ---------------------------------------------------------------------------------------------------- */
288
289 static SharedThreadData *
_g_dbus_shared_thread_ref(void)290 _g_dbus_shared_thread_ref (void)
291 {
292 static gsize shared_thread_data = 0;
293 SharedThreadData *ret;
294
295 if (g_once_init_enter (&shared_thread_data))
296 {
297 SharedThreadData *data;
298
299 data = g_new0 (SharedThreadData, 1);
300 data->refcount = 0;
301
302 data->context = g_main_context_new ();
303 data->loop = g_main_loop_new (data->context, FALSE);
304 data->thread = g_thread_new ("gdbus",
305 gdbus_shared_thread_func,
306 data);
307 /* We can cast between gsize and gpointer safely */
308 g_once_init_leave (&shared_thread_data, (gsize) data);
309 }
310
311 ret = (SharedThreadData*) shared_thread_data;
312 g_atomic_int_inc (&ret->refcount);
313 return ret;
314 }
315
316 static void
_g_dbus_shared_thread_unref(SharedThreadData * data)317 _g_dbus_shared_thread_unref (SharedThreadData *data)
318 {
319 /* TODO: actually destroy the shared thread here */
320 #if 0
321 g_assert (data != NULL);
322 if (g_atomic_int_dec_and_test (&data->refcount))
323 {
324 g_main_loop_quit (data->loop);
325 //g_thread_join (data->thread);
326 g_main_loop_unref (data->loop);
327 g_main_context_unref (data->context);
328 }
329 #endif
330 }
331
332 /* ---------------------------------------------------------------------------------------------------- */
333
334 typedef enum {
335 PENDING_NONE = 0,
336 PENDING_WRITE,
337 PENDING_FLUSH,
338 PENDING_CLOSE
339 } OutputPending;
340
341 struct GDBusWorker
342 {
343 volatile gint ref_count;
344
345 SharedThreadData *shared_thread_data;
346
347 /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
348 volatile gint stopped;
349
350 /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
351 * only affects messages received from the other peer (since GDBusServer is the
352 * only user) - we might want it to affect messages sent to the other peer too?
353 */
354 gboolean frozen;
355 GDBusCapabilityFlags capabilities;
356 GQueue *received_messages_while_frozen;
357
358 GIOStream *stream;
359 GCancellable *cancellable;
360 GDBusWorkerMessageReceivedCallback message_received_callback;
361 GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
362 GDBusWorkerDisconnectedCallback disconnected_callback;
363 gpointer user_data;
364
365 /* if not NULL, stream is GSocketConnection */
366 GSocket *socket;
367
368 /* used for reading */
369 GMutex read_lock;
370 gchar *read_buffer;
371 gsize read_buffer_allocated_size;
372 gsize read_buffer_cur_size;
373 gsize read_buffer_bytes_wanted;
374 GUnixFDList *read_fd_list;
375 GSocketControlMessage **read_ancillary_messages;
376 gint read_num_ancillary_messages;
377
378 /* Whether an async write, flush or close, or none of those, is pending.
379 * Only the worker thread may change its value, and only with the write_lock.
380 * Other threads may read its value when holding the write_lock.
381 * The worker thread may read its value at any time.
382 */
383 OutputPending output_pending;
384 /* used for writing */
385 GMutex write_lock;
386 /* queue of MessageToWriteData, protected by write_lock */
387 GQueue *write_queue;
388 /* protected by write_lock */
389 guint64 write_num_messages_written;
390 /* number of messages we'd written out last time we flushed;
391 * protected by write_lock
392 */
393 guint64 write_num_messages_flushed;
394 /* list of FlushData, protected by write_lock */
395 GList *write_pending_flushes;
396 /* list of CloseData, protected by write_lock */
397 GList *pending_close_attempts;
398 /* no lock - only used from the worker thread */
399 gboolean close_expected;
400 };
401
402 static void _g_dbus_worker_unref (GDBusWorker *worker);
403
404 /* ---------------------------------------------------------------------------------------------------- */
405
406 typedef struct
407 {
408 GMutex mutex;
409 GCond cond;
410 guint64 number_to_wait_for;
411 gboolean finished;
412 GError *error;
413 } FlushData;
414
415 struct _MessageToWriteData ;
416 typedef struct _MessageToWriteData MessageToWriteData;
417
418 static void message_to_write_data_free (MessageToWriteData *data);
419
420 static void read_message_print_transport_debug (gssize bytes_read,
421 GDBusWorker *worker);
422
423 static void write_message_print_transport_debug (gssize bytes_written,
424 MessageToWriteData *data);
425
426 typedef struct {
427 GDBusWorker *worker;
428 GTask *task;
429 } CloseData;
430
close_data_free(CloseData * close_data)431 static void close_data_free (CloseData *close_data)
432 {
433 g_clear_object (&close_data->task);
434
435 _g_dbus_worker_unref (close_data->worker);
436 g_slice_free (CloseData, close_data);
437 }
438
439 /* ---------------------------------------------------------------------------------------------------- */
440
441 static GDBusWorker *
_g_dbus_worker_ref(GDBusWorker * worker)442 _g_dbus_worker_ref (GDBusWorker *worker)
443 {
444 g_atomic_int_inc (&worker->ref_count);
445 return worker;
446 }
447
448 static void
_g_dbus_worker_unref(GDBusWorker * worker)449 _g_dbus_worker_unref (GDBusWorker *worker)
450 {
451 if (g_atomic_int_dec_and_test (&worker->ref_count))
452 {
453 g_assert (worker->write_pending_flushes == NULL);
454
455 _g_dbus_shared_thread_unref (worker->shared_thread_data);
456
457 g_object_unref (worker->stream);
458
459 g_mutex_clear (&worker->read_lock);
460 g_object_unref (worker->cancellable);
461 if (worker->read_fd_list != NULL)
462 g_object_unref (worker->read_fd_list);
463
464 g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
465 g_mutex_clear (&worker->write_lock);
466 g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
467 g_free (worker->read_buffer);
468
469 g_free (worker);
470 }
471 }
472
473 static void
_g_dbus_worker_emit_disconnected(GDBusWorker * worker,gboolean remote_peer_vanished,GError * error)474 _g_dbus_worker_emit_disconnected (GDBusWorker *worker,
475 gboolean remote_peer_vanished,
476 GError *error)
477 {
478 if (!g_atomic_int_get (&worker->stopped))
479 worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
480 }
481
482 static void
_g_dbus_worker_emit_message_received(GDBusWorker * worker,GDBusMessage * message)483 _g_dbus_worker_emit_message_received (GDBusWorker *worker,
484 GDBusMessage *message)
485 {
486 if (!g_atomic_int_get (&worker->stopped))
487 worker->message_received_callback (worker, message, worker->user_data);
488 }
489
490 static GDBusMessage *
_g_dbus_worker_emit_message_about_to_be_sent(GDBusWorker * worker,GDBusMessage * message)491 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
492 GDBusMessage *message)
493 {
494 GDBusMessage *ret;
495 if (!g_atomic_int_get (&worker->stopped))
496 ret = worker->message_about_to_be_sent_callback (worker, g_steal_pointer (&message), worker->user_data);
497 else
498 ret = g_steal_pointer (&message);
499 return ret;
500 }
501
502 /* can only be called from private thread with read-lock held - takes ownership of @message */
503 static void
_g_dbus_worker_queue_or_deliver_received_message(GDBusWorker * worker,GDBusMessage * message)504 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker,
505 GDBusMessage *message)
506 {
507 if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
508 {
509 /* queue up */
510 g_queue_push_tail (worker->received_messages_while_frozen, g_steal_pointer (&message));
511 }
512 else
513 {
514 /* not frozen, nor anything in queue */
515 _g_dbus_worker_emit_message_received (worker, message);
516 g_clear_object (&message);
517 }
518 }
519
520 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
521 static gboolean
unfreeze_in_idle_cb(gpointer user_data)522 unfreeze_in_idle_cb (gpointer user_data)
523 {
524 GDBusWorker *worker = user_data;
525 GDBusMessage *message;
526
527 g_mutex_lock (&worker->read_lock);
528 if (worker->frozen)
529 {
530 while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
531 {
532 _g_dbus_worker_emit_message_received (worker, message);
533 g_clear_object (&message);
534 }
535 worker->frozen = FALSE;
536 }
537 else
538 {
539 g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
540 }
541 g_mutex_unlock (&worker->read_lock);
542 return FALSE;
543 }
544
545 /* can be called from any thread */
546 void
_g_dbus_worker_unfreeze(GDBusWorker * worker)547 _g_dbus_worker_unfreeze (GDBusWorker *worker)
548 {
549 GSource *idle_source;
550 idle_source = g_idle_source_new ();
551 g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
552 g_source_set_callback (idle_source,
553 unfreeze_in_idle_cb,
554 _g_dbus_worker_ref (worker),
555 (GDestroyNotify) _g_dbus_worker_unref);
556 g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
557 g_source_attach (idle_source, worker->shared_thread_data->context);
558 g_source_unref (idle_source);
559 }
560
561 /* ---------------------------------------------------------------------------------------------------- */
562
563 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
564
565 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
566 static void
_g_dbus_worker_do_read_cb(GInputStream * input_stream,GAsyncResult * res,gpointer user_data)567 _g_dbus_worker_do_read_cb (GInputStream *input_stream,
568 GAsyncResult *res,
569 gpointer user_data)
570 {
571 GDBusWorker *worker = user_data;
572 GError *error;
573 gssize bytes_read;
574
575 g_mutex_lock (&worker->read_lock);
576
577 /* If already stopped, don't even process the reply */
578 if (g_atomic_int_get (&worker->stopped))
579 goto out;
580
581 error = NULL;
582 if (worker->socket == NULL)
583 bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
584 res,
585 &error);
586 else
587 bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
588 res,
589 &error);
590 if (worker->read_num_ancillary_messages > 0)
591 {
592 gint n;
593 for (n = 0; n < worker->read_num_ancillary_messages; n++)
594 {
595 GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
596
597 if (FALSE)
598 {
599 }
600 #ifdef G_OS_UNIX
601 else if (G_IS_UNIX_FD_MESSAGE (control_message))
602 {
603 GUnixFDMessage *fd_message;
604 gint *fds;
605 gint num_fds;
606
607 fd_message = G_UNIX_FD_MESSAGE (control_message);
608 fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
609 if (worker->read_fd_list == NULL)
610 {
611 worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
612 }
613 else
614 {
615 gint n;
616 for (n = 0; n < num_fds; n++)
617 {
618 /* TODO: really want a append_steal() */
619 g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
620 (void) g_close (fds[n], NULL);
621 }
622 }
623 g_free (fds);
624 }
625 else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
626 {
627 /* do nothing */
628 }
629 #endif
630 else
631 {
632 if (error == NULL)
633 {
634 g_set_error (&error,
635 G_IO_ERROR,
636 G_IO_ERROR_FAILED,
637 "Unexpected ancillary message of type %s received from peer",
638 g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
639 _g_dbus_worker_emit_disconnected (worker, TRUE, error);
640 g_error_free (error);
641 g_object_unref (control_message);
642 n++;
643 while (n < worker->read_num_ancillary_messages)
644 g_object_unref (worker->read_ancillary_messages[n++]);
645 g_free (worker->read_ancillary_messages);
646 goto out;
647 }
648 }
649 g_object_unref (control_message);
650 }
651 g_free (worker->read_ancillary_messages);
652 }
653
654 if (bytes_read == -1)
655 {
656 if (G_UNLIKELY (_g_dbus_debug_transport ()))
657 {
658 _g_dbus_debug_print_lock ();
659 g_print ("========================================================================\n"
660 "GDBus-debug:Transport:\n"
661 " ---- READ ERROR on stream of type %s:\n"
662 " ---- %s %d: %s\n",
663 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
664 g_quark_to_string (error->domain), error->code,
665 error->message);
666 _g_dbus_debug_print_unlock ();
667 }
668
669 /* Every async read that uses this callback uses worker->cancellable
670 * as its GCancellable. worker->cancellable gets cancelled if and only
671 * if the GDBusConnection tells us to close (either via
672 * _g_dbus_worker_stop, which is called on last-unref, or directly),
673 * so a cancelled read must mean our connection was closed locally.
674 *
675 * If we're closing, other errors are possible - notably,
676 * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
677 * read in-flight. It seems sensible to treat all read errors during
678 * closing as an expected thing that doesn't trip exit-on-close.
679 *
680 * Because close_expected can't be set until we get into the worker
681 * thread, but the cancellable is signalled sooner (from another
682 * thread), we do still need to check the error.
683 */
684 if (worker->close_expected ||
685 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
686 _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
687 else
688 _g_dbus_worker_emit_disconnected (worker, TRUE, error);
689
690 g_error_free (error);
691 goto out;
692 }
693
694 #if 0
695 g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
696 (gint) bytes_read,
697 g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
698 g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
699 g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
700 G_IO_IN | G_IO_OUT | G_IO_HUP),
701 worker->stream,
702 worker);
703 #endif
704
705 /* The read failed, which could mean the dbus-daemon was sent SIGTERM. */
706 if (bytes_read == 0)
707 {
708 g_set_error (&error,
709 G_IO_ERROR,
710 G_IO_ERROR_FAILED,
711 "Underlying GIOStream returned 0 bytes on an async read");
712 _g_dbus_worker_emit_disconnected (worker, TRUE, error);
713 g_error_free (error);
714 goto out;
715 }
716
717 read_message_print_transport_debug (bytes_read, worker);
718
719 worker->read_buffer_cur_size += bytes_read;
720 if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
721 {
722 /* OK, got what we asked for! */
723 if (worker->read_buffer_bytes_wanted == 16)
724 {
725 gssize message_len;
726 /* OK, got the header - determine how many more bytes are needed */
727 error = NULL;
728 message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
729 16,
730 &error);
731 if (message_len == -1)
732 {
733 g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
734 _g_dbus_worker_emit_disconnected (worker, FALSE, error);
735 g_error_free (error);
736 goto out;
737 }
738
739 worker->read_buffer_bytes_wanted = message_len;
740 _g_dbus_worker_do_read_unlocked (worker);
741 }
742 else
743 {
744 GDBusMessage *message;
745 error = NULL;
746
747 /* TODO: use connection->priv->auth to decode the message */
748
749 message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
750 worker->read_buffer_cur_size,
751 worker->capabilities,
752 &error);
753 if (message == NULL)
754 {
755 gchar *s;
756 s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
757 g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
758 "The error is: %s\n"
759 "The payload is as follows:\n"
760 "%s",
761 worker->read_buffer_cur_size,
762 error->message,
763 s);
764 g_free (s);
765 _g_dbus_worker_emit_disconnected (worker, FALSE, error);
766 g_error_free (error);
767 goto out;
768 }
769
770 #ifdef G_OS_UNIX
771 if (worker->read_fd_list != NULL)
772 {
773 g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
774 g_object_unref (worker->read_fd_list);
775 worker->read_fd_list = NULL;
776 }
777 #endif
778
779 if (G_UNLIKELY (_g_dbus_debug_message ()))
780 {
781 gchar *s;
782 _g_dbus_debug_print_lock ();
783 g_print ("========================================================================\n"
784 "GDBus-debug:Message:\n"
785 " <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
786 worker->read_buffer_cur_size);
787 s = g_dbus_message_print (message, 2);
788 g_print ("%s", s);
789 g_free (s);
790 if (G_UNLIKELY (_g_dbus_debug_payload ()))
791 {
792 s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
793 g_print ("%s\n", s);
794 g_free (s);
795 }
796 _g_dbus_debug_print_unlock ();
797 }
798
799 /* yay, got a message, go deliver it */
800 _g_dbus_worker_queue_or_deliver_received_message (worker, g_steal_pointer (&message));
801
802 /* start reading another message! */
803 worker->read_buffer_bytes_wanted = 0;
804 worker->read_buffer_cur_size = 0;
805 _g_dbus_worker_do_read_unlocked (worker);
806 }
807 }
808 else
809 {
810 /* didn't get all the bytes we requested - so repeat the request... */
811 _g_dbus_worker_do_read_unlocked (worker);
812 }
813
814 out:
815 g_mutex_unlock (&worker->read_lock);
816
817 /* check if there is any pending close */
818 schedule_pending_close (worker);
819
820 /* gives up the reference acquired when calling g_input_stream_read_async() */
821 _g_dbus_worker_unref (worker);
822 }
823
824 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
825 static void
_g_dbus_worker_do_read_unlocked(GDBusWorker * worker)826 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
827 {
828 /* Note that we do need to keep trying to read even if close_expected is
829 * true, because only failing a read causes us to signal 'closed'.
830 */
831
832 /* if bytes_wanted is zero, it means start reading a message */
833 if (worker->read_buffer_bytes_wanted == 0)
834 {
835 worker->read_buffer_cur_size = 0;
836 worker->read_buffer_bytes_wanted = 16;
837 }
838
839 /* ensure we have a (big enough) buffer */
840 if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
841 {
842 /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
843 worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
844 worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
845 }
846
847 if (worker->socket == NULL)
848 g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
849 worker->read_buffer + worker->read_buffer_cur_size,
850 worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
851 G_PRIORITY_DEFAULT,
852 worker->cancellable,
853 (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
854 _g_dbus_worker_ref (worker));
855 else
856 {
857 worker->read_ancillary_messages = NULL;
858 worker->read_num_ancillary_messages = 0;
859 _g_socket_read_with_control_messages (worker->socket,
860 worker->read_buffer + worker->read_buffer_cur_size,
861 worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
862 &worker->read_ancillary_messages,
863 &worker->read_num_ancillary_messages,
864 G_PRIORITY_DEFAULT,
865 worker->cancellable,
866 (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
867 _g_dbus_worker_ref (worker));
868 }
869 }
870
871 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
872 static gboolean
_g_dbus_worker_do_initial_read(gpointer data)873 _g_dbus_worker_do_initial_read (gpointer data)
874 {
875 GDBusWorker *worker = data;
876 g_mutex_lock (&worker->read_lock);
877 _g_dbus_worker_do_read_unlocked (worker);
878 g_mutex_unlock (&worker->read_lock);
879 return FALSE;
880 }
881
882 /* ---------------------------------------------------------------------------------------------------- */
883
884 struct _MessageToWriteData
885 {
886 GDBusWorker *worker;
887 GDBusMessage *message;
888 gchar *blob;
889 gsize blob_size;
890
891 gsize total_written;
892 GTask *task;
893 };
894
895 static void
message_to_write_data_free(MessageToWriteData * data)896 message_to_write_data_free (MessageToWriteData *data)
897 {
898 _g_dbus_worker_unref (data->worker);
899 if (data->message)
900 g_object_unref (data->message);
901 g_free (data->blob);
902 g_slice_free (MessageToWriteData, data);
903 }
904
905 /* ---------------------------------------------------------------------------------------------------- */
906
907 static void write_message_continue_writing (MessageToWriteData *data);
908
909 /* called in private thread shared by all GDBusConnection instances
910 *
911 * write-lock is not held on entry
912 * output_pending is PENDING_WRITE on entry
913 */
914 static void
write_message_async_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)915 write_message_async_cb (GObject *source_object,
916 GAsyncResult *res,
917 gpointer user_data)
918 {
919 MessageToWriteData *data = user_data;
920 GTask *task;
921 gssize bytes_written;
922 GError *error;
923
924 /* Note: we can't access data->task after calling g_task_return_* () because the
925 * callback can free @data and we're not completing in idle. So use a copy of the pointer.
926 */
927 task = data->task;
928
929 error = NULL;
930 bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
931 res,
932 &error);
933 if (bytes_written == -1)
934 {
935 g_task_return_error (task, error);
936 g_object_unref (task);
937 goto out;
938 }
939 g_assert (bytes_written > 0); /* zero is never returned */
940
941 write_message_print_transport_debug (bytes_written, data);
942
943 data->total_written += bytes_written;
944 g_assert (data->total_written <= data->blob_size);
945 if (data->total_written == data->blob_size)
946 {
947 g_task_return_boolean (task, TRUE);
948 g_object_unref (task);
949 goto out;
950 }
951
952 write_message_continue_writing (data);
953
954 out:
955 ;
956 }
957
958 /* called in private thread shared by all GDBusConnection instances
959 *
960 * write-lock is not held on entry
961 * output_pending is PENDING_WRITE on entry
962 */
963 #ifdef G_OS_UNIX
964 static gboolean
on_socket_ready(GSocket * socket,GIOCondition condition,gpointer user_data)965 on_socket_ready (GSocket *socket,
966 GIOCondition condition,
967 gpointer user_data)
968 {
969 MessageToWriteData *data = user_data;
970 write_message_continue_writing (data);
971 return FALSE; /* remove source */
972 }
973 #endif
974
975 /* called in private thread shared by all GDBusConnection instances
976 *
977 * write-lock is not held on entry
978 * output_pending is PENDING_WRITE on entry
979 */
980 static void
write_message_continue_writing(MessageToWriteData * data)981 write_message_continue_writing (MessageToWriteData *data)
982 {
983 GOutputStream *ostream;
984 #ifdef G_OS_UNIX
985 GTask *task;
986 GUnixFDList *fd_list;
987 #endif
988
989 #ifdef G_OS_UNIX
990 /* Note: we can't access data->task after calling g_task_return_* () because the
991 * callback can free @data and we're not completing in idle. So use a copy of the pointer.
992 */
993 task = data->task;
994 #endif
995
996 ostream = g_io_stream_get_output_stream (data->worker->stream);
997 #ifdef G_OS_UNIX
998 fd_list = g_dbus_message_get_unix_fd_list (data->message);
999 #endif
1000
1001 g_assert (!g_output_stream_has_pending (ostream));
1002 g_assert_cmpint (data->total_written, <, data->blob_size);
1003
1004 if (FALSE)
1005 {
1006 }
1007 #ifdef G_OS_UNIX
1008 else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1009 {
1010 GOutputVector vector;
1011 GSocketControlMessage *control_message;
1012 gssize bytes_written;
1013 GError *error;
1014
1015 vector.buffer = data->blob;
1016 vector.size = data->blob_size;
1017
1018 control_message = NULL;
1019 if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1020 {
1021 if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1022 {
1023 g_task_return_new_error (task,
1024 G_IO_ERROR,
1025 G_IO_ERROR_FAILED,
1026 "Tried sending a file descriptor but remote peer does not support this capability");
1027 g_object_unref (task);
1028 goto out;
1029 }
1030 control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1031 }
1032
1033 error = NULL;
1034 bytes_written = g_socket_send_message (data->worker->socket,
1035 NULL, /* address */
1036 &vector,
1037 1,
1038 control_message != NULL ? &control_message : NULL,
1039 control_message != NULL ? 1 : 0,
1040 G_SOCKET_MSG_NONE,
1041 data->worker->cancellable,
1042 &error);
1043 if (control_message != NULL)
1044 g_object_unref (control_message);
1045
1046 if (bytes_written == -1)
1047 {
1048 /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1049 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1050 {
1051 GSource *source;
1052 source = g_socket_create_source (data->worker->socket,
1053 G_IO_OUT | G_IO_HUP | G_IO_ERR,
1054 data->worker->cancellable);
1055 g_source_set_callback (source,
1056 (GSourceFunc) on_socket_ready,
1057 data,
1058 NULL); /* GDestroyNotify */
1059 g_source_attach (source, g_main_context_get_thread_default ());
1060 g_source_unref (source);
1061 g_error_free (error);
1062 goto out;
1063 }
1064 g_task_return_error (task, error);
1065 g_object_unref (task);
1066 goto out;
1067 }
1068 g_assert (bytes_written > 0); /* zero is never returned */
1069
1070 write_message_print_transport_debug (bytes_written, data);
1071
1072 data->total_written += bytes_written;
1073 g_assert (data->total_written <= data->blob_size);
1074 if (data->total_written == data->blob_size)
1075 {
1076 g_task_return_boolean (task, TRUE);
1077 g_object_unref (task);
1078 goto out;
1079 }
1080
1081 write_message_continue_writing (data);
1082 }
1083 #endif
1084 else
1085 {
1086 #ifdef G_OS_UNIX
1087 if (fd_list != NULL)
1088 {
1089 g_task_return_new_error (task,
1090 G_IO_ERROR,
1091 G_IO_ERROR_FAILED,
1092 "Tried sending a file descriptor on unsupported stream of type %s",
1093 g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1094 g_object_unref (task);
1095 goto out;
1096 }
1097 #endif
1098
1099 g_output_stream_write_async (ostream,
1100 (const gchar *) data->blob + data->total_written,
1101 data->blob_size - data->total_written,
1102 G_PRIORITY_DEFAULT,
1103 data->worker->cancellable,
1104 write_message_async_cb,
1105 data);
1106 }
1107 #ifdef G_OS_UNIX
1108 out:
1109 #endif
1110 ;
1111 }
1112
1113 /* called in private thread shared by all GDBusConnection instances
1114 *
1115 * write-lock is not held on entry
1116 * output_pending is PENDING_WRITE on entry
1117 */
1118 static void
write_message_async(GDBusWorker * worker,MessageToWriteData * data,GAsyncReadyCallback callback,gpointer user_data)1119 write_message_async (GDBusWorker *worker,
1120 MessageToWriteData *data,
1121 GAsyncReadyCallback callback,
1122 gpointer user_data)
1123 {
1124 data->task = g_task_new (NULL, NULL, callback, user_data);
1125 g_task_set_source_tag (data->task, write_message_async);
1126 data->total_written = 0;
1127 write_message_continue_writing (data);
1128 }
1129
1130 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1131 static gboolean
write_message_finish(GAsyncResult * res,GError ** error)1132 write_message_finish (GAsyncResult *res,
1133 GError **error)
1134 {
1135 g_return_val_if_fail (g_task_is_valid (res, NULL), FALSE);
1136
1137 return g_task_propagate_boolean (G_TASK (res), error);
1138 }
1139 /* ---------------------------------------------------------------------------------------------------- */
1140
1141 static void continue_writing (GDBusWorker *worker);
1142
1143 typedef struct
1144 {
1145 GDBusWorker *worker;
1146 GList *flushers;
1147 } FlushAsyncData;
1148
1149 static void
flush_data_list_complete(const GList * flushers,const GError * error)1150 flush_data_list_complete (const GList *flushers,
1151 const GError *error)
1152 {
1153 const GList *l;
1154
1155 for (l = flushers; l != NULL; l = l->next)
1156 {
1157 FlushData *f = l->data;
1158
1159 f->error = error != NULL ? g_error_copy (error) : NULL;
1160
1161 g_mutex_lock (&f->mutex);
1162 f->finished = TRUE;
1163 g_cond_signal (&f->cond);
1164 g_mutex_unlock (&f->mutex);
1165 }
1166 }
1167
1168 /* called in private thread shared by all GDBusConnection instances
1169 *
1170 * write-lock is not held on entry
1171 * output_pending is PENDING_FLUSH on entry
1172 */
1173 static void
ostream_flush_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)1174 ostream_flush_cb (GObject *source_object,
1175 GAsyncResult *res,
1176 gpointer user_data)
1177 {
1178 FlushAsyncData *data = user_data;
1179 GError *error;
1180
1181 error = NULL;
1182 g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1183 res,
1184 &error);
1185
1186 if (error == NULL)
1187 {
1188 if (G_UNLIKELY (_g_dbus_debug_transport ()))
1189 {
1190 _g_dbus_debug_print_lock ();
1191 g_print ("========================================================================\n"
1192 "GDBus-debug:Transport:\n"
1193 " ---- FLUSHED stream of type %s\n",
1194 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1195 _g_dbus_debug_print_unlock ();
1196 }
1197 }
1198
1199 /* Make sure we tell folks that we don't have additional
1200 flushes pending */
1201 g_mutex_lock (&data->worker->write_lock);
1202 data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1203 g_assert (data->worker->output_pending == PENDING_FLUSH);
1204 data->worker->output_pending = PENDING_NONE;
1205 g_mutex_unlock (&data->worker->write_lock);
1206
1207 g_assert (data->flushers != NULL);
1208 flush_data_list_complete (data->flushers, error);
1209 g_list_free (data->flushers);
1210 if (error != NULL)
1211 g_error_free (error);
1212
1213 /* OK, cool, finally kick off the next write */
1214 continue_writing (data->worker);
1215
1216 _g_dbus_worker_unref (data->worker);
1217 g_free (data);
1218 }
1219
1220 /* called in private thread shared by all GDBusConnection instances
1221 *
1222 * write-lock is not held on entry
1223 * output_pending is PENDING_FLUSH on entry
1224 */
1225 static void
start_flush(FlushAsyncData * data)1226 start_flush (FlushAsyncData *data)
1227 {
1228 g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1229 G_PRIORITY_DEFAULT,
1230 data->worker->cancellable,
1231 ostream_flush_cb,
1232 data);
1233 }
1234
1235 /* called in private thread shared by all GDBusConnection instances
1236 *
1237 * write-lock is held on entry
1238 * output_pending is PENDING_NONE on entry
1239 */
1240 static void
message_written_unlocked(GDBusWorker * worker,MessageToWriteData * message_data)1241 message_written_unlocked (GDBusWorker *worker,
1242 MessageToWriteData *message_data)
1243 {
1244 if (G_UNLIKELY (_g_dbus_debug_message ()))
1245 {
1246 gchar *s;
1247 _g_dbus_debug_print_lock ();
1248 g_print ("========================================================================\n"
1249 "GDBus-debug:Message:\n"
1250 " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1251 message_data->blob_size);
1252 s = g_dbus_message_print (message_data->message, 2);
1253 g_print ("%s", s);
1254 g_free (s);
1255 if (G_UNLIKELY (_g_dbus_debug_payload ()))
1256 {
1257 s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1258 g_print ("%s\n", s);
1259 g_free (s);
1260 }
1261 _g_dbus_debug_print_unlock ();
1262 }
1263
1264 worker->write_num_messages_written += 1;
1265 }
1266
1267 /* called in private thread shared by all GDBusConnection instances
1268 *
1269 * write-lock is held on entry
1270 * output_pending is PENDING_NONE on entry
1271 *
1272 * Returns: non-%NULL, setting @output_pending, if we need to flush now
1273 */
1274 static FlushAsyncData *
prepare_flush_unlocked(GDBusWorker * worker)1275 prepare_flush_unlocked (GDBusWorker *worker)
1276 {
1277 GList *l;
1278 GList *ll;
1279 GList *flushers;
1280
1281 flushers = NULL;
1282 for (l = worker->write_pending_flushes; l != NULL; l = ll)
1283 {
1284 FlushData *f = l->data;
1285 ll = l->next;
1286
1287 if (f->number_to_wait_for == worker->write_num_messages_written)
1288 {
1289 flushers = g_list_append (flushers, f);
1290 worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1291 }
1292 }
1293 if (flushers != NULL)
1294 {
1295 g_assert (worker->output_pending == PENDING_NONE);
1296 worker->output_pending = PENDING_FLUSH;
1297 }
1298
1299 if (flushers != NULL)
1300 {
1301 FlushAsyncData *data;
1302
1303 data = g_new0 (FlushAsyncData, 1);
1304 data->worker = _g_dbus_worker_ref (worker);
1305 data->flushers = flushers;
1306 return data;
1307 }
1308
1309 return NULL;
1310 }
1311
1312 /* called in private thread shared by all GDBusConnection instances
1313 *
1314 * write-lock is not held on entry
1315 * output_pending is PENDING_WRITE on entry
1316 */
1317 static void
write_message_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)1318 write_message_cb (GObject *source_object,
1319 GAsyncResult *res,
1320 gpointer user_data)
1321 {
1322 MessageToWriteData *data = user_data;
1323 GError *error;
1324
1325 g_mutex_lock (&data->worker->write_lock);
1326 g_assert (data->worker->output_pending == PENDING_WRITE);
1327 data->worker->output_pending = PENDING_NONE;
1328
1329 error = NULL;
1330 if (!write_message_finish (res, &error))
1331 {
1332 g_mutex_unlock (&data->worker->write_lock);
1333
1334 /* TODO: handle */
1335 _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1336 g_error_free (error);
1337
1338 g_mutex_lock (&data->worker->write_lock);
1339 }
1340
1341 message_written_unlocked (data->worker, data);
1342
1343 g_mutex_unlock (&data->worker->write_lock);
1344
1345 continue_writing (data->worker);
1346
1347 message_to_write_data_free (data);
1348 }
1349
1350 /* called in private thread shared by all GDBusConnection instances
1351 *
1352 * write-lock is not held on entry
1353 * output_pending is PENDING_CLOSE on entry
1354 */
1355 static void
iostream_close_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)1356 iostream_close_cb (GObject *source_object,
1357 GAsyncResult *res,
1358 gpointer user_data)
1359 {
1360 GDBusWorker *worker = user_data;
1361 GError *error = NULL;
1362 GList *pending_close_attempts, *pending_flush_attempts;
1363 GQueue *send_queue;
1364
1365 g_io_stream_close_finish (worker->stream, res, &error);
1366
1367 g_mutex_lock (&worker->write_lock);
1368
1369 pending_close_attempts = worker->pending_close_attempts;
1370 worker->pending_close_attempts = NULL;
1371
1372 pending_flush_attempts = worker->write_pending_flushes;
1373 worker->write_pending_flushes = NULL;
1374
1375 send_queue = worker->write_queue;
1376 worker->write_queue = g_queue_new ();
1377
1378 g_assert (worker->output_pending == PENDING_CLOSE);
1379 worker->output_pending = PENDING_NONE;
1380
1381 /* Ensure threads waiting for pending flushes to finish will be unblocked. */
1382 worker->write_num_messages_flushed =
1383 worker->write_num_messages_written + g_list_length(pending_flush_attempts);
1384
1385 g_mutex_unlock (&worker->write_lock);
1386
1387 while (pending_close_attempts != NULL)
1388 {
1389 CloseData *close_data = pending_close_attempts->data;
1390
1391 pending_close_attempts = g_list_delete_link (pending_close_attempts,
1392 pending_close_attempts);
1393
1394 if (close_data->task != NULL)
1395 {
1396 if (error != NULL)
1397 g_task_return_error (close_data->task, g_error_copy (error));
1398 else
1399 g_task_return_boolean (close_data->task, TRUE);
1400 }
1401
1402 close_data_free (close_data);
1403 }
1404
1405 g_clear_error (&error);
1406
1407 /* all messages queued for sending are discarded */
1408 g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1409 /* all queued flushes fail */
1410 error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1411 _("Operation was cancelled"));
1412 flush_data_list_complete (pending_flush_attempts, error);
1413 g_list_free (pending_flush_attempts);
1414 g_clear_error (&error);
1415
1416 _g_dbus_worker_unref (worker);
1417 }
1418
1419 /* called in private thread shared by all GDBusConnection instances
1420 *
1421 * write-lock is not held on entry
1422 * output_pending must be PENDING_NONE on entry
1423 */
1424 static void
continue_writing(GDBusWorker * worker)1425 continue_writing (GDBusWorker *worker)
1426 {
1427 MessageToWriteData *data;
1428 FlushAsyncData *flush_async_data;
1429
1430 write_next:
1431 /* we mustn't try to write two things at once */
1432 g_assert (worker->output_pending == PENDING_NONE);
1433
1434 g_mutex_lock (&worker->write_lock);
1435
1436 data = NULL;
1437 flush_async_data = NULL;
1438
1439 /* if we want to close the connection, that takes precedence */
1440 if (worker->pending_close_attempts != NULL)
1441 {
1442 GInputStream *input = g_io_stream_get_input_stream (worker->stream);
1443
1444 if (!g_input_stream_has_pending (input))
1445 {
1446 worker->close_expected = TRUE;
1447 worker->output_pending = PENDING_CLOSE;
1448
1449 g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1450 NULL, iostream_close_cb,
1451 _g_dbus_worker_ref (worker));
1452 }
1453 }
1454 else
1455 {
1456 flush_async_data = prepare_flush_unlocked (worker);
1457
1458 if (flush_async_data == NULL)
1459 {
1460 data = g_queue_pop_head (worker->write_queue);
1461
1462 if (data != NULL)
1463 worker->output_pending = PENDING_WRITE;
1464 }
1465 }
1466
1467 g_mutex_unlock (&worker->write_lock);
1468
1469 /* Note that write_lock is only used for protecting the @write_queue
1470 * and @output_pending fields of the GDBusWorker struct ... which we
1471 * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1472 *
1473 * Therefore, it's fine to drop it here when calling back into user
1474 * code and then writing the message out onto the GIOStream since this
1475 * function only runs on the worker thread.
1476 */
1477
1478 if (flush_async_data != NULL)
1479 {
1480 start_flush (flush_async_data);
1481 g_assert (data == NULL);
1482 }
1483 else if (data != NULL)
1484 {
1485 GDBusMessage *old_message;
1486 guchar *new_blob;
1487 gsize new_blob_size;
1488 GError *error;
1489
1490 old_message = data->message;
1491 data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1492 if (data->message == old_message)
1493 {
1494 /* filters had no effect - do nothing */
1495 }
1496 else if (data->message == NULL)
1497 {
1498 /* filters dropped message */
1499 g_mutex_lock (&worker->write_lock);
1500 worker->output_pending = PENDING_NONE;
1501 g_mutex_unlock (&worker->write_lock);
1502 message_to_write_data_free (data);
1503 goto write_next;
1504 }
1505 else
1506 {
1507 /* filters altered the message -> reencode */
1508 error = NULL;
1509 new_blob = g_dbus_message_to_blob (data->message,
1510 &new_blob_size,
1511 worker->capabilities,
1512 &error);
1513 if (new_blob == NULL)
1514 {
1515 /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1516 * the old message instead
1517 */
1518 g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1519 g_dbus_message_get_serial (data->message),
1520 error->message);
1521 g_error_free (error);
1522 }
1523 else
1524 {
1525 g_free (data->blob);
1526 data->blob = (gchar *) new_blob;
1527 data->blob_size = new_blob_size;
1528 }
1529 }
1530
1531 write_message_async (worker,
1532 data,
1533 write_message_cb,
1534 data);
1535 }
1536 }
1537
1538 /* called in private thread shared by all GDBusConnection instances
1539 *
1540 * write-lock is not held on entry
1541 * output_pending may be anything
1542 */
1543 static gboolean
continue_writing_in_idle_cb(gpointer user_data)1544 continue_writing_in_idle_cb (gpointer user_data)
1545 {
1546 GDBusWorker *worker = user_data;
1547
1548 /* Because this is the worker thread, we can read this struct member
1549 * without holding the lock: no other thread ever modifies it.
1550 */
1551 if (worker->output_pending == PENDING_NONE)
1552 continue_writing (worker);
1553
1554 return FALSE;
1555 }
1556
1557 /*
1558 * @write_data: (transfer full) (nullable):
1559 * @flush_data: (transfer full) (nullable):
1560 * @close_data: (transfer full) (nullable):
1561 *
1562 * Can be called from any thread
1563 *
1564 * write_lock is held on entry
1565 * output_pending may be anything
1566 */
1567 static void
schedule_writing_unlocked(GDBusWorker * worker,MessageToWriteData * write_data,FlushData * flush_data,CloseData * close_data)1568 schedule_writing_unlocked (GDBusWorker *worker,
1569 MessageToWriteData *write_data,
1570 FlushData *flush_data,
1571 CloseData *close_data)
1572 {
1573 if (write_data != NULL)
1574 g_queue_push_tail (worker->write_queue, write_data);
1575
1576 if (flush_data != NULL)
1577 worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1578
1579 if (close_data != NULL)
1580 worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1581 close_data);
1582
1583 /* If we had output pending, the next bit of output will happen
1584 * automatically when it finishes, so we only need to do this
1585 * if nothing was pending.
1586 *
1587 * The idle callback will re-check that output_pending is still
1588 * PENDING_NONE, to guard against output starting before the idle.
1589 */
1590 if (worker->output_pending == PENDING_NONE)
1591 {
1592 GSource *idle_source;
1593 idle_source = g_idle_source_new ();
1594 g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1595 g_source_set_callback (idle_source,
1596 continue_writing_in_idle_cb,
1597 _g_dbus_worker_ref (worker),
1598 (GDestroyNotify) _g_dbus_worker_unref);
1599 g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1600 g_source_attach (idle_source, worker->shared_thread_data->context);
1601 g_source_unref (idle_source);
1602 }
1603 }
1604
1605 static void
schedule_pending_close(GDBusWorker * worker)1606 schedule_pending_close (GDBusWorker *worker)
1607 {
1608 g_mutex_lock (&worker->write_lock);
1609 if (worker->pending_close_attempts)
1610 schedule_writing_unlocked (worker, NULL, NULL, NULL);
1611 g_mutex_unlock (&worker->write_lock);
1612 }
1613
1614 /* ---------------------------------------------------------------------------------------------------- */
1615
1616 /* can be called from any thread - steals blob
1617 *
1618 * write_lock is not held on entry
1619 * output_pending may be anything
1620 */
1621 void
_g_dbus_worker_send_message(GDBusWorker * worker,GDBusMessage * message,gchar * blob,gsize blob_len)1622 _g_dbus_worker_send_message (GDBusWorker *worker,
1623 GDBusMessage *message,
1624 gchar *blob,
1625 gsize blob_len)
1626 {
1627 MessageToWriteData *data;
1628
1629 g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1630 g_return_if_fail (blob != NULL);
1631 g_return_if_fail (blob_len > 16);
1632
1633 data = g_slice_new0 (MessageToWriteData);
1634 data->worker = _g_dbus_worker_ref (worker);
1635 data->message = g_object_ref (message);
1636 data->blob = blob; /* steal! */
1637 data->blob_size = blob_len;
1638
1639 g_mutex_lock (&worker->write_lock);
1640 schedule_writing_unlocked (worker, data, NULL, NULL);
1641 g_mutex_unlock (&worker->write_lock);
1642 }
1643
1644 /* ---------------------------------------------------------------------------------------------------- */
1645
1646 GDBusWorker *
_g_dbus_worker_new(GIOStream * stream,GDBusCapabilityFlags capabilities,gboolean initially_frozen,GDBusWorkerMessageReceivedCallback message_received_callback,GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,GDBusWorkerDisconnectedCallback disconnected_callback,gpointer user_data)1647 _g_dbus_worker_new (GIOStream *stream,
1648 GDBusCapabilityFlags capabilities,
1649 gboolean initially_frozen,
1650 GDBusWorkerMessageReceivedCallback message_received_callback,
1651 GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1652 GDBusWorkerDisconnectedCallback disconnected_callback,
1653 gpointer user_data)
1654 {
1655 GDBusWorker *worker;
1656 GSource *idle_source;
1657
1658 g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1659 g_return_val_if_fail (message_received_callback != NULL, NULL);
1660 g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1661 g_return_val_if_fail (disconnected_callback != NULL, NULL);
1662
1663 worker = g_new0 (GDBusWorker, 1);
1664 worker->ref_count = 1;
1665
1666 g_mutex_init (&worker->read_lock);
1667 worker->message_received_callback = message_received_callback;
1668 worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1669 worker->disconnected_callback = disconnected_callback;
1670 worker->user_data = user_data;
1671 worker->stream = g_object_ref (stream);
1672 worker->capabilities = capabilities;
1673 worker->cancellable = g_cancellable_new ();
1674 worker->output_pending = PENDING_NONE;
1675
1676 worker->frozen = initially_frozen;
1677 worker->received_messages_while_frozen = g_queue_new ();
1678
1679 g_mutex_init (&worker->write_lock);
1680 worker->write_queue = g_queue_new ();
1681
1682 if (G_IS_SOCKET_CONNECTION (worker->stream))
1683 worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1684
1685 worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1686
1687 /* begin reading */
1688 idle_source = g_idle_source_new ();
1689 g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1690 g_source_set_callback (idle_source,
1691 _g_dbus_worker_do_initial_read,
1692 _g_dbus_worker_ref (worker),
1693 (GDestroyNotify) _g_dbus_worker_unref);
1694 g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1695 g_source_attach (idle_source, worker->shared_thread_data->context);
1696 g_source_unref (idle_source);
1697
1698 return worker;
1699 }
1700
1701 /* ---------------------------------------------------------------------------------------------------- */
1702
1703 /* can be called from any thread
1704 *
1705 * write_lock is not held on entry
1706 * output_pending may be anything
1707 */
1708 void
_g_dbus_worker_close(GDBusWorker * worker,GTask * task)1709 _g_dbus_worker_close (GDBusWorker *worker,
1710 GTask *task)
1711 {
1712 CloseData *close_data;
1713
1714 close_data = g_slice_new0 (CloseData);
1715 close_data->worker = _g_dbus_worker_ref (worker);
1716 close_data->task = (task == NULL ? NULL : g_object_ref (task));
1717
1718 /* Don't set worker->close_expected here - we're in the wrong thread.
1719 * It'll be set before the actual close happens.
1720 */
1721 g_cancellable_cancel (worker->cancellable);
1722 g_mutex_lock (&worker->write_lock);
1723 schedule_writing_unlocked (worker, NULL, NULL, close_data);
1724 g_mutex_unlock (&worker->write_lock);
1725 }
1726
1727 /* This can be called from any thread - frees worker. Note that
1728 * callbacks might still happen if called from another thread than the
1729 * worker - use your own synchronization primitive in the callbacks.
1730 *
1731 * write_lock is not held on entry
1732 * output_pending may be anything
1733 */
1734 void
_g_dbus_worker_stop(GDBusWorker * worker)1735 _g_dbus_worker_stop (GDBusWorker *worker)
1736 {
1737 g_atomic_int_set (&worker->stopped, TRUE);
1738
1739 /* Cancel any pending operations and schedule a close of the underlying I/O
1740 * stream in the worker thread
1741 */
1742 _g_dbus_worker_close (worker, NULL);
1743
1744 /* _g_dbus_worker_close holds a ref until after an idle in the worker
1745 * thread has run, so we no longer need to unref in an idle like in
1746 * commit 322e25b535
1747 */
1748 _g_dbus_worker_unref (worker);
1749 }
1750
1751 /* ---------------------------------------------------------------------------------------------------- */
1752
1753 /* can be called from any thread (except the worker thread) - blocks
1754 * calling thread until all queued outgoing messages are written and
1755 * the transport has been flushed
1756 *
1757 * write_lock is not held on entry
1758 * output_pending may be anything
1759 */
1760 gboolean
_g_dbus_worker_flush_sync(GDBusWorker * worker,GCancellable * cancellable,GError ** error)1761 _g_dbus_worker_flush_sync (GDBusWorker *worker,
1762 GCancellable *cancellable,
1763 GError **error)
1764 {
1765 gboolean ret;
1766 FlushData *data;
1767 guint64 pending_writes;
1768
1769 data = NULL;
1770 ret = TRUE;
1771
1772 g_mutex_lock (&worker->write_lock);
1773
1774 /* if the queue is empty, no write is in-flight and we haven't written
1775 * anything since the last flush, then there's nothing to wait for
1776 */
1777 pending_writes = g_queue_get_length (worker->write_queue);
1778
1779 /* if a write is in-flight, we shouldn't be satisfied until the first
1780 * flush operation that follows it
1781 */
1782 if (worker->output_pending == PENDING_WRITE)
1783 pending_writes += 1;
1784
1785 if (pending_writes > 0 ||
1786 worker->write_num_messages_written != worker->write_num_messages_flushed)
1787 {
1788 data = g_new0 (FlushData, 1);
1789 g_mutex_init (&data->mutex);
1790 g_cond_init (&data->cond);
1791 data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1792 data->finished = FALSE;
1793 g_mutex_lock (&data->mutex);
1794
1795 schedule_writing_unlocked (worker, NULL, data, NULL);
1796 }
1797 g_mutex_unlock (&worker->write_lock);
1798
1799 if (data != NULL)
1800 {
1801 /* Wait for flush operations to finish. */
1802 while (!data->finished)
1803 {
1804 g_cond_wait (&data->cond, &data->mutex);
1805 }
1806
1807 g_mutex_unlock (&data->mutex);
1808 g_cond_clear (&data->cond);
1809 g_mutex_clear (&data->mutex);
1810 if (data->error != NULL)
1811 {
1812 ret = FALSE;
1813 g_propagate_error (error, data->error);
1814 }
1815 g_free (data);
1816 }
1817
1818 return ret;
1819 }
1820
1821 /* ---------------------------------------------------------------------------------------------------- */
1822
1823 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1824 #define G_DBUS_DEBUG_TRANSPORT (1<<1)
1825 #define G_DBUS_DEBUG_MESSAGE (1<<2)
1826 #define G_DBUS_DEBUG_PAYLOAD (1<<3)
1827 #define G_DBUS_DEBUG_CALL (1<<4)
1828 #define G_DBUS_DEBUG_SIGNAL (1<<5)
1829 #define G_DBUS_DEBUG_INCOMING (1<<6)
1830 #define G_DBUS_DEBUG_RETURN (1<<7)
1831 #define G_DBUS_DEBUG_EMISSION (1<<8)
1832 #define G_DBUS_DEBUG_ADDRESS (1<<9)
1833 #define G_DBUS_DEBUG_PROXY (1<<10)
1834
1835 static gint _gdbus_debug_flags = 0;
1836
1837 gboolean
_g_dbus_debug_authentication(void)1838 _g_dbus_debug_authentication (void)
1839 {
1840 _g_dbus_initialize ();
1841 return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1842 }
1843
1844 gboolean
_g_dbus_debug_transport(void)1845 _g_dbus_debug_transport (void)
1846 {
1847 _g_dbus_initialize ();
1848 return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1849 }
1850
1851 gboolean
_g_dbus_debug_message(void)1852 _g_dbus_debug_message (void)
1853 {
1854 _g_dbus_initialize ();
1855 return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1856 }
1857
1858 gboolean
_g_dbus_debug_payload(void)1859 _g_dbus_debug_payload (void)
1860 {
1861 _g_dbus_initialize ();
1862 return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1863 }
1864
1865 gboolean
_g_dbus_debug_call(void)1866 _g_dbus_debug_call (void)
1867 {
1868 _g_dbus_initialize ();
1869 return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1870 }
1871
1872 gboolean
_g_dbus_debug_signal(void)1873 _g_dbus_debug_signal (void)
1874 {
1875 _g_dbus_initialize ();
1876 return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1877 }
1878
1879 gboolean
_g_dbus_debug_incoming(void)1880 _g_dbus_debug_incoming (void)
1881 {
1882 _g_dbus_initialize ();
1883 return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1884 }
1885
1886 gboolean
_g_dbus_debug_return(void)1887 _g_dbus_debug_return (void)
1888 {
1889 _g_dbus_initialize ();
1890 return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1891 }
1892
1893 gboolean
_g_dbus_debug_emission(void)1894 _g_dbus_debug_emission (void)
1895 {
1896 _g_dbus_initialize ();
1897 return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1898 }
1899
1900 gboolean
_g_dbus_debug_address(void)1901 _g_dbus_debug_address (void)
1902 {
1903 _g_dbus_initialize ();
1904 return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1905 }
1906
1907 gboolean
_g_dbus_debug_proxy(void)1908 _g_dbus_debug_proxy (void)
1909 {
1910 _g_dbus_initialize ();
1911 return (_gdbus_debug_flags & G_DBUS_DEBUG_PROXY) != 0;
1912 }
1913
1914 G_LOCK_DEFINE_STATIC (print_lock);
1915
1916 void
_g_dbus_debug_print_lock(void)1917 _g_dbus_debug_print_lock (void)
1918 {
1919 G_LOCK (print_lock);
1920 }
1921
1922 void
_g_dbus_debug_print_unlock(void)1923 _g_dbus_debug_print_unlock (void)
1924 {
1925 G_UNLOCK (print_lock);
1926 }
1927
1928 /**
1929 * _g_dbus_initialize:
1930 *
1931 * Does various one-time init things such as
1932 *
1933 * - registering the G_DBUS_ERROR error domain
1934 * - parses the G_DBUS_DEBUG environment variable
1935 */
1936 void
_g_dbus_initialize(void)1937 _g_dbus_initialize (void)
1938 {
1939 static volatile gsize initialized = 0;
1940
1941 if (g_once_init_enter (&initialized))
1942 {
1943 volatile GQuark g_dbus_error_domain;
1944 const gchar *debug;
1945
1946 g_dbus_error_domain = G_DBUS_ERROR;
1947 (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1948
1949 debug = g_getenv ("G_DBUS_DEBUG");
1950 if (debug != NULL)
1951 {
1952 const GDebugKey keys[] = {
1953 { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1954 { "transport", G_DBUS_DEBUG_TRANSPORT },
1955 { "message", G_DBUS_DEBUG_MESSAGE },
1956 { "payload", G_DBUS_DEBUG_PAYLOAD },
1957 { "call", G_DBUS_DEBUG_CALL },
1958 { "signal", G_DBUS_DEBUG_SIGNAL },
1959 { "incoming", G_DBUS_DEBUG_INCOMING },
1960 { "return", G_DBUS_DEBUG_RETURN },
1961 { "emission", G_DBUS_DEBUG_EMISSION },
1962 { "address", G_DBUS_DEBUG_ADDRESS },
1963 { "proxy", G_DBUS_DEBUG_PROXY }
1964 };
1965
1966 _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1967 if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1968 _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1969 }
1970
1971 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
1972 ensure_required_types ();
1973
1974 g_once_init_leave (&initialized, 1);
1975 }
1976 }
1977
1978 /* ---------------------------------------------------------------------------------------------------- */
1979
1980 GVariantType *
_g_dbus_compute_complete_signature(GDBusArgInfo ** args)1981 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1982 {
1983 const GVariantType *arg_types[256];
1984 guint n;
1985
1986 if (args)
1987 for (n = 0; args[n] != NULL; n++)
1988 {
1989 /* DBus places a hard limit of 255 on signature length.
1990 * therefore number of args must be less than 256.
1991 */
1992 g_assert (n < 256);
1993
1994 arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1995
1996 if G_UNLIKELY (arg_types[n] == NULL)
1997 return NULL;
1998 }
1999 else
2000 n = 0;
2001
2002 return g_variant_type_new_tuple (arg_types, n);
2003 }
2004
2005 /* ---------------------------------------------------------------------------------------------------- */
2006
2007 #ifdef G_OS_WIN32
2008
2009 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
2010
2011 gchar *
_g_dbus_win32_get_user_sid(void)2012 _g_dbus_win32_get_user_sid (void)
2013 {
2014 HANDLE h;
2015 TOKEN_USER *user;
2016 DWORD token_information_len;
2017 PSID psid;
2018 gchar *sid;
2019 gchar *ret;
2020
2021 ret = NULL;
2022 user = NULL;
2023 h = INVALID_HANDLE_VALUE;
2024
2025 if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2026 {
2027 g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2028 goto out;
2029 }
2030
2031 /* Get length of buffer */
2032 token_information_len = 0;
2033 if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2034 {
2035 if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2036 {
2037 g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2038 goto out;
2039 }
2040 }
2041 user = g_malloc (token_information_len);
2042 if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2043 {
2044 g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2045 goto out;
2046 }
2047
2048 psid = user->User.Sid;
2049 if (!IsValidSid (psid))
2050 {
2051 g_warning ("Invalid SID");
2052 goto out;
2053 }
2054
2055 if (!ConvertSidToStringSidA (psid, &sid))
2056 {
2057 g_warning ("Invalid SID");
2058 goto out;
2059 }
2060
2061 ret = g_strdup (sid);
2062 LocalFree (sid);
2063
2064 out:
2065 g_free (user);
2066 if (h != INVALID_HANDLE_VALUE)
2067 CloseHandle (h);
2068 return ret;
2069 }
2070
2071
2072 #define DBUS_DAEMON_ADDRESS_INFO "DBusDaemonAddressInfo"
2073 #define DBUS_DAEMON_MUTEX "DBusDaemonMutex"
2074 #define UNIQUE_DBUS_INIT_MUTEX "UniqueDBusInitMutex"
2075 #define DBUS_AUTOLAUNCH_MUTEX "DBusAutolaunchMutex"
2076
2077 static void
release_mutex(HANDLE mutex)2078 release_mutex (HANDLE mutex)
2079 {
2080 ReleaseMutex (mutex);
2081 CloseHandle (mutex);
2082 }
2083
2084 static HANDLE
acquire_mutex(const char * mutexname)2085 acquire_mutex (const char *mutexname)
2086 {
2087 HANDLE mutex;
2088 DWORD res;
2089
2090 mutex = CreateMutexA (NULL, FALSE, mutexname);
2091 if (!mutex)
2092 return 0;
2093
2094 res = WaitForSingleObject (mutex, INFINITE);
2095 switch (res)
2096 {
2097 case WAIT_ABANDONED:
2098 release_mutex (mutex);
2099 return 0;
2100 case WAIT_FAILED:
2101 case WAIT_TIMEOUT:
2102 return 0;
2103 }
2104
2105 return mutex;
2106 }
2107
2108 static gboolean
is_mutex_owned(const char * mutexname)2109 is_mutex_owned (const char *mutexname)
2110 {
2111 HANDLE mutex;
2112 gboolean res = FALSE;
2113
2114 mutex = CreateMutexA (NULL, FALSE, mutexname);
2115 if (WaitForSingleObject (mutex, 10) == WAIT_TIMEOUT)
2116 res = TRUE;
2117 else
2118 ReleaseMutex (mutex);
2119 CloseHandle (mutex);
2120
2121 return res;
2122 }
2123
2124 static char *
read_shm(const char * shm_name)2125 read_shm (const char *shm_name)
2126 {
2127 HANDLE shared_mem;
2128 char *shared_data;
2129 char *res;
2130 int i;
2131
2132 res = NULL;
2133
2134 for (i = 0; i < 20; i++)
2135 {
2136 shared_mem = OpenFileMappingA (FILE_MAP_READ, FALSE, shm_name);
2137 if (shared_mem != 0)
2138 break;
2139 Sleep (100);
2140 }
2141
2142 if (shared_mem != 0)
2143 {
2144 shared_data = MapViewOfFile (shared_mem, FILE_MAP_READ, 0, 0, 0);
2145 /* It looks that a race is possible here:
2146 * if the dbus process already created mapping but didn't fill it
2147 * the code below may read incorrect address.
2148 * Also this is a bit complicated by the fact that
2149 * any change in the "synchronization contract" between processes
2150 * should be accompanied with renaming all of used win32 named objects:
2151 * otherwise libgio-2.0-0.dll of different versions shipped with
2152 * different apps may break each other due to protocol difference.
2153 */
2154 if (shared_data != NULL)
2155 {
2156 res = g_strdup (shared_data);
2157 UnmapViewOfFile (shared_data);
2158 }
2159 CloseHandle (shared_mem);
2160 }
2161
2162 return res;
2163 }
2164
2165 static HANDLE
set_shm(const char * shm_name,const char * value)2166 set_shm (const char *shm_name, const char *value)
2167 {
2168 HANDLE shared_mem;
2169 char *shared_data;
2170
2171 shared_mem = CreateFileMappingA (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
2172 0, strlen (value) + 1, shm_name);
2173 if (shared_mem == 0)
2174 return 0;
2175
2176 shared_data = MapViewOfFile (shared_mem, FILE_MAP_WRITE, 0, 0, 0 );
2177 if (shared_data == NULL)
2178 return 0;
2179
2180 strcpy (shared_data, value);
2181
2182 UnmapViewOfFile (shared_data);
2183
2184 return shared_mem;
2185 }
2186
2187 /* These keep state between publish_session_bus and unpublish_session_bus */
2188 static HANDLE published_daemon_mutex;
2189 static HANDLE published_shared_mem;
2190
2191 static gboolean
publish_session_bus(const char * address)2192 publish_session_bus (const char *address)
2193 {
2194 HANDLE init_mutex;
2195
2196 init_mutex = acquire_mutex (UNIQUE_DBUS_INIT_MUTEX);
2197
2198 published_daemon_mutex = CreateMutexA (NULL, FALSE, DBUS_DAEMON_MUTEX);
2199 if (WaitForSingleObject (published_daemon_mutex, 10 ) != WAIT_OBJECT_0)
2200 {
2201 release_mutex (init_mutex);
2202 CloseHandle (published_daemon_mutex);
2203 published_daemon_mutex = NULL;
2204 return FALSE;
2205 }
2206
2207 published_shared_mem = set_shm (DBUS_DAEMON_ADDRESS_INFO, address);
2208 if (!published_shared_mem)
2209 {
2210 release_mutex (init_mutex);
2211 CloseHandle (published_daemon_mutex);
2212 published_daemon_mutex = NULL;
2213 return FALSE;
2214 }
2215
2216 release_mutex (init_mutex);
2217 return TRUE;
2218 }
2219
2220 static void
unpublish_session_bus(void)2221 unpublish_session_bus (void)
2222 {
2223 HANDLE init_mutex;
2224
2225 init_mutex = acquire_mutex (UNIQUE_DBUS_INIT_MUTEX);
2226
2227 CloseHandle (published_shared_mem);
2228 published_shared_mem = NULL;
2229
2230 release_mutex (published_daemon_mutex);
2231 published_daemon_mutex = NULL;
2232
2233 release_mutex (init_mutex);
2234 }
2235
2236 static void
wait_console_window(void)2237 wait_console_window (void)
2238 {
2239 FILE *console = fopen ("CONOUT$", "w");
2240
2241 SetConsoleTitleW (L"gdbus-daemon output. Type any character to close this window.");
2242 fprintf (console, _("(Type any character to close this window)\n"));
2243 fflush (console);
2244 _getch ();
2245 }
2246
2247 static void
open_console_window(void)2248 open_console_window (void)
2249 {
2250 if (((HANDLE) _get_osfhandle (fileno (stdout)) == INVALID_HANDLE_VALUE ||
2251 (HANDLE) _get_osfhandle (fileno (stderr)) == INVALID_HANDLE_VALUE) && AllocConsole ())
2252 {
2253 if ((HANDLE) _get_osfhandle (fileno (stdout)) == INVALID_HANDLE_VALUE)
2254 freopen ("CONOUT$", "w", stdout);
2255
2256 if ((HANDLE) _get_osfhandle (fileno (stderr)) == INVALID_HANDLE_VALUE)
2257 freopen ("CONOUT$", "w", stderr);
2258
2259 SetConsoleTitleW (L"gdbus-daemon debug output.");
2260
2261 atexit (wait_console_window);
2262 }
2263 }
2264
2265 static void
idle_timeout_cb(GDBusDaemon * daemon,gpointer user_data)2266 idle_timeout_cb (GDBusDaemon *daemon, gpointer user_data)
2267 {
2268 GMainLoop *loop = user_data;
2269 g_main_loop_quit (loop);
2270 }
2271
2272 /* Satisfies STARTF_FORCEONFEEDBACK */
2273 static void
turn_off_the_starting_cursor(void)2274 turn_off_the_starting_cursor (void)
2275 {
2276 MSG msg;
2277 BOOL bRet;
2278
2279 PostQuitMessage (0);
2280
2281 while ((bRet = GetMessage (&msg, 0, 0, 0)) != 0)
2282 {
2283 if (bRet == -1)
2284 continue;
2285
2286 TranslateMessage (&msg);
2287 DispatchMessage (&msg);
2288 }
2289 }
2290
2291 __declspec(dllexport) void __stdcall
g_win32_run_session_bus(void * hwnd,void * hinst,const char * cmdline,int cmdshow)2292 g_win32_run_session_bus (void* hwnd, void* hinst, const char* cmdline, int cmdshow)
2293 {
2294 GDBusDaemon *daemon;
2295 GMainLoop *loop;
2296 const char *address;
2297 GError *error = NULL;
2298
2299 turn_off_the_starting_cursor ();
2300
2301 if (g_getenv ("GDBUS_DAEMON_DEBUG") != NULL)
2302 open_console_window ();
2303
2304 address = "nonce-tcp:";
2305 daemon = _g_dbus_daemon_new (address, NULL, &error);
2306 if (daemon == NULL)
2307 {
2308 g_printerr ("Can't init bus: %s\n", error->message);
2309 g_error_free (error);
2310 return;
2311 }
2312
2313 loop = g_main_loop_new (NULL, FALSE);
2314
2315 /* There is a subtle detail with "idle-timeout" signal of dbus daemon:
2316 * It is fired on idle after last client disconnection,
2317 * but (at least with glib 2.59.1) it is NEVER fired
2318 * if no clients connect to daemon at all.
2319 * This may lead to infinite run of this daemon process.
2320 */
2321 g_signal_connect (daemon, "idle-timeout", G_CALLBACK (idle_timeout_cb), loop);
2322
2323 if (publish_session_bus (_g_dbus_daemon_get_address (daemon)))
2324 {
2325 g_main_loop_run (loop);
2326
2327 unpublish_session_bus ();
2328 }
2329
2330 g_main_loop_unref (loop);
2331 g_object_unref (daemon);
2332 }
2333
2334 static gboolean autolaunch_binary_absent = FALSE;
2335
2336 gchar *
_g_dbus_win32_get_session_address_dbus_launch(GError ** error)2337 _g_dbus_win32_get_session_address_dbus_launch (GError **error)
2338 {
2339 HANDLE autolaunch_mutex, init_mutex;
2340 char *address = NULL;
2341
2342 autolaunch_mutex = acquire_mutex (DBUS_AUTOLAUNCH_MUTEX);
2343
2344 init_mutex = acquire_mutex (UNIQUE_DBUS_INIT_MUTEX);
2345
2346 if (is_mutex_owned (DBUS_DAEMON_MUTEX))
2347 address = read_shm (DBUS_DAEMON_ADDRESS_INFO);
2348
2349 release_mutex (init_mutex);
2350
2351 if (address == NULL && !autolaunch_binary_absent)
2352 {
2353 wchar_t gio_path[MAX_PATH + 2] = { 0 };
2354 int gio_path_len = GetModuleFileNameW (_g_io_win32_get_module (), gio_path, MAX_PATH + 1);
2355
2356 /* The <= MAX_PATH check prevents truncated path usage */
2357 if (gio_path_len > 0 && gio_path_len <= MAX_PATH)
2358 {
2359 PROCESS_INFORMATION pi = { 0 };
2360 STARTUPINFOW si = { 0 };
2361 BOOL res = FALSE;
2362 wchar_t exe_path[MAX_PATH + 100] = { 0 };
2363 /* calculate index of first char of dll file name inside full path */
2364 int gio_name_index = gio_path_len;
2365 for (; gio_name_index > 0; --gio_name_index)
2366 {
2367 wchar_t prev_char = gio_path[gio_name_index - 1];
2368 if (prev_char == L'\\' || prev_char == L'/')
2369 break;
2370 }
2371 gio_path[gio_name_index] = L'\0';
2372 wcscpy (exe_path, gio_path);
2373 wcscat (exe_path, L"\\gdbus.exe");
2374
2375 if (GetFileAttributesW (exe_path) == INVALID_FILE_ATTRIBUTES)
2376 {
2377 /* warning won't be raised another time
2378 * since autolaunch_binary_absent would be already set.
2379 */
2380 autolaunch_binary_absent = TRUE;
2381 g_warning ("win32 session dbus binary not found: %S", exe_path );
2382 }
2383 else
2384 {
2385 wchar_t args[MAX_PATH*2 + 100] = { 0 };
2386 wcscpy (args, L"\"");
2387 wcscat (args, exe_path);
2388 wcscat (args, L"\" ");
2389 #define _L_PREFIX_FOR_EXPANDED(arg) L##arg
2390 #define _L_PREFIX(arg) _L_PREFIX_FOR_EXPANDED (arg)
2391 wcscat (args, _L_PREFIX (_GDBUS_ARG_WIN32_RUN_SESSION_BUS));
2392 #undef _L_PREFIX
2393 #undef _L_PREFIX_FOR_EXPANDED
2394
2395 res = CreateProcessW (exe_path, args,
2396 0, 0, FALSE,
2397 NORMAL_PRIORITY_CLASS | CREATE_NO_WINDOW | DETACHED_PROCESS,
2398 0, gio_path,
2399 &si, &pi);
2400 }
2401 if (res)
2402 {
2403 address = read_shm (DBUS_DAEMON_ADDRESS_INFO);
2404 if (address == NULL)
2405 g_warning ("%S dbus binary failed to launch bus, maybe incompatible version", exe_path );
2406 }
2407 }
2408 }
2409
2410 release_mutex (autolaunch_mutex);
2411
2412 if (address == NULL)
2413 g_set_error (error,
2414 G_IO_ERROR,
2415 G_IO_ERROR_FAILED,
2416 _("Session dbus not running, and autolaunch failed"));
2417
2418 return address;
2419 }
2420
2421 #endif
2422
2423 /* ---------------------------------------------------------------------------------------------------- */
2424
2425 gchar *
_g_dbus_get_machine_id(GError ** error)2426 _g_dbus_get_machine_id (GError **error)
2427 {
2428 #ifdef G_OS_WIN32
2429 HW_PROFILE_INFOA info;
2430 char *src, *dest, *res;
2431 int i;
2432
2433 if (!GetCurrentHwProfileA (&info))
2434 {
2435 char *message = g_win32_error_message (GetLastError ());
2436 g_set_error (error,
2437 G_IO_ERROR,
2438 G_IO_ERROR_FAILED,
2439 _("Unable to get Hardware profile: %s"), message);
2440 g_free (message);
2441 return NULL;
2442 }
2443
2444 /* Form: {12340001-4980-1920-6788-123456789012} */
2445 src = &info.szHwProfileGuid[0];
2446
2447 res = g_malloc (32+1);
2448 dest = res;
2449
2450 src++; /* Skip { */
2451 for (i = 0; i < 8; i++)
2452 *dest++ = *src++;
2453 src++; /* Skip - */
2454 for (i = 0; i < 4; i++)
2455 *dest++ = *src++;
2456 src++; /* Skip - */
2457 for (i = 0; i < 4; i++)
2458 *dest++ = *src++;
2459 src++; /* Skip - */
2460 for (i = 0; i < 4; i++)
2461 *dest++ = *src++;
2462 src++; /* Skip - */
2463 for (i = 0; i < 12; i++)
2464 *dest++ = *src++;
2465 *dest = 0;
2466
2467 return res;
2468 #else
2469 gchar *ret;
2470 GError *first_error;
2471 /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2472 ret = NULL;
2473 first_error = NULL;
2474 if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2475 &ret,
2476 NULL,
2477 &first_error) &&
2478 !g_file_get_contents ("/etc/machine-id",
2479 &ret,
2480 NULL,
2481 NULL))
2482 {
2483 g_propagate_prefixed_error (error, first_error,
2484 _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2485 }
2486 else
2487 {
2488 /* ignore the error from the first try, if any */
2489 g_clear_error (&first_error);
2490 /* TODO: validate value */
2491 g_strstrip (ret);
2492 }
2493 return ret;
2494 #endif
2495 }
2496
2497 /* ---------------------------------------------------------------------------------------------------- */
2498
2499 gchar *
_g_dbus_enum_to_string(GType enum_type,gint value)2500 _g_dbus_enum_to_string (GType enum_type, gint value)
2501 {
2502 gchar *ret;
2503 GEnumClass *klass;
2504 GEnumValue *enum_value;
2505
2506 klass = g_type_class_ref (enum_type);
2507 enum_value = g_enum_get_value (klass, value);
2508 if (enum_value != NULL)
2509 ret = g_strdup (enum_value->value_nick);
2510 else
2511 ret = g_strdup_printf ("unknown (value %d)", value);
2512 g_type_class_unref (klass);
2513 return ret;
2514 }
2515
2516 /* ---------------------------------------------------------------------------------------------------- */
2517
2518 static void
write_message_print_transport_debug(gssize bytes_written,MessageToWriteData * data)2519 write_message_print_transport_debug (gssize bytes_written,
2520 MessageToWriteData *data)
2521 {
2522 if (G_LIKELY (!_g_dbus_debug_transport ()))
2523 goto out;
2524
2525 _g_dbus_debug_print_lock ();
2526 g_print ("========================================================================\n"
2527 "GDBus-debug:Transport:\n"
2528 " >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2529 " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2530 bytes_written,
2531 g_dbus_message_get_serial (data->message),
2532 data->blob_size,
2533 data->total_written,
2534 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2535 _g_dbus_debug_print_unlock ();
2536 out:
2537 ;
2538 }
2539
2540 /* ---------------------------------------------------------------------------------------------------- */
2541
2542 static void
read_message_print_transport_debug(gssize bytes_read,GDBusWorker * worker)2543 read_message_print_transport_debug (gssize bytes_read,
2544 GDBusWorker *worker)
2545 {
2546 gsize size;
2547 gint32 serial;
2548 gint32 message_length;
2549
2550 if (G_LIKELY (!_g_dbus_debug_transport ()))
2551 goto out;
2552
2553 size = bytes_read + worker->read_buffer_cur_size;
2554 serial = 0;
2555 message_length = 0;
2556 if (size >= 16)
2557 message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2558 if (size >= 1)
2559 {
2560 switch (worker->read_buffer[0])
2561 {
2562 case 'l':
2563 if (size >= 12)
2564 serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2565 break;
2566 case 'B':
2567 if (size >= 12)
2568 serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2569 break;
2570 default:
2571 /* an error will be set elsewhere if this happens */
2572 goto out;
2573 }
2574 }
2575
2576 _g_dbus_debug_print_lock ();
2577 g_print ("========================================================================\n"
2578 "GDBus-debug:Transport:\n"
2579 " <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2580 " size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2581 bytes_read,
2582 serial,
2583 message_length,
2584 worker->read_buffer_cur_size,
2585 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2586 _g_dbus_debug_print_unlock ();
2587 out:
2588 ;
2589 }
2590
2591 /* ---------------------------------------------------------------------------------------------------- */
2592
2593 gboolean
_g_signal_accumulator_false_handled(GSignalInvocationHint * ihint,GValue * return_accu,const GValue * handler_return,gpointer dummy)2594 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2595 GValue *return_accu,
2596 const GValue *handler_return,
2597 gpointer dummy)
2598 {
2599 gboolean continue_emission;
2600 gboolean signal_return;
2601
2602 signal_return = g_value_get_boolean (handler_return);
2603 g_value_set_boolean (return_accu, signal_return);
2604 continue_emission = signal_return;
2605
2606 return continue_emission;
2607 }
2608
2609 /* ---------------------------------------------------------------------------------------------------- */
2610
2611 static void
append_nibble(GString * s,gint val)2612 append_nibble (GString *s, gint val)
2613 {
2614 g_string_append_c (s, val >= 10 ? ('a' + val - 10) : ('0' + val));
2615 }
2616
2617 /* ---------------------------------------------------------------------------------------------------- */
2618
2619 gchar *
_g_dbus_hexencode(const gchar * str,gsize str_len)2620 _g_dbus_hexencode (const gchar *str,
2621 gsize str_len)
2622 {
2623 gsize n;
2624 GString *s;
2625
2626 s = g_string_new (NULL);
2627 for (n = 0; n < str_len; n++)
2628 {
2629 gint val;
2630 gint upper_nibble;
2631 gint lower_nibble;
2632
2633 val = ((const guchar *) str)[n];
2634 upper_nibble = val >> 4;
2635 lower_nibble = val & 0x0f;
2636
2637 append_nibble (s, upper_nibble);
2638 append_nibble (s, lower_nibble);
2639 }
2640
2641 return g_string_free (s, FALSE);
2642 }
2643