• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GDBus - GLib D-Bus Library
2  *
3  * Copyright (C) 2008-2010 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: David Zeuthen <davidz@redhat.com>
19  */
20 
21 #include "config.h"
22 
23 #include <stdlib.h>
24 #include <string.h>
25 
26 #include "giotypes.h"
27 #include "gioenumtypes.h"
28 #include "gsocket.h"
29 #include "gdbusauthobserver.h"
30 #include "gdbusprivate.h"
31 #include "gdbusmessage.h"
32 #include "gdbusconnection.h"
33 #include "gdbusproxy.h"
34 #include "gdbuserror.h"
35 #include "gdbusintrospection.h"
36 #include "gdbusdaemon.h"
37 #include "giomodule-priv.h"
38 #include "gtask.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "glib/gstdio.h"
43 #include "gsocketaddress.h"
44 #include "gsocketcontrolmessage.h"
45 #include "gsocketconnection.h"
46 #include "gsocketoutputstream.h"
47 
48 #ifdef G_OS_UNIX
49 #include "gunixfdmessage.h"
50 #include "gunixconnection.h"
51 #include "gunixcredentialsmessage.h"
52 #endif
53 
54 #ifdef G_OS_WIN32
55 #include <windows.h>
56 #include <io.h>
57 #include <conio.h>
58 #endif
59 
60 #include "glibintl.h"
61 
62 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
63 static void schedule_pending_close (GDBusWorker *worker);
64 
65 /* ---------------------------------------------------------------------------------------------------- */
66 
67 gchar *
_g_dbus_hexdump(const gchar * data,gsize len,guint indent)68 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
69 {
70  guint n, m;
71  GString *ret;
72 
73  ret = g_string_new (NULL);
74 
75  for (n = 0; n < len; n += 16)
76    {
77      g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
78 
79      for (m = n; m < n + 16; m++)
80        {
81          if (m > n && (m%4) == 0)
82            g_string_append_c (ret, ' ');
83          if (m < len)
84            g_string_append_printf (ret, "%02x ", (guchar) data[m]);
85          else
86            g_string_append (ret, "   ");
87        }
88 
89      g_string_append (ret, "   ");
90 
91      for (m = n; m < len && m < n + 16; m++)
92        g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
93 
94      g_string_append_c (ret, '\n');
95    }
96 
97  return g_string_free (ret, FALSE);
98 }
99 
100 /* ---------------------------------------------------------------------------------------------------- */
101 
102 /* Unfortunately ancillary messages are discarded when reading from a
103  * socket using the GSocketInputStream abstraction. So we provide a
104  * very GInputStream-ish API that uses GSocket in this case (very
105  * similar to GSocketInputStream).
106  */
107 
/* Per-request state for _g_socket_read_with_control_messages(); attached to
 * the GTask as task data and released by read_with_control_data_free().
 */
typedef struct
{
  /* destination buffer and its size, handed to g_socket_receive_message() */
  void *buffer;
  gsize count;

  /* caller-supplied out-locations passed through to g_socket_receive_message() */
  GSocketControlMessage ***messages;
  gint *num_messages;
} ReadWithControlData;
116 
/* Releases a ReadWithControlData allocated with g_slice_new0(). Only the
 * struct itself is freed; the pointers it holds are not touched.
 */
static void
read_with_control_data_free (ReadWithControlData *data)
{
  g_slice_free (ReadWithControlData, data);
}
122 
123 static gboolean
_g_socket_read_with_control_messages_ready(GSocket * socket,GIOCondition condition,gpointer user_data)124 _g_socket_read_with_control_messages_ready (GSocket      *socket,
125                                             GIOCondition  condition,
126                                             gpointer      user_data)
127 {
128   GTask *task = user_data;
129   ReadWithControlData *data = g_task_get_task_data (task);
130   GError *error;
131   gssize result;
132   GInputVector vector;
133 
134   error = NULL;
135   vector.buffer = data->buffer;
136   vector.size = data->count;
137   result = g_socket_receive_message (socket,
138                                      NULL, /* address */
139                                      &vector,
140                                      1,
141                                      data->messages,
142                                      data->num_messages,
143                                      NULL,
144                                      g_task_get_cancellable (task),
145                                      &error);
146 
147   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
148     {
149       g_error_free (error);
150       return TRUE;
151     }
152 
153   g_assert (result >= 0 || error != NULL);
154   if (result >= 0)
155     g_task_return_int (task, result);
156   else
157     g_task_return_error (task, error);
158   g_object_unref (task);
159 
160   return FALSE;
161 }
162 
163 static void
_g_socket_read_with_control_messages(GSocket * socket,void * buffer,gsize count,GSocketControlMessage *** messages,gint * num_messages,gint io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)164 _g_socket_read_with_control_messages (GSocket                 *socket,
165                                       void                    *buffer,
166                                       gsize                    count,
167                                       GSocketControlMessage ***messages,
168                                       gint                    *num_messages,
169                                       gint                     io_priority,
170                                       GCancellable            *cancellable,
171                                       GAsyncReadyCallback      callback,
172                                       gpointer                 user_data)
173 {
174   GTask *task;
175   ReadWithControlData *data;
176   GSource *source;
177 
178   data = g_slice_new0 (ReadWithControlData);
179   data->buffer = buffer;
180   data->count = count;
181   data->messages = messages;
182   data->num_messages = num_messages;
183 
184   task = g_task_new (socket, cancellable, callback, user_data);
185   g_task_set_source_tag (task, _g_socket_read_with_control_messages);
186   g_task_set_task_data (task, data, (GDestroyNotify) read_with_control_data_free);
187 
188   if (g_socket_condition_check (socket, G_IO_IN))
189     {
190       if (!_g_socket_read_with_control_messages_ready (socket, G_IO_IN, task))
191         return;
192     }
193 
194   source = g_socket_create_source (socket,
195                                    G_IO_IN | G_IO_HUP | G_IO_ERR,
196                                    cancellable);
197   g_task_attach_source (task, source, (GSourceFunc) _g_socket_read_with_control_messages_ready);
198   g_source_unref (source);
199 }
200 
201 static gssize
_g_socket_read_with_control_messages_finish(GSocket * socket,GAsyncResult * result,GError ** error)202 _g_socket_read_with_control_messages_finish (GSocket       *socket,
203                                              GAsyncResult  *result,
204                                              GError       **error)
205 {
206   g_return_val_if_fail (G_IS_SOCKET (socket), -1);
207   g_return_val_if_fail (g_task_is_valid (result, socket), -1);
208 
209   return g_task_propagate_int (G_TASK (result), error);
210 }
211 
212 /* ---------------------------------------------------------------------------------------------------- */
213 
214 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=674885
215    and see also the original https://bugzilla.gnome.org/show_bug.cgi?id=627724  */
216 
217 static GPtrArray *ensured_classes = NULL;
218 
219 static void
ensure_type(GType gtype)220 ensure_type (GType gtype)
221 {
222   g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
223 }
224 
225 static void
release_required_types(void)226 release_required_types (void)
227 {
228   g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
229   g_ptr_array_unref (ensured_classes);
230   ensured_classes = NULL;
231 }
232 
/* Creates ensured_classes and takes a class reference on each type listed
 * below via ensure_type(). Must not be called while ensured_classes is
 * already set (asserted).
 */
static void
ensure_required_types (void)
{
  g_assert (ensured_classes == NULL);
  ensured_classes = g_ptr_array_new ();
  /* Generally in this list, you should initialize types which are used as
   * properties first, then the class which has them. For example, GDBusProxy
   * has a property of type GDBusConnection, so we initialize GDBusConnection
   * first. And because GDBusConnection has a property of type
   * GDBusConnectionFlags, we initialize that first.
   *
   * Similarly, GSocket has a property of type GSocketAddress.
   *
   * We don't fill out the whole dependency tree right now because in practice
   * it tends to be just the types that GDBus uses that cause pain, and there
   * is work on a more general approach in https://bugzilla.gnome.org/show_bug.cgi?id=674885
   */
  ensure_type (G_TYPE_TASK);
  ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
  ensure_type (G_TYPE_DBUS_CONNECTION_FLAGS);
  ensure_type (G_TYPE_DBUS_CAPABILITY_FLAGS);
  ensure_type (G_TYPE_DBUS_AUTH_OBSERVER);
  ensure_type (G_TYPE_DBUS_CONNECTION);
  ensure_type (G_TYPE_DBUS_PROXY);
  ensure_type (G_TYPE_SOCKET_FAMILY);
  ensure_type (G_TYPE_SOCKET_TYPE);
  ensure_type (G_TYPE_SOCKET_PROTOCOL);
  ensure_type (G_TYPE_SOCKET_ADDRESS);
  ensure_type (G_TYPE_SOCKET);
}
263 /* ---------------------------------------------------------------------------------------------------- */
264 
/* State of the single worker thread created by _g_dbus_shared_thread_ref(). */
typedef struct
{
  /* incremented atomically by _g_dbus_shared_thread_ref() */
  volatile gint refcount;
  /* the "gdbus" thread running gdbus_shared_thread_func() */
  GThread *thread;
  /* main context pushed as thread-default in that thread, and the loop run on it */
  GMainContext *context;
  GMainLoop *loop;
} SharedThreadData;
272 
273 static gpointer
gdbus_shared_thread_func(gpointer user_data)274 gdbus_shared_thread_func (gpointer user_data)
275 {
276   SharedThreadData *data = user_data;
277 
278   g_main_context_push_thread_default (data->context);
279   g_main_loop_run (data->loop);
280   g_main_context_pop_thread_default (data->context);
281 
282   release_required_types ();
283 
284   return NULL;
285 }
286 
287 /* ---------------------------------------------------------------------------------------------------- */
288 
289 static SharedThreadData *
_g_dbus_shared_thread_ref(void)290 _g_dbus_shared_thread_ref (void)
291 {
292   static gsize shared_thread_data = 0;
293   SharedThreadData *ret;
294 
295   if (g_once_init_enter (&shared_thread_data))
296     {
297       SharedThreadData *data;
298 
299       data = g_new0 (SharedThreadData, 1);
300       data->refcount = 0;
301 
302       data->context = g_main_context_new ();
303       data->loop = g_main_loop_new (data->context, FALSE);
304       data->thread = g_thread_new ("gdbus",
305                                    gdbus_shared_thread_func,
306                                    data);
307       /* We can cast between gsize and gpointer safely */
308       g_once_init_leave (&shared_thread_data, (gsize) data);
309     }
310 
311   ret = (SharedThreadData*) shared_thread_data;
312   g_atomic_int_inc (&ret->refcount);
313   return ret;
314 }
315 
/* Counterpart of _g_dbus_shared_thread_ref(). Currently a no-op: the whole
 * body is compiled out with #if 0, so the refcount is never decremented and
 * the shared thread is never torn down.
 */
static void
_g_dbus_shared_thread_unref (SharedThreadData *data)
{
  /* TODO: actually destroy the shared thread here */
#if 0
  g_assert (data != NULL);
  if (g_atomic_int_dec_and_test (&data->refcount))
    {
      g_main_loop_quit (data->loop);
      //g_thread_join (data->thread);
      g_main_loop_unref (data->loop);
      g_main_context_unref (data->context);
    }
#endif
}
331 
332 /* ---------------------------------------------------------------------------------------------------- */
333 
/* Which asynchronous output operation (if any) is pending on a GDBusWorker;
 * see the output_pending field of struct GDBusWorker.
 */
typedef enum {
    PENDING_NONE = 0,
    PENDING_WRITE,
    PENDING_FLUSH,
    PENDING_CLOSE
} OutputPending;
340 
/* Per-connection worker state: reference count, the stream being serviced,
 * the callbacks to notify, and the bookkeeping for reading and writing.
 */
struct GDBusWorker
{
  /* manipulated atomically by _g_dbus_worker_ref() / _g_dbus_worker_unref() */
  volatile gint                       ref_count;

  SharedThreadData                   *shared_thread_data;

  /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
  volatile gint                       stopped;

  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
   * only affects messages received from the other peer (since GDBusServer is the
   * only user) - we might want it to affect messages sent to the other peer too?
   */
  gboolean                            frozen;
  GDBusCapabilityFlags                capabilities;
  /* messages queued by _g_dbus_worker_queue_or_deliver_received_message() */
  GQueue                             *received_messages_while_frozen;

  GIOStream                          *stream;
  GCancellable                       *cancellable;
  GDBusWorkerMessageReceivedCallback  message_received_callback;
  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
  GDBusWorkerDisconnectedCallback     disconnected_callback;
  /* passed as the last argument to each of the callbacks above */
  gpointer                            user_data;

  /* if not NULL, stream is GSocketConnection */
  GSocket *socket;

  /* used for reading */
  GMutex                              read_lock;
  gchar                              *read_buffer;
  gsize                               read_buffer_allocated_size;
  gsize                               read_buffer_cur_size;
  gsize                               read_buffer_bytes_wanted;
  GUnixFDList                        *read_fd_list;
  GSocketControlMessage             **read_ancillary_messages;
  gint                                read_num_ancillary_messages;

  /* Whether an async write, flush or close, or none of those, is pending.
   * Only the worker thread may change its value, and only with the write_lock.
   * Other threads may read its value when holding the write_lock.
   * The worker thread may read its value at any time.
   */
  OutputPending                       output_pending;
  /* used for writing */
  GMutex                              write_lock;
  /* queue of MessageToWriteData, protected by write_lock */
  GQueue                             *write_queue;
  /* protected by write_lock */
  guint64                             write_num_messages_written;
  /* number of messages we'd written out last time we flushed;
   * protected by write_lock
   */
  guint64                             write_num_messages_flushed;
  /* list of FlushData, protected by write_lock */
  GList                              *write_pending_flushes;
  /* list of CloseData, protected by write_lock */
  GList                              *pending_close_attempts;
  /* no lock - only used from the worker thread */
  gboolean                            close_expected;
};
401 
402 static void _g_dbus_worker_unref (GDBusWorker *worker);
403 
404 /* ---------------------------------------------------------------------------------------------------- */
405 
/* Element type of the worker's write_pending_flushes list.
 * NOTE(review): the code using these fields is outside this excerpt; the
 * field names suggest a waiter blocking on mutex/cond until `finished` is
 * set once `number_to_wait_for` messages were flushed - confirm against the
 * flush implementation.
 */
typedef struct
{
  GMutex  mutex;
  GCond   cond;
  guint64 number_to_wait_for;
  gboolean finished;
  GError *error;
} FlushData;
414 
415 struct _MessageToWriteData ;
416 typedef struct _MessageToWriteData MessageToWriteData;
417 
418 static void message_to_write_data_free (MessageToWriteData *data);
419 
420 static void read_message_print_transport_debug (gssize bytes_read,
421                                                 GDBusWorker *worker);
422 
423 static void write_message_print_transport_debug (gssize bytes_written,
424                                                  MessageToWriteData *data);
425 
/* Element type of the worker's pending_close_attempts list; released with
 * close_data_free().
 */
typedef struct {
    /* unreffed by close_data_free() */
    GDBusWorker *worker;
    /* may be NULL; cleared with g_clear_object() by close_data_free() */
    GTask *task;
} CloseData;
430 
/* Disposes of a CloseData: drops the task (if any), releases the reference
 * on the worker and frees the slice-allocated struct.
 */
static void
close_data_free (CloseData *close_data)
{
  GDBusWorker *owner = close_data->worker;

  g_clear_object (&close_data->task);

  _g_dbus_worker_unref (owner);
  g_slice_free (CloseData, close_data);
}
438 
439 /* ---------------------------------------------------------------------------------------------------- */
440 
/* Atomically increments @worker's reference count and returns @worker. */
static GDBusWorker *
_g_dbus_worker_ref (GDBusWorker *worker)
{
  g_atomic_int_inc (&worker->ref_count);
  return worker;
}
447 
448 static void
_g_dbus_worker_unref(GDBusWorker * worker)449 _g_dbus_worker_unref (GDBusWorker *worker)
450 {
451   if (g_atomic_int_dec_and_test (&worker->ref_count))
452     {
453       g_assert (worker->write_pending_flushes == NULL);
454 
455       _g_dbus_shared_thread_unref (worker->shared_thread_data);
456 
457       g_object_unref (worker->stream);
458 
459       g_mutex_clear (&worker->read_lock);
460       g_object_unref (worker->cancellable);
461       if (worker->read_fd_list != NULL)
462         g_object_unref (worker->read_fd_list);
463 
464       g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
465       g_mutex_clear (&worker->write_lock);
466       g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
467       g_free (worker->read_buffer);
468 
469       g_free (worker);
470     }
471 }
472 
473 static void
_g_dbus_worker_emit_disconnected(GDBusWorker * worker,gboolean remote_peer_vanished,GError * error)474 _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
475                                   gboolean      remote_peer_vanished,
476                                   GError       *error)
477 {
478   if (!g_atomic_int_get (&worker->stopped))
479     worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
480 }
481 
482 static void
_g_dbus_worker_emit_message_received(GDBusWorker * worker,GDBusMessage * message)483 _g_dbus_worker_emit_message_received (GDBusWorker  *worker,
484                                       GDBusMessage *message)
485 {
486   if (!g_atomic_int_get (&worker->stopped))
487     worker->message_received_callback (worker, message, worker->user_data);
488 }
489 
490 static GDBusMessage *
_g_dbus_worker_emit_message_about_to_be_sent(GDBusWorker * worker,GDBusMessage * message)491 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
492                                               GDBusMessage *message)
493 {
494   GDBusMessage *ret;
495   if (!g_atomic_int_get (&worker->stopped))
496     ret = worker->message_about_to_be_sent_callback (worker, g_steal_pointer (&message), worker->user_data);
497   else
498     ret = g_steal_pointer (&message);
499   return ret;
500 }
501 
502 /* can only be called from private thread with read-lock held - takes ownership of @message */
503 static void
_g_dbus_worker_queue_or_deliver_received_message(GDBusWorker * worker,GDBusMessage * message)504 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
505                                                   GDBusMessage *message)
506 {
507   if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
508     {
509       /* queue up */
510       g_queue_push_tail (worker->received_messages_while_frozen, g_steal_pointer (&message));
511     }
512   else
513     {
514       /* not frozen, nor anything in queue */
515       _g_dbus_worker_emit_message_received (worker, message);
516       g_clear_object (&message);
517     }
518 }
519 
520 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
521 static gboolean
unfreeze_in_idle_cb(gpointer user_data)522 unfreeze_in_idle_cb (gpointer user_data)
523 {
524   GDBusWorker *worker = user_data;
525   GDBusMessage *message;
526 
527   g_mutex_lock (&worker->read_lock);
528   if (worker->frozen)
529     {
530       while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
531         {
532           _g_dbus_worker_emit_message_received (worker, message);
533           g_clear_object (&message);
534         }
535       worker->frozen = FALSE;
536     }
537   else
538     {
539       g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
540     }
541   g_mutex_unlock (&worker->read_lock);
542   return FALSE;
543 }
544 
545 /* can be called from any thread */
546 void
_g_dbus_worker_unfreeze(GDBusWorker * worker)547 _g_dbus_worker_unfreeze (GDBusWorker *worker)
548 {
549   GSource *idle_source;
550   idle_source = g_idle_source_new ();
551   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
552   g_source_set_callback (idle_source,
553                          unfreeze_in_idle_cb,
554                          _g_dbus_worker_ref (worker),
555                          (GDestroyNotify) _g_dbus_worker_unref);
556   g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
557   g_source_attach (idle_source, worker->shared_thread_data->context);
558   g_source_unref (idle_source);
559 }
560 
561 /* ---------------------------------------------------------------------------------------------------- */
562 
563 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
564 
565 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
566 static void
_g_dbus_worker_do_read_cb(GInputStream * input_stream,GAsyncResult * res,gpointer user_data)567 _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
568                            GAsyncResult  *res,
569                            gpointer       user_data)
570 {
571   GDBusWorker *worker = user_data;
572   GError *error;
573   gssize bytes_read;
574 
575   g_mutex_lock (&worker->read_lock);
576 
577   /* If already stopped, don't even process the reply */
578   if (g_atomic_int_get (&worker->stopped))
579     goto out;
580 
581   error = NULL;
582   if (worker->socket == NULL)
583     bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
584                                              res,
585                                              &error);
586   else
587     bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
588                                                               res,
589                                                               &error);
590   if (worker->read_num_ancillary_messages > 0)
591     {
592       gint n;
593       for (n = 0; n < worker->read_num_ancillary_messages; n++)
594         {
595           GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
596 
597           if (FALSE)
598             {
599             }
600 #ifdef G_OS_UNIX
601           else if (G_IS_UNIX_FD_MESSAGE (control_message))
602             {
603               GUnixFDMessage *fd_message;
604               gint *fds;
605               gint num_fds;
606 
607               fd_message = G_UNIX_FD_MESSAGE (control_message);
608               fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
609               if (worker->read_fd_list == NULL)
610                 {
611                   worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
612                 }
613               else
614                 {
615                   gint n;
616                   for (n = 0; n < num_fds; n++)
617                     {
618                       /* TODO: really want a append_steal() */
619                       g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
620                       (void) g_close (fds[n], NULL);
621                     }
622                 }
623               g_free (fds);
624             }
625           else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
626             {
627               /* do nothing */
628             }
629 #endif
630           else
631             {
632               if (error == NULL)
633                 {
634                   g_set_error (&error,
635                                G_IO_ERROR,
636                                G_IO_ERROR_FAILED,
637                                "Unexpected ancillary message of type %s received from peer",
638                                g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
639                   _g_dbus_worker_emit_disconnected (worker, TRUE, error);
640                   g_error_free (error);
641                   g_object_unref (control_message);
642                   n++;
643                   while (n < worker->read_num_ancillary_messages)
644                     g_object_unref (worker->read_ancillary_messages[n++]);
645                   g_free (worker->read_ancillary_messages);
646                   goto out;
647                 }
648             }
649           g_object_unref (control_message);
650         }
651       g_free (worker->read_ancillary_messages);
652     }
653 
654   if (bytes_read == -1)
655     {
656       if (G_UNLIKELY (_g_dbus_debug_transport ()))
657         {
658           _g_dbus_debug_print_lock ();
659           g_print ("========================================================================\n"
660                    "GDBus-debug:Transport:\n"
661                    "  ---- READ ERROR on stream of type %s:\n"
662                    "  ---- %s %d: %s\n",
663                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
664                    g_quark_to_string (error->domain), error->code,
665                    error->message);
666           _g_dbus_debug_print_unlock ();
667         }
668 
669       /* Every async read that uses this callback uses worker->cancellable
670        * as its GCancellable. worker->cancellable gets cancelled if and only
671        * if the GDBusConnection tells us to close (either via
672        * _g_dbus_worker_stop, which is called on last-unref, or directly),
673        * so a cancelled read must mean our connection was closed locally.
674        *
675        * If we're closing, other errors are possible - notably,
676        * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
677        * read in-flight. It seems sensible to treat all read errors during
678        * closing as an expected thing that doesn't trip exit-on-close.
679        *
680        * Because close_expected can't be set until we get into the worker
681        * thread, but the cancellable is signalled sooner (from another
682        * thread), we do still need to check the error.
683        */
684       if (worker->close_expected ||
685           g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
686         _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
687       else
688         _g_dbus_worker_emit_disconnected (worker, TRUE, error);
689 
690       g_error_free (error);
691       goto out;
692     }
693 
694 #if 0
695   g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
696            (gint) bytes_read,
697            g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
698            g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
699            g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
700                                      G_IO_IN | G_IO_OUT | G_IO_HUP),
701            worker->stream,
702            worker);
703 #endif
704 
705   /* The read failed, which could mean the dbus-daemon was sent SIGTERM. */
706   if (bytes_read == 0)
707     {
708       g_set_error (&error,
709                    G_IO_ERROR,
710                    G_IO_ERROR_FAILED,
711                    "Underlying GIOStream returned 0 bytes on an async read");
712       _g_dbus_worker_emit_disconnected (worker, TRUE, error);
713       g_error_free (error);
714       goto out;
715     }
716 
717   read_message_print_transport_debug (bytes_read, worker);
718 
719   worker->read_buffer_cur_size += bytes_read;
720   if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
721     {
722       /* OK, got what we asked for! */
723       if (worker->read_buffer_bytes_wanted == 16)
724         {
725           gssize message_len;
726           /* OK, got the header - determine how many more bytes are needed */
727           error = NULL;
728           message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
729                                                      16,
730                                                      &error);
731           if (message_len == -1)
732             {
733               g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
734               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
735               g_error_free (error);
736               goto out;
737             }
738 
739           worker->read_buffer_bytes_wanted = message_len;
740           _g_dbus_worker_do_read_unlocked (worker);
741         }
742       else
743         {
744           GDBusMessage *message;
745           error = NULL;
746 
747           /* TODO: use connection->priv->auth to decode the message */
748 
749           message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
750                                                   worker->read_buffer_cur_size,
751                                                   worker->capabilities,
752                                                   &error);
753           if (message == NULL)
754             {
755               gchar *s;
756               s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
757               g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
758                          "The error is: %s\n"
759                          "The payload is as follows:\n"
760                          "%s",
761                          worker->read_buffer_cur_size,
762                          error->message,
763                          s);
764               g_free (s);
765               _g_dbus_worker_emit_disconnected (worker, FALSE, error);
766               g_error_free (error);
767               goto out;
768             }
769 
770 #ifdef G_OS_UNIX
771           if (worker->read_fd_list != NULL)
772             {
773               g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
774               g_object_unref (worker->read_fd_list);
775               worker->read_fd_list = NULL;
776             }
777 #endif
778 
779           if (G_UNLIKELY (_g_dbus_debug_message ()))
780             {
781               gchar *s;
782               _g_dbus_debug_print_lock ();
783               g_print ("========================================================================\n"
784                        "GDBus-debug:Message:\n"
785                        "  <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
786                        worker->read_buffer_cur_size);
787               s = g_dbus_message_print (message, 2);
788               g_print ("%s", s);
789               g_free (s);
790               if (G_UNLIKELY (_g_dbus_debug_payload ()))
791                 {
792                   s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
793                   g_print ("%s\n", s);
794                   g_free (s);
795                 }
796               _g_dbus_debug_print_unlock ();
797             }
798 
799           /* yay, got a message, go deliver it */
800           _g_dbus_worker_queue_or_deliver_received_message (worker, g_steal_pointer (&message));
801 
802           /* start reading another message! */
803           worker->read_buffer_bytes_wanted = 0;
804           worker->read_buffer_cur_size = 0;
805           _g_dbus_worker_do_read_unlocked (worker);
806         }
807     }
808   else
809     {
810       /* didn't get all the bytes we requested - so repeat the request... */
811       _g_dbus_worker_do_read_unlocked (worker);
812     }
813 
814  out:
815   g_mutex_unlock (&worker->read_lock);
816 
817   /* check if there is any pending close */
818   schedule_pending_close (worker);
819 
820   /* gives up the reference acquired when calling g_input_stream_read_async() */
821   _g_dbus_worker_unref (worker);
822 }
823 
824 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
825 static void
_g_dbus_worker_do_read_unlocked(GDBusWorker * worker)826 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
827 {
828   /* Note that we do need to keep trying to read even if close_expected is
829    * true, because only failing a read causes us to signal 'closed'.
830    */
831 
832   /* if bytes_wanted is zero, it means start reading a message */
833   if (worker->read_buffer_bytes_wanted == 0)
834     {
835       worker->read_buffer_cur_size = 0;
836       worker->read_buffer_bytes_wanted = 16;
837     }
838 
839   /* ensure we have a (big enough) buffer */
840   if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
841     {
842       /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
843       worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
844       worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
845     }
846 
847   if (worker->socket == NULL)
848     g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
849                                worker->read_buffer + worker->read_buffer_cur_size,
850                                worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
851                                G_PRIORITY_DEFAULT,
852                                worker->cancellable,
853                                (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
854                                _g_dbus_worker_ref (worker));
855   else
856     {
857       worker->read_ancillary_messages = NULL;
858       worker->read_num_ancillary_messages = 0;
859       _g_socket_read_with_control_messages (worker->socket,
860                                             worker->read_buffer + worker->read_buffer_cur_size,
861                                             worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
862                                             &worker->read_ancillary_messages,
863                                             &worker->read_num_ancillary_messages,
864                                             G_PRIORITY_DEFAULT,
865                                             worker->cancellable,
866                                             (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
867                                             _g_dbus_worker_ref (worker));
868     }
869 }
870 
871 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
872 static gboolean
_g_dbus_worker_do_initial_read(gpointer data)873 _g_dbus_worker_do_initial_read (gpointer data)
874 {
875   GDBusWorker *worker = data;
876   g_mutex_lock (&worker->read_lock);
877   _g_dbus_worker_do_read_unlocked (worker);
878   g_mutex_unlock (&worker->read_lock);
879   return FALSE;
880 }
881 
882 /* ---------------------------------------------------------------------------------------------------- */
883 
884 struct _MessageToWriteData
885 {
886   GDBusWorker  *worker;
887   GDBusMessage *message;
888   gchar        *blob;
889   gsize         blob_size;
890 
891   gsize         total_written;
892   GTask        *task;
893 };
894 
895 static void
message_to_write_data_free(MessageToWriteData * data)896 message_to_write_data_free (MessageToWriteData *data)
897 {
898   _g_dbus_worker_unref (data->worker);
899   if (data->message)
900     g_object_unref (data->message);
901   g_free (data->blob);
902   g_slice_free (MessageToWriteData, data);
903 }
904 
905 /* ---------------------------------------------------------------------------------------------------- */
906 
907 static void write_message_continue_writing (MessageToWriteData *data);
908 
909 /* called in private thread shared by all GDBusConnection instances
910  *
911  * write-lock is not held on entry
912  * output_pending is PENDING_WRITE on entry
913  */
914 static void
write_message_async_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)915 write_message_async_cb (GObject      *source_object,
916                         GAsyncResult *res,
917                         gpointer      user_data)
918 {
919   MessageToWriteData *data = user_data;
920   GTask *task;
921   gssize bytes_written;
922   GError *error;
923 
924   /* Note: we can't access data->task after calling g_task_return_* () because the
925    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
926    */
927   task = data->task;
928 
929   error = NULL;
930   bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
931                                                 res,
932                                                 &error);
933   if (bytes_written == -1)
934     {
935       g_task_return_error (task, error);
936       g_object_unref (task);
937       goto out;
938     }
939   g_assert (bytes_written > 0); /* zero is never returned */
940 
941   write_message_print_transport_debug (bytes_written, data);
942 
943   data->total_written += bytes_written;
944   g_assert (data->total_written <= data->blob_size);
945   if (data->total_written == data->blob_size)
946     {
947       g_task_return_boolean (task, TRUE);
948       g_object_unref (task);
949       goto out;
950     }
951 
952   write_message_continue_writing (data);
953 
954  out:
955   ;
956 }
957 
/* GSocket source callback installed by write_message_continue_writing()
 * after a send reported G_IO_ERROR_WOULD_BLOCK: retries the write once and
 * removes the source. @socket and @condition are not inspected.
 *
 * called in private thread shared by all GDBusConnection instances
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 */
#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  MessageToWriteData *pending = user_data;

  write_message_continue_writing (pending);

  return G_SOURCE_REMOVE;
}
#endif
974 
975 /* called in private thread shared by all GDBusConnection instances
976  *
977  * write-lock is not held on entry
978  * output_pending is PENDING_WRITE on entry
979  */
980 static void
write_message_continue_writing(MessageToWriteData * data)981 write_message_continue_writing (MessageToWriteData *data)
982 {
983   GOutputStream *ostream;
984 #ifdef G_OS_UNIX
985   GTask *task;
986   GUnixFDList *fd_list;
987 #endif
988 
989 #ifdef G_OS_UNIX
990   /* Note: we can't access data->task after calling g_task_return_* () because the
991    * callback can free @data and we're not completing in idle. So use a copy of the pointer.
992    */
993   task = data->task;
994 #endif
995 
996   ostream = g_io_stream_get_output_stream (data->worker->stream);
997 #ifdef G_OS_UNIX
998   fd_list = g_dbus_message_get_unix_fd_list (data->message);
999 #endif
1000 
1001   g_assert (!g_output_stream_has_pending (ostream));
1002   g_assert_cmpint (data->total_written, <, data->blob_size);
1003 
1004   if (FALSE)
1005     {
1006     }
1007 #ifdef G_OS_UNIX
1008   else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
1009     {
1010       GOutputVector vector;
1011       GSocketControlMessage *control_message;
1012       gssize bytes_written;
1013       GError *error;
1014 
1015       vector.buffer = data->blob;
1016       vector.size = data->blob_size;
1017 
1018       control_message = NULL;
1019       if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
1020         {
1021           if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
1022             {
1023               g_task_return_new_error (task,
1024                                        G_IO_ERROR,
1025                                        G_IO_ERROR_FAILED,
1026                                        "Tried sending a file descriptor but remote peer does not support this capability");
1027               g_object_unref (task);
1028               goto out;
1029             }
1030           control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1031         }
1032 
1033       error = NULL;
1034       bytes_written = g_socket_send_message (data->worker->socket,
1035                                              NULL, /* address */
1036                                              &vector,
1037                                              1,
1038                                              control_message != NULL ? &control_message : NULL,
1039                                              control_message != NULL ? 1 : 0,
1040                                              G_SOCKET_MSG_NONE,
1041                                              data->worker->cancellable,
1042                                              &error);
1043       if (control_message != NULL)
1044         g_object_unref (control_message);
1045 
1046       if (bytes_written == -1)
1047         {
1048           /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1049           if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1050             {
1051               GSource *source;
1052               source = g_socket_create_source (data->worker->socket,
1053                                                G_IO_OUT | G_IO_HUP | G_IO_ERR,
1054                                                data->worker->cancellable);
1055               g_source_set_callback (source,
1056                                      (GSourceFunc) on_socket_ready,
1057                                      data,
1058                                      NULL); /* GDestroyNotify */
1059               g_source_attach (source, g_main_context_get_thread_default ());
1060               g_source_unref (source);
1061               g_error_free (error);
1062               goto out;
1063             }
1064           g_task_return_error (task, error);
1065           g_object_unref (task);
1066           goto out;
1067         }
1068       g_assert (bytes_written > 0); /* zero is never returned */
1069 
1070       write_message_print_transport_debug (bytes_written, data);
1071 
1072       data->total_written += bytes_written;
1073       g_assert (data->total_written <= data->blob_size);
1074       if (data->total_written == data->blob_size)
1075         {
1076           g_task_return_boolean (task, TRUE);
1077           g_object_unref (task);
1078           goto out;
1079         }
1080 
1081       write_message_continue_writing (data);
1082     }
1083 #endif
1084   else
1085     {
1086 #ifdef G_OS_UNIX
1087       if (fd_list != NULL)
1088         {
1089           g_task_return_new_error (task,
1090                                    G_IO_ERROR,
1091                                    G_IO_ERROR_FAILED,
1092                                    "Tried sending a file descriptor on unsupported stream of type %s",
1093                                    g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1094           g_object_unref (task);
1095           goto out;
1096         }
1097 #endif
1098 
1099       g_output_stream_write_async (ostream,
1100                                    (const gchar *) data->blob + data->total_written,
1101                                    data->blob_size - data->total_written,
1102                                    G_PRIORITY_DEFAULT,
1103                                    data->worker->cancellable,
1104                                    write_message_async_cb,
1105                                    data);
1106     }
1107 #ifdef G_OS_UNIX
1108  out:
1109 #endif
1110   ;
1111 }
1112 
1113 /* called in private thread shared by all GDBusConnection instances
1114  *
1115  * write-lock is not held on entry
1116  * output_pending is PENDING_WRITE on entry
1117  */
1118 static void
write_message_async(GDBusWorker * worker,MessageToWriteData * data,GAsyncReadyCallback callback,gpointer user_data)1119 write_message_async (GDBusWorker         *worker,
1120                      MessageToWriteData  *data,
1121                      GAsyncReadyCallback  callback,
1122                      gpointer             user_data)
1123 {
1124   data->task = g_task_new (NULL, NULL, callback, user_data);
1125   g_task_set_source_tag (data->task, write_message_async);
1126   data->total_written = 0;
1127   write_message_continue_writing (data);
1128 }
1129 
1130 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1131 static gboolean
write_message_finish(GAsyncResult * res,GError ** error)1132 write_message_finish (GAsyncResult   *res,
1133                       GError        **error)
1134 {
1135   g_return_val_if_fail (g_task_is_valid (res, NULL), FALSE);
1136 
1137   return g_task_propagate_boolean (G_TASK (res), error);
1138 }
1139 /* ---------------------------------------------------------------------------------------------------- */
1140 
1141 static void continue_writing (GDBusWorker *worker);
1142 
1143 typedef struct
1144 {
1145   GDBusWorker *worker;
1146   GList *flushers;
1147 } FlushAsyncData;
1148 
1149 static void
flush_data_list_complete(const GList * flushers,const GError * error)1150 flush_data_list_complete (const GList  *flushers,
1151                           const GError *error)
1152 {
1153   const GList *l;
1154 
1155   for (l = flushers; l != NULL; l = l->next)
1156     {
1157       FlushData *f = l->data;
1158 
1159       f->error = error != NULL ? g_error_copy (error) : NULL;
1160 
1161       g_mutex_lock (&f->mutex);
1162       f->finished = TRUE;
1163       g_cond_signal (&f->cond);
1164       g_mutex_unlock (&f->mutex);
1165     }
1166 }
1167 
1168 /* called in private thread shared by all GDBusConnection instances
1169  *
1170  * write-lock is not held on entry
1171  * output_pending is PENDING_FLUSH on entry
1172  */
1173 static void
ostream_flush_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)1174 ostream_flush_cb (GObject      *source_object,
1175                   GAsyncResult *res,
1176                   gpointer      user_data)
1177 {
1178   FlushAsyncData *data = user_data;
1179   GError *error;
1180 
1181   error = NULL;
1182   g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1183                                 res,
1184                                 &error);
1185 
1186   if (error == NULL)
1187     {
1188       if (G_UNLIKELY (_g_dbus_debug_transport ()))
1189         {
1190           _g_dbus_debug_print_lock ();
1191           g_print ("========================================================================\n"
1192                    "GDBus-debug:Transport:\n"
1193                    "  ---- FLUSHED stream of type %s\n",
1194                    g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1195           _g_dbus_debug_print_unlock ();
1196         }
1197     }
1198 
1199   /* Make sure we tell folks that we don't have additional
1200      flushes pending */
1201   g_mutex_lock (&data->worker->write_lock);
1202   data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1203   g_assert (data->worker->output_pending == PENDING_FLUSH);
1204   data->worker->output_pending = PENDING_NONE;
1205   g_mutex_unlock (&data->worker->write_lock);
1206 
1207   g_assert (data->flushers != NULL);
1208   flush_data_list_complete (data->flushers, error);
1209   g_list_free (data->flushers);
1210   if (error != NULL)
1211     g_error_free (error);
1212 
1213   /* OK, cool, finally kick off the next write */
1214   continue_writing (data->worker);
1215 
1216   _g_dbus_worker_unref (data->worker);
1217   g_free (data);
1218 }
1219 
1220 /* called in private thread shared by all GDBusConnection instances
1221  *
1222  * write-lock is not held on entry
1223  * output_pending is PENDING_FLUSH on entry
1224  */
1225 static void
start_flush(FlushAsyncData * data)1226 start_flush (FlushAsyncData *data)
1227 {
1228   g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1229                                G_PRIORITY_DEFAULT,
1230                                data->worker->cancellable,
1231                                ostream_flush_cb,
1232                                data);
1233 }
1234 
1235 /* called in private thread shared by all GDBusConnection instances
1236  *
1237  * write-lock is held on entry
1238  * output_pending is PENDING_NONE on entry
1239  */
1240 static void
message_written_unlocked(GDBusWorker * worker,MessageToWriteData * message_data)1241 message_written_unlocked (GDBusWorker *worker,
1242                           MessageToWriteData *message_data)
1243 {
1244   if (G_UNLIKELY (_g_dbus_debug_message ()))
1245     {
1246       gchar *s;
1247       _g_dbus_debug_print_lock ();
1248       g_print ("========================================================================\n"
1249                "GDBus-debug:Message:\n"
1250                "  >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1251                message_data->blob_size);
1252       s = g_dbus_message_print (message_data->message, 2);
1253       g_print ("%s", s);
1254       g_free (s);
1255       if (G_UNLIKELY (_g_dbus_debug_payload ()))
1256         {
1257           s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1258           g_print ("%s\n", s);
1259           g_free (s);
1260         }
1261       _g_dbus_debug_print_unlock ();
1262     }
1263 
1264   worker->write_num_messages_written += 1;
1265 }
1266 
1267 /* called in private thread shared by all GDBusConnection instances
1268  *
1269  * write-lock is held on entry
1270  * output_pending is PENDING_NONE on entry
1271  *
1272  * Returns: non-%NULL, setting @output_pending, if we need to flush now
1273  */
1274 static FlushAsyncData *
prepare_flush_unlocked(GDBusWorker * worker)1275 prepare_flush_unlocked (GDBusWorker *worker)
1276 {
1277   GList *l;
1278   GList *ll;
1279   GList *flushers;
1280 
1281   flushers = NULL;
1282   for (l = worker->write_pending_flushes; l != NULL; l = ll)
1283     {
1284       FlushData *f = l->data;
1285       ll = l->next;
1286 
1287       if (f->number_to_wait_for == worker->write_num_messages_written)
1288         {
1289           flushers = g_list_append (flushers, f);
1290           worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1291         }
1292     }
1293   if (flushers != NULL)
1294     {
1295       g_assert (worker->output_pending == PENDING_NONE);
1296       worker->output_pending = PENDING_FLUSH;
1297     }
1298 
1299   if (flushers != NULL)
1300     {
1301       FlushAsyncData *data;
1302 
1303       data = g_new0 (FlushAsyncData, 1);
1304       data->worker = _g_dbus_worker_ref (worker);
1305       data->flushers = flushers;
1306       return data;
1307     }
1308 
1309   return NULL;
1310 }
1311 
1312 /* called in private thread shared by all GDBusConnection instances
1313  *
1314  * write-lock is not held on entry
1315  * output_pending is PENDING_WRITE on entry
1316  */
1317 static void
write_message_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)1318 write_message_cb (GObject       *source_object,
1319                   GAsyncResult  *res,
1320                   gpointer       user_data)
1321 {
1322   MessageToWriteData *data = user_data;
1323   GError *error;
1324 
1325   g_mutex_lock (&data->worker->write_lock);
1326   g_assert (data->worker->output_pending == PENDING_WRITE);
1327   data->worker->output_pending = PENDING_NONE;
1328 
1329   error = NULL;
1330   if (!write_message_finish (res, &error))
1331     {
1332       g_mutex_unlock (&data->worker->write_lock);
1333 
1334       /* TODO: handle */
1335       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1336       g_error_free (error);
1337 
1338       g_mutex_lock (&data->worker->write_lock);
1339     }
1340 
1341   message_written_unlocked (data->worker, data);
1342 
1343   g_mutex_unlock (&data->worker->write_lock);
1344 
1345   continue_writing (data->worker);
1346 
1347   message_to_write_data_free (data);
1348 }
1349 
1350 /* called in private thread shared by all GDBusConnection instances
1351  *
1352  * write-lock is not held on entry
1353  * output_pending is PENDING_CLOSE on entry
1354  */
1355 static void
iostream_close_cb(GObject * source_object,GAsyncResult * res,gpointer user_data)1356 iostream_close_cb (GObject      *source_object,
1357                    GAsyncResult *res,
1358                    gpointer      user_data)
1359 {
1360   GDBusWorker *worker = user_data;
1361   GError *error = NULL;
1362   GList *pending_close_attempts, *pending_flush_attempts;
1363   GQueue *send_queue;
1364 
1365   g_io_stream_close_finish (worker->stream, res, &error);
1366 
1367   g_mutex_lock (&worker->write_lock);
1368 
1369   pending_close_attempts = worker->pending_close_attempts;
1370   worker->pending_close_attempts = NULL;
1371 
1372   pending_flush_attempts = worker->write_pending_flushes;
1373   worker->write_pending_flushes = NULL;
1374 
1375   send_queue = worker->write_queue;
1376   worker->write_queue = g_queue_new ();
1377 
1378   g_assert (worker->output_pending == PENDING_CLOSE);
1379   worker->output_pending = PENDING_NONE;
1380 
1381   /* Ensure threads waiting for pending flushes to finish will be unblocked. */
1382   worker->write_num_messages_flushed =
1383     worker->write_num_messages_written + g_list_length(pending_flush_attempts);
1384 
1385   g_mutex_unlock (&worker->write_lock);
1386 
1387   while (pending_close_attempts != NULL)
1388     {
1389       CloseData *close_data = pending_close_attempts->data;
1390 
1391       pending_close_attempts = g_list_delete_link (pending_close_attempts,
1392                                                    pending_close_attempts);
1393 
1394       if (close_data->task != NULL)
1395         {
1396           if (error != NULL)
1397             g_task_return_error (close_data->task, g_error_copy (error));
1398           else
1399             g_task_return_boolean (close_data->task, TRUE);
1400         }
1401 
1402       close_data_free (close_data);
1403     }
1404 
1405   g_clear_error (&error);
1406 
1407   /* all messages queued for sending are discarded */
1408   g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1409   /* all queued flushes fail */
1410   error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1411                        _("Operation was cancelled"));
1412   flush_data_list_complete (pending_flush_attempts, error);
1413   g_list_free (pending_flush_attempts);
1414   g_clear_error (&error);
1415 
1416   _g_dbus_worker_unref (worker);
1417 }
1418 
1419 /* called in private thread shared by all GDBusConnection instances
1420  *
1421  * write-lock is not held on entry
1422  * output_pending must be PENDING_NONE on entry
1423  */
1424 static void
continue_writing(GDBusWorker * worker)1425 continue_writing (GDBusWorker *worker)
1426 {
1427   MessageToWriteData *data;
1428   FlushAsyncData *flush_async_data;
1429 
1430  write_next:
1431   /* we mustn't try to write two things at once */
1432   g_assert (worker->output_pending == PENDING_NONE);
1433 
1434   g_mutex_lock (&worker->write_lock);
1435 
1436   data = NULL;
1437   flush_async_data = NULL;
1438 
1439   /* if we want to close the connection, that takes precedence */
1440   if (worker->pending_close_attempts != NULL)
1441     {
1442       GInputStream *input = g_io_stream_get_input_stream (worker->stream);
1443 
1444       if (!g_input_stream_has_pending (input))
1445         {
1446           worker->close_expected = TRUE;
1447           worker->output_pending = PENDING_CLOSE;
1448 
1449           g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1450                                    NULL, iostream_close_cb,
1451                                    _g_dbus_worker_ref (worker));
1452         }
1453     }
1454   else
1455     {
1456       flush_async_data = prepare_flush_unlocked (worker);
1457 
1458       if (flush_async_data == NULL)
1459         {
1460           data = g_queue_pop_head (worker->write_queue);
1461 
1462           if (data != NULL)
1463             worker->output_pending = PENDING_WRITE;
1464         }
1465     }
1466 
1467   g_mutex_unlock (&worker->write_lock);
1468 
1469   /* Note that write_lock is only used for protecting the @write_queue
1470    * and @output_pending fields of the GDBusWorker struct ... which we
1471    * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1472    *
1473    * Therefore, it's fine to drop it here when calling back into user
1474    * code and then writing the message out onto the GIOStream since this
1475    * function only runs on the worker thread.
1476    */
1477 
1478   if (flush_async_data != NULL)
1479     {
1480       start_flush (flush_async_data);
1481       g_assert (data == NULL);
1482     }
1483   else if (data != NULL)
1484     {
1485       GDBusMessage *old_message;
1486       guchar *new_blob;
1487       gsize new_blob_size;
1488       GError *error;
1489 
1490       old_message = data->message;
1491       data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1492       if (data->message == old_message)
1493         {
1494           /* filters had no effect - do nothing */
1495         }
1496       else if (data->message == NULL)
1497         {
1498           /* filters dropped message */
1499           g_mutex_lock (&worker->write_lock);
1500           worker->output_pending = PENDING_NONE;
1501           g_mutex_unlock (&worker->write_lock);
1502           message_to_write_data_free (data);
1503           goto write_next;
1504         }
1505       else
1506         {
1507           /* filters altered the message -> reencode */
1508           error = NULL;
1509           new_blob = g_dbus_message_to_blob (data->message,
1510                                              &new_blob_size,
1511                                              worker->capabilities,
1512                                              &error);
1513           if (new_blob == NULL)
1514             {
1515               /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1516                * the old message instead
1517                */
1518               g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1519                          g_dbus_message_get_serial (data->message),
1520                          error->message);
1521               g_error_free (error);
1522             }
1523           else
1524             {
1525               g_free (data->blob);
1526               data->blob = (gchar *) new_blob;
1527               data->blob_size = new_blob_size;
1528             }
1529         }
1530 
1531       write_message_async (worker,
1532                            data,
1533                            write_message_cb,
1534                            data);
1535     }
1536 }
1537 
1538 /* called in private thread shared by all GDBusConnection instances
1539  *
1540  * write-lock is not held on entry
1541  * output_pending may be anything
1542  */
1543 static gboolean
continue_writing_in_idle_cb(gpointer user_data)1544 continue_writing_in_idle_cb (gpointer user_data)
1545 {
1546   GDBusWorker *worker = user_data;
1547 
1548   /* Because this is the worker thread, we can read this struct member
1549    * without holding the lock: no other thread ever modifies it.
1550    */
1551   if (worker->output_pending == PENDING_NONE)
1552     continue_writing (worker);
1553 
1554   return FALSE;
1555 }
1556 
1557 /*
1558  * @write_data: (transfer full) (nullable):
1559  * @flush_data: (transfer full) (nullable):
1560  * @close_data: (transfer full) (nullable):
1561  *
1562  * Can be called from any thread
1563  *
1564  * write_lock is held on entry
1565  * output_pending may be anything
1566  */
1567 static void
schedule_writing_unlocked(GDBusWorker * worker,MessageToWriteData * write_data,FlushData * flush_data,CloseData * close_data)1568 schedule_writing_unlocked (GDBusWorker        *worker,
1569                            MessageToWriteData *write_data,
1570                            FlushData          *flush_data,
1571                            CloseData          *close_data)
1572 {
1573   if (write_data != NULL)
1574     g_queue_push_tail (worker->write_queue, write_data);
1575 
1576   if (flush_data != NULL)
1577     worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1578 
1579   if (close_data != NULL)
1580     worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1581                                                      close_data);
1582 
1583   /* If we had output pending, the next bit of output will happen
1584    * automatically when it finishes, so we only need to do this
1585    * if nothing was pending.
1586    *
1587    * The idle callback will re-check that output_pending is still
1588    * PENDING_NONE, to guard against output starting before the idle.
1589    */
1590   if (worker->output_pending == PENDING_NONE)
1591     {
1592       GSource *idle_source;
1593       idle_source = g_idle_source_new ();
1594       g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1595       g_source_set_callback (idle_source,
1596                              continue_writing_in_idle_cb,
1597                              _g_dbus_worker_ref (worker),
1598                              (GDestroyNotify) _g_dbus_worker_unref);
1599       g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1600       g_source_attach (idle_source, worker->shared_thread_data->context);
1601       g_source_unref (idle_source);
1602     }
1603 }
1604 
1605 static void
schedule_pending_close(GDBusWorker * worker)1606 schedule_pending_close (GDBusWorker *worker)
1607 {
1608   g_mutex_lock (&worker->write_lock);
1609   if (worker->pending_close_attempts)
1610     schedule_writing_unlocked (worker, NULL, NULL, NULL);
1611   g_mutex_unlock (&worker->write_lock);
1612 }
1613 
1614 /* ---------------------------------------------------------------------------------------------------- */
1615 
1616 /* can be called from any thread - steals blob
1617  *
1618  * write_lock is not held on entry
1619  * output_pending may be anything
1620  */
1621 void
_g_dbus_worker_send_message(GDBusWorker * worker,GDBusMessage * message,gchar * blob,gsize blob_len)1622 _g_dbus_worker_send_message (GDBusWorker    *worker,
1623                              GDBusMessage   *message,
1624                              gchar          *blob,
1625                              gsize           blob_len)
1626 {
1627   MessageToWriteData *data;
1628 
1629   g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1630   g_return_if_fail (blob != NULL);
1631   g_return_if_fail (blob_len > 16);
1632 
1633   data = g_slice_new0 (MessageToWriteData);
1634   data->worker = _g_dbus_worker_ref (worker);
1635   data->message = g_object_ref (message);
1636   data->blob = blob; /* steal! */
1637   data->blob_size = blob_len;
1638 
1639   g_mutex_lock (&worker->write_lock);
1640   schedule_writing_unlocked (worker, data, NULL, NULL);
1641   g_mutex_unlock (&worker->write_lock);
1642 }
1643 
1644 /* ---------------------------------------------------------------------------------------------------- */
1645 
1646 GDBusWorker *
_g_dbus_worker_new(GIOStream * stream,GDBusCapabilityFlags capabilities,gboolean initially_frozen,GDBusWorkerMessageReceivedCallback message_received_callback,GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,GDBusWorkerDisconnectedCallback disconnected_callback,gpointer user_data)1647 _g_dbus_worker_new (GIOStream                              *stream,
1648                     GDBusCapabilityFlags                    capabilities,
1649                     gboolean                                initially_frozen,
1650                     GDBusWorkerMessageReceivedCallback      message_received_callback,
1651                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1652                     GDBusWorkerDisconnectedCallback         disconnected_callback,
1653                     gpointer                                user_data)
1654 {
1655   GDBusWorker *worker;
1656   GSource *idle_source;
1657 
1658   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1659   g_return_val_if_fail (message_received_callback != NULL, NULL);
1660   g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1661   g_return_val_if_fail (disconnected_callback != NULL, NULL);
1662 
1663   worker = g_new0 (GDBusWorker, 1);
1664   worker->ref_count = 1;
1665 
1666   g_mutex_init (&worker->read_lock);
1667   worker->message_received_callback = message_received_callback;
1668   worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1669   worker->disconnected_callback = disconnected_callback;
1670   worker->user_data = user_data;
1671   worker->stream = g_object_ref (stream);
1672   worker->capabilities = capabilities;
1673   worker->cancellable = g_cancellable_new ();
1674   worker->output_pending = PENDING_NONE;
1675 
1676   worker->frozen = initially_frozen;
1677   worker->received_messages_while_frozen = g_queue_new ();
1678 
1679   g_mutex_init (&worker->write_lock);
1680   worker->write_queue = g_queue_new ();
1681 
1682   if (G_IS_SOCKET_CONNECTION (worker->stream))
1683     worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1684 
1685   worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1686 
1687   /* begin reading */
1688   idle_source = g_idle_source_new ();
1689   g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1690   g_source_set_callback (idle_source,
1691                          _g_dbus_worker_do_initial_read,
1692                          _g_dbus_worker_ref (worker),
1693                          (GDestroyNotify) _g_dbus_worker_unref);
1694   g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1695   g_source_attach (idle_source, worker->shared_thread_data->context);
1696   g_source_unref (idle_source);
1697 
1698   return worker;
1699 }
1700 
1701 /* ---------------------------------------------------------------------------------------------------- */
1702 
1703 /* can be called from any thread
1704  *
1705  * write_lock is not held on entry
1706  * output_pending may be anything
1707  */
1708 void
_g_dbus_worker_close(GDBusWorker * worker,GTask * task)1709 _g_dbus_worker_close (GDBusWorker         *worker,
1710                       GTask               *task)
1711 {
1712   CloseData *close_data;
1713 
1714   close_data = g_slice_new0 (CloseData);
1715   close_data->worker = _g_dbus_worker_ref (worker);
1716   close_data->task = (task == NULL ? NULL : g_object_ref (task));
1717 
1718   /* Don't set worker->close_expected here - we're in the wrong thread.
1719    * It'll be set before the actual close happens.
1720    */
1721   g_cancellable_cancel (worker->cancellable);
1722   g_mutex_lock (&worker->write_lock);
1723   schedule_writing_unlocked (worker, NULL, NULL, close_data);
1724   g_mutex_unlock (&worker->write_lock);
1725 }
1726 
1727 /* This can be called from any thread - frees worker. Note that
1728  * callbacks might still happen if called from another thread than the
1729  * worker - use your own synchronization primitive in the callbacks.
1730  *
1731  * write_lock is not held on entry
1732  * output_pending may be anything
1733  */
1734 void
_g_dbus_worker_stop(GDBusWorker * worker)1735 _g_dbus_worker_stop (GDBusWorker *worker)
1736 {
1737   g_atomic_int_set (&worker->stopped, TRUE);
1738 
1739   /* Cancel any pending operations and schedule a close of the underlying I/O
1740    * stream in the worker thread
1741    */
1742   _g_dbus_worker_close (worker, NULL);
1743 
1744   /* _g_dbus_worker_close holds a ref until after an idle in the worker
1745    * thread has run, so we no longer need to unref in an idle like in
1746    * commit 322e25b535
1747    */
1748   _g_dbus_worker_unref (worker);
1749 }
1750 
1751 /* ---------------------------------------------------------------------------------------------------- */
1752 
1753 /* can be called from any thread (except the worker thread) - blocks
1754  * calling thread until all queued outgoing messages are written and
1755  * the transport has been flushed
1756  *
1757  * write_lock is not held on entry
1758  * output_pending may be anything
1759  */
1760 gboolean
_g_dbus_worker_flush_sync(GDBusWorker * worker,GCancellable * cancellable,GError ** error)1761 _g_dbus_worker_flush_sync (GDBusWorker    *worker,
1762                            GCancellable   *cancellable,
1763                            GError        **error)
1764 {
1765   gboolean ret;
1766   FlushData *data;
1767   guint64 pending_writes;
1768 
1769   data = NULL;
1770   ret = TRUE;
1771 
1772   g_mutex_lock (&worker->write_lock);
1773 
1774   /* if the queue is empty, no write is in-flight and we haven't written
1775    * anything since the last flush, then there's nothing to wait for
1776    */
1777   pending_writes = g_queue_get_length (worker->write_queue);
1778 
1779   /* if a write is in-flight, we shouldn't be satisfied until the first
1780    * flush operation that follows it
1781    */
1782   if (worker->output_pending == PENDING_WRITE)
1783     pending_writes += 1;
1784 
1785   if (pending_writes > 0 ||
1786       worker->write_num_messages_written != worker->write_num_messages_flushed)
1787     {
1788       data = g_new0 (FlushData, 1);
1789       g_mutex_init (&data->mutex);
1790       g_cond_init (&data->cond);
1791       data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1792       data->finished = FALSE;
1793       g_mutex_lock (&data->mutex);
1794 
1795       schedule_writing_unlocked (worker, NULL, data, NULL);
1796     }
1797   g_mutex_unlock (&worker->write_lock);
1798 
1799   if (data != NULL)
1800     {
1801       /* Wait for flush operations to finish. */
1802       while (!data->finished)
1803         {
1804           g_cond_wait (&data->cond, &data->mutex);
1805         }
1806 
1807       g_mutex_unlock (&data->mutex);
1808       g_cond_clear (&data->cond);
1809       g_mutex_clear (&data->mutex);
1810       if (data->error != NULL)
1811         {
1812           ret = FALSE;
1813           g_propagate_error (error, data->error);
1814         }
1815       g_free (data);
1816     }
1817 
1818   return ret;
1819 }
1820 
1821 /* ---------------------------------------------------------------------------------------------------- */
1822 
1823 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1824 #define G_DBUS_DEBUG_TRANSPORT      (1<<1)
1825 #define G_DBUS_DEBUG_MESSAGE        (1<<2)
1826 #define G_DBUS_DEBUG_PAYLOAD        (1<<3)
1827 #define G_DBUS_DEBUG_CALL           (1<<4)
1828 #define G_DBUS_DEBUG_SIGNAL         (1<<5)
1829 #define G_DBUS_DEBUG_INCOMING       (1<<6)
1830 #define G_DBUS_DEBUG_RETURN         (1<<7)
1831 #define G_DBUS_DEBUG_EMISSION       (1<<8)
1832 #define G_DBUS_DEBUG_ADDRESS        (1<<9)
1833 #define G_DBUS_DEBUG_PROXY          (1<<10)
1834 
1835 static gint _gdbus_debug_flags = 0;
1836 
1837 gboolean
_g_dbus_debug_authentication(void)1838 _g_dbus_debug_authentication (void)
1839 {
1840   _g_dbus_initialize ();
1841   return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1842 }
1843 
1844 gboolean
_g_dbus_debug_transport(void)1845 _g_dbus_debug_transport (void)
1846 {
1847   _g_dbus_initialize ();
1848   return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1849 }
1850 
1851 gboolean
_g_dbus_debug_message(void)1852 _g_dbus_debug_message (void)
1853 {
1854   _g_dbus_initialize ();
1855   return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1856 }
1857 
1858 gboolean
_g_dbus_debug_payload(void)1859 _g_dbus_debug_payload (void)
1860 {
1861   _g_dbus_initialize ();
1862   return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1863 }
1864 
1865 gboolean
_g_dbus_debug_call(void)1866 _g_dbus_debug_call (void)
1867 {
1868   _g_dbus_initialize ();
1869   return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1870 }
1871 
1872 gboolean
_g_dbus_debug_signal(void)1873 _g_dbus_debug_signal (void)
1874 {
1875   _g_dbus_initialize ();
1876   return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1877 }
1878 
1879 gboolean
_g_dbus_debug_incoming(void)1880 _g_dbus_debug_incoming (void)
1881 {
1882   _g_dbus_initialize ();
1883   return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1884 }
1885 
1886 gboolean
_g_dbus_debug_return(void)1887 _g_dbus_debug_return (void)
1888 {
1889   _g_dbus_initialize ();
1890   return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1891 }
1892 
1893 gboolean
_g_dbus_debug_emission(void)1894 _g_dbus_debug_emission (void)
1895 {
1896   _g_dbus_initialize ();
1897   return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1898 }
1899 
1900 gboolean
_g_dbus_debug_address(void)1901 _g_dbus_debug_address (void)
1902 {
1903   _g_dbus_initialize ();
1904   return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1905 }
1906 
1907 gboolean
_g_dbus_debug_proxy(void)1908 _g_dbus_debug_proxy (void)
1909 {
1910   _g_dbus_initialize ();
1911   return (_gdbus_debug_flags & G_DBUS_DEBUG_PROXY) != 0;
1912 }
1913 
1914 G_LOCK_DEFINE_STATIC (print_lock);
1915 
1916 void
_g_dbus_debug_print_lock(void)1917 _g_dbus_debug_print_lock (void)
1918 {
1919   G_LOCK (print_lock);
1920 }
1921 
1922 void
_g_dbus_debug_print_unlock(void)1923 _g_dbus_debug_print_unlock (void)
1924 {
1925   G_UNLOCK (print_lock);
1926 }
1927 
1928 /**
1929  * _g_dbus_initialize:
1930  *
1931  * Does various one-time init things such as
1932  *
1933  *  - registering the G_DBUS_ERROR error domain
1934  *  - parses the G_DBUS_DEBUG environment variable
1935  */
1936 void
_g_dbus_initialize(void)1937 _g_dbus_initialize (void)
1938 {
1939   static volatile gsize initialized = 0;
1940 
1941   if (g_once_init_enter (&initialized))
1942     {
1943       volatile GQuark g_dbus_error_domain;
1944       const gchar *debug;
1945 
1946       g_dbus_error_domain = G_DBUS_ERROR;
1947       (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1948 
1949       debug = g_getenv ("G_DBUS_DEBUG");
1950       if (debug != NULL)
1951         {
1952           const GDebugKey keys[] = {
1953             { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1954             { "transport",      G_DBUS_DEBUG_TRANSPORT      },
1955             { "message",        G_DBUS_DEBUG_MESSAGE        },
1956             { "payload",        G_DBUS_DEBUG_PAYLOAD        },
1957             { "call",           G_DBUS_DEBUG_CALL           },
1958             { "signal",         G_DBUS_DEBUG_SIGNAL         },
1959             { "incoming",       G_DBUS_DEBUG_INCOMING       },
1960             { "return",         G_DBUS_DEBUG_RETURN         },
1961             { "emission",       G_DBUS_DEBUG_EMISSION       },
1962             { "address",        G_DBUS_DEBUG_ADDRESS        },
1963             { "proxy",          G_DBUS_DEBUG_PROXY          }
1964           };
1965 
1966           _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1967           if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1968             _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1969         }
1970 
1971       /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
1972       ensure_required_types ();
1973 
1974       g_once_init_leave (&initialized, 1);
1975     }
1976 }
1977 
1978 /* ---------------------------------------------------------------------------------------------------- */
1979 
1980 GVariantType *
_g_dbus_compute_complete_signature(GDBusArgInfo ** args)1981 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1982 {
1983   const GVariantType *arg_types[256];
1984   guint n;
1985 
1986   if (args)
1987     for (n = 0; args[n] != NULL; n++)
1988       {
1989         /* DBus places a hard limit of 255 on signature length.
1990          * therefore number of args must be less than 256.
1991          */
1992         g_assert (n < 256);
1993 
1994         arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1995 
1996         if G_UNLIKELY (arg_types[n] == NULL)
1997           return NULL;
1998       }
1999   else
2000     n = 0;
2001 
2002   return g_variant_type_new_tuple (arg_types, n);
2003 }
2004 
2005 /* ---------------------------------------------------------------------------------------------------- */
2006 
2007 #ifdef G_OS_WIN32
2008 
2009 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
2010 
2011 gchar *
_g_dbus_win32_get_user_sid(void)2012 _g_dbus_win32_get_user_sid (void)
2013 {
2014   HANDLE h;
2015   TOKEN_USER *user;
2016   DWORD token_information_len;
2017   PSID psid;
2018   gchar *sid;
2019   gchar *ret;
2020 
2021   ret = NULL;
2022   user = NULL;
2023   h = INVALID_HANDLE_VALUE;
2024 
2025   if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
2026     {
2027       g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
2028       goto out;
2029     }
2030 
2031   /* Get length of buffer */
2032   token_information_len = 0;
2033   if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
2034     {
2035       if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
2036         {
2037           g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2038           goto out;
2039         }
2040     }
2041   user = g_malloc (token_information_len);
2042   if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
2043     {
2044       g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
2045       goto out;
2046     }
2047 
2048   psid = user->User.Sid;
2049   if (!IsValidSid (psid))
2050     {
2051       g_warning ("Invalid SID");
2052       goto out;
2053     }
2054 
2055   if (!ConvertSidToStringSidA (psid, &sid))
2056     {
2057       g_warning ("Invalid SID");
2058       goto out;
2059     }
2060 
2061   ret = g_strdup (sid);
2062   LocalFree (sid);
2063 
2064 out:
2065   g_free (user);
2066   if (h != INVALID_HANDLE_VALUE)
2067     CloseHandle (h);
2068   return ret;
2069 }
2070 
2071 
/* Names of the Win32 named objects used below: one shared-memory section
 * (holding the bus address) and three named mutexes.
 */
#define DBUS_DAEMON_ADDRESS_INFO  "DBusDaemonAddressInfo"
#define DBUS_DAEMON_MUTEX         "DBusDaemonMutex"
#define UNIQUE_DBUS_INIT_MUTEX    "UniqueDBusInitMutex"
#define DBUS_AUTOLAUNCH_MUTEX     "DBusAutolaunchMutex"
2076 
2077 static void
release_mutex(HANDLE mutex)2078 release_mutex (HANDLE mutex)
2079 {
2080   ReleaseMutex (mutex);
2081   CloseHandle (mutex);
2082 }
2083 
2084 static HANDLE
acquire_mutex(const char * mutexname)2085 acquire_mutex (const char *mutexname)
2086 {
2087   HANDLE mutex;
2088   DWORD res;
2089 
2090   mutex = CreateMutexA (NULL, FALSE, mutexname);
2091   if (!mutex)
2092     return 0;
2093 
2094   res = WaitForSingleObject (mutex, INFINITE);
2095   switch (res)
2096     {
2097     case WAIT_ABANDONED:
2098       release_mutex (mutex);
2099       return 0;
2100     case WAIT_FAILED:
2101     case WAIT_TIMEOUT:
2102       return 0;
2103     }
2104 
2105   return mutex;
2106 }
2107 
2108 static gboolean
is_mutex_owned(const char * mutexname)2109 is_mutex_owned (const char *mutexname)
2110 {
2111   HANDLE mutex;
2112   gboolean res = FALSE;
2113 
2114   mutex = CreateMutexA (NULL, FALSE, mutexname);
2115   if (WaitForSingleObject (mutex, 10) == WAIT_TIMEOUT)
2116     res = TRUE;
2117   else
2118     ReleaseMutex (mutex);
2119   CloseHandle (mutex);
2120 
2121   return res;
2122 }
2123 
2124 static char *
read_shm(const char * shm_name)2125 read_shm (const char *shm_name)
2126 {
2127   HANDLE shared_mem;
2128   char *shared_data;
2129   char *res;
2130   int i;
2131 
2132   res = NULL;
2133 
2134   for (i = 0; i < 20; i++)
2135     {
2136       shared_mem = OpenFileMappingA (FILE_MAP_READ, FALSE, shm_name);
2137       if (shared_mem != 0)
2138 	break;
2139       Sleep (100);
2140     }
2141 
2142   if (shared_mem != 0)
2143     {
2144       shared_data = MapViewOfFile (shared_mem, FILE_MAP_READ, 0, 0, 0);
2145       /* It looks that a race is possible here:
2146        * if the dbus process already created mapping but didn't fill it
2147        * the code below may read incorrect address.
2148        * Also this is a bit complicated by the fact that
2149        * any change in the "synchronization contract" between processes
2150        * should be accompanied with renaming all of used win32 named objects:
2151        * otherwise libgio-2.0-0.dll of different versions shipped with
2152        * different apps may break each other due to protocol difference.
2153        */
2154       if (shared_data != NULL)
2155 	{
2156 	  res = g_strdup (shared_data);
2157 	  UnmapViewOfFile (shared_data);
2158 	}
2159       CloseHandle (shared_mem);
2160     }
2161 
2162   return res;
2163 }
2164 
2165 static HANDLE
set_shm(const char * shm_name,const char * value)2166 set_shm (const char *shm_name, const char *value)
2167 {
2168   HANDLE shared_mem;
2169   char *shared_data;
2170 
2171   shared_mem = CreateFileMappingA (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
2172 				   0, strlen (value) + 1, shm_name);
2173   if (shared_mem == 0)
2174     return 0;
2175 
2176   shared_data = MapViewOfFile (shared_mem, FILE_MAP_WRITE, 0, 0, 0 );
2177   if (shared_data == NULL)
2178     return 0;
2179 
2180   strcpy (shared_data, value);
2181 
2182   UnmapViewOfFile (shared_data);
2183 
2184   return shared_mem;
2185 }
2186 
2187 /* These keep state between publish_session_bus and unpublish_session_bus */
2188 static HANDLE published_daemon_mutex;
2189 static HANDLE published_shared_mem;
2190 
2191 static gboolean
publish_session_bus(const char * address)2192 publish_session_bus (const char *address)
2193 {
2194   HANDLE init_mutex;
2195 
2196   init_mutex = acquire_mutex (UNIQUE_DBUS_INIT_MUTEX);
2197 
2198   published_daemon_mutex = CreateMutexA (NULL, FALSE, DBUS_DAEMON_MUTEX);
2199   if (WaitForSingleObject (published_daemon_mutex, 10 ) != WAIT_OBJECT_0)
2200     {
2201       release_mutex (init_mutex);
2202       CloseHandle (published_daemon_mutex);
2203       published_daemon_mutex = NULL;
2204       return FALSE;
2205     }
2206 
2207   published_shared_mem = set_shm (DBUS_DAEMON_ADDRESS_INFO, address);
2208   if (!published_shared_mem)
2209     {
2210       release_mutex (init_mutex);
2211       CloseHandle (published_daemon_mutex);
2212       published_daemon_mutex = NULL;
2213       return FALSE;
2214     }
2215 
2216   release_mutex (init_mutex);
2217   return TRUE;
2218 }
2219 
2220 static void
unpublish_session_bus(void)2221 unpublish_session_bus (void)
2222 {
2223   HANDLE init_mutex;
2224 
2225   init_mutex = acquire_mutex (UNIQUE_DBUS_INIT_MUTEX);
2226 
2227   CloseHandle (published_shared_mem);
2228   published_shared_mem = NULL;
2229 
2230   release_mutex (published_daemon_mutex);
2231   published_daemon_mutex = NULL;
2232 
2233   release_mutex (init_mutex);
2234 }
2235 
/* atexit() handler registered by open_console_window(): retitles the
 * console, prints a prompt to it and waits for a key press.
 *
 * If the console output device cannot be opened the prompt is skipped,
 * but the function still waits for a key.
 */
static void
wait_console_window (void)
{
  FILE *console = fopen ("CONOUT$", "w");

  SetConsoleTitleW (L"gdbus-daemon output. Type any character to close this window.");

  /* fopen() may fail; never hand a NULL stream to stdio */
  if (console != NULL)
    {
      fprintf (console, _("(Type any character to close this window)\n"));
      /* fclose() flushes the prompt and releases the stream */
      fclose (console);
    }

  _getch ();
}
2246 
2247 static void
open_console_window(void)2248 open_console_window (void)
2249 {
2250   if (((HANDLE) _get_osfhandle (fileno (stdout)) == INVALID_HANDLE_VALUE ||
2251        (HANDLE) _get_osfhandle (fileno (stderr)) == INVALID_HANDLE_VALUE) && AllocConsole ())
2252     {
2253       if ((HANDLE) _get_osfhandle (fileno (stdout)) == INVALID_HANDLE_VALUE)
2254         freopen ("CONOUT$", "w", stdout);
2255 
2256       if ((HANDLE) _get_osfhandle (fileno (stderr)) == INVALID_HANDLE_VALUE)
2257         freopen ("CONOUT$", "w", stderr);
2258 
2259       SetConsoleTitleW (L"gdbus-daemon debug output.");
2260 
2261       atexit (wait_console_window);
2262     }
2263 }
2264 
2265 static void
idle_timeout_cb(GDBusDaemon * daemon,gpointer user_data)2266 idle_timeout_cb (GDBusDaemon *daemon, gpointer user_data)
2267 {
2268   GMainLoop *loop = user_data;
2269   g_main_loop_quit (loop);
2270 }
2271 
2272 /* Satisfies STARTF_FORCEONFEEDBACK */
2273 static void
turn_off_the_starting_cursor(void)2274 turn_off_the_starting_cursor (void)
2275 {
2276   MSG msg;
2277   BOOL bRet;
2278 
2279   PostQuitMessage (0);
2280 
2281   while ((bRet = GetMessage (&msg, 0, 0, 0)) != 0)
2282     {
2283       if (bRet == -1)
2284         continue;
2285 
2286       TranslateMessage (&msg);
2287       DispatchMessage (&msg);
2288     }
2289 }
2290 
2291 __declspec(dllexport) void __stdcall
g_win32_run_session_bus(void * hwnd,void * hinst,const char * cmdline,int cmdshow)2292 g_win32_run_session_bus (void* hwnd, void* hinst, const char* cmdline, int cmdshow)
2293 {
2294   GDBusDaemon *daemon;
2295   GMainLoop *loop;
2296   const char *address;
2297   GError *error = NULL;
2298 
2299   turn_off_the_starting_cursor ();
2300 
2301   if (g_getenv ("GDBUS_DAEMON_DEBUG") != NULL)
2302     open_console_window ();
2303 
2304   address = "nonce-tcp:";
2305   daemon = _g_dbus_daemon_new (address, NULL, &error);
2306   if (daemon == NULL)
2307     {
2308       g_printerr ("Can't init bus: %s\n", error->message);
2309       g_error_free (error);
2310       return;
2311     }
2312 
2313   loop = g_main_loop_new (NULL, FALSE);
2314 
2315   /* There is a subtle detail with "idle-timeout" signal of dbus daemon:
2316    * It is fired on idle after last client disconnection,
2317    * but (at least with glib 2.59.1) it is NEVER fired
2318    * if no clients connect to daemon at all.
2319    * This may lead to infinite run of this daemon process.
2320    */
2321   g_signal_connect (daemon, "idle-timeout", G_CALLBACK (idle_timeout_cb), loop);
2322 
2323   if (publish_session_bus (_g_dbus_daemon_get_address (daemon)))
2324     {
2325       g_main_loop_run (loop);
2326 
2327       unpublish_session_bus ();
2328     }
2329 
2330   g_main_loop_unref (loop);
2331   g_object_unref (daemon);
2332 }
2333 
2334 static gboolean autolaunch_binary_absent = FALSE;
2335 
2336 gchar *
_g_dbus_win32_get_session_address_dbus_launch(GError ** error)2337 _g_dbus_win32_get_session_address_dbus_launch (GError **error)
2338 {
2339   HANDLE autolaunch_mutex, init_mutex;
2340   char *address = NULL;
2341 
2342   autolaunch_mutex = acquire_mutex (DBUS_AUTOLAUNCH_MUTEX);
2343 
2344   init_mutex = acquire_mutex (UNIQUE_DBUS_INIT_MUTEX);
2345 
2346   if (is_mutex_owned (DBUS_DAEMON_MUTEX))
2347     address = read_shm (DBUS_DAEMON_ADDRESS_INFO);
2348 
2349   release_mutex (init_mutex);
2350 
2351   if (address == NULL && !autolaunch_binary_absent)
2352     {
2353       wchar_t gio_path[MAX_PATH + 2] = { 0 };
2354       int gio_path_len = GetModuleFileNameW (_g_io_win32_get_module (), gio_path, MAX_PATH + 1);
2355 
2356       /* The <= MAX_PATH check prevents truncated path usage */
2357       if (gio_path_len > 0 && gio_path_len <= MAX_PATH)
2358 	{
2359 	  PROCESS_INFORMATION pi = { 0 };
2360 	  STARTUPINFOW si = { 0 };
2361 	  BOOL res = FALSE;
2362 	  wchar_t exe_path[MAX_PATH + 100] = { 0 };
2363 	  /* calculate index of first char of dll file name inside full path */
2364 	  int gio_name_index = gio_path_len;
2365 	  for (; gio_name_index > 0; --gio_name_index)
2366 	  {
2367 	    wchar_t prev_char = gio_path[gio_name_index - 1];
2368 	    if (prev_char == L'\\' || prev_char == L'/')
2369 	      break;
2370 	  }
2371 	  gio_path[gio_name_index] = L'\0';
2372 	  wcscpy (exe_path, gio_path);
2373 	  wcscat (exe_path, L"\\gdbus.exe");
2374 
2375 	  if (GetFileAttributesW (exe_path) == INVALID_FILE_ATTRIBUTES)
2376 	    {
2377 	      /* warning won't be raised another time
2378 	       * since autolaunch_binary_absent would be already set.
2379 	       */
2380 	      autolaunch_binary_absent = TRUE;
2381 	      g_warning ("win32 session dbus binary not found: %S", exe_path );
2382 	    }
2383 	  else
2384 	    {
2385 	      wchar_t args[MAX_PATH*2 + 100] = { 0 };
2386 	      wcscpy (args, L"\"");
2387 	      wcscat (args, exe_path);
2388 	      wcscat (args, L"\" ");
2389 #define _L_PREFIX_FOR_EXPANDED(arg) L##arg
2390 #define _L_PREFIX(arg) _L_PREFIX_FOR_EXPANDED (arg)
2391 	      wcscat (args, _L_PREFIX (_GDBUS_ARG_WIN32_RUN_SESSION_BUS));
2392 #undef _L_PREFIX
2393 #undef _L_PREFIX_FOR_EXPANDED
2394 
2395 	      res = CreateProcessW (exe_path, args,
2396 				    0, 0, FALSE,
2397 				    NORMAL_PRIORITY_CLASS | CREATE_NO_WINDOW | DETACHED_PROCESS,
2398 				    0, gio_path,
2399 				    &si, &pi);
2400 	    }
2401 	  if (res)
2402 	    {
2403 	      address = read_shm (DBUS_DAEMON_ADDRESS_INFO);
2404 	      if (address == NULL)
2405 		g_warning ("%S dbus binary failed to launch bus, maybe incompatible version", exe_path );
2406 	    }
2407 	}
2408     }
2409 
2410   release_mutex (autolaunch_mutex);
2411 
2412   if (address == NULL)
2413     g_set_error (error,
2414 		 G_IO_ERROR,
2415 		 G_IO_ERROR_FAILED,
2416 		 _("Session dbus not running, and autolaunch failed"));
2417 
2418   return address;
2419 }
2420 
2421 #endif
2422 
2423 /* ---------------------------------------------------------------------------------------------------- */
2424 
2425 gchar *
_g_dbus_get_machine_id(GError ** error)2426 _g_dbus_get_machine_id (GError **error)
2427 {
2428 #ifdef G_OS_WIN32
2429   HW_PROFILE_INFOA info;
2430   char *src, *dest, *res;
2431   int i;
2432 
2433   if (!GetCurrentHwProfileA (&info))
2434     {
2435       char *message = g_win32_error_message (GetLastError ());
2436       g_set_error (error,
2437 		   G_IO_ERROR,
2438 		   G_IO_ERROR_FAILED,
2439 		   _("Unable to get Hardware profile: %s"), message);
2440       g_free (message);
2441       return NULL;
2442     }
2443 
2444   /* Form: {12340001-4980-1920-6788-123456789012} */
2445   src = &info.szHwProfileGuid[0];
2446 
2447   res = g_malloc (32+1);
2448   dest = res;
2449 
2450   src++; /* Skip { */
2451   for (i = 0; i < 8; i++)
2452     *dest++ = *src++;
2453   src++; /* Skip - */
2454   for (i = 0; i < 4; i++)
2455     *dest++ = *src++;
2456   src++; /* Skip - */
2457   for (i = 0; i < 4; i++)
2458     *dest++ = *src++;
2459   src++; /* Skip - */
2460   for (i = 0; i < 4; i++)
2461     *dest++ = *src++;
2462   src++; /* Skip - */
2463   for (i = 0; i < 12; i++)
2464     *dest++ = *src++;
2465   *dest = 0;
2466 
2467   return res;
2468 #else
2469   gchar *ret;
2470   GError *first_error;
2471   /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2472   ret = NULL;
2473   first_error = NULL;
2474   if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2475                             &ret,
2476                             NULL,
2477                             &first_error) &&
2478       !g_file_get_contents ("/etc/machine-id",
2479                             &ret,
2480                             NULL,
2481                             NULL))
2482     {
2483       g_propagate_prefixed_error (error, first_error,
2484                                   _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2485     }
2486   else
2487     {
2488       /* ignore the error from the first try, if any */
2489       g_clear_error (&first_error);
2490       /* TODO: validate value */
2491       g_strstrip (ret);
2492     }
2493   return ret;
2494 #endif
2495 }
2496 
2497 /* ---------------------------------------------------------------------------------------------------- */
2498 
2499 gchar *
_g_dbus_enum_to_string(GType enum_type,gint value)2500 _g_dbus_enum_to_string (GType enum_type, gint value)
2501 {
2502   gchar *ret;
2503   GEnumClass *klass;
2504   GEnumValue *enum_value;
2505 
2506   klass = g_type_class_ref (enum_type);
2507   enum_value = g_enum_get_value (klass, value);
2508   if (enum_value != NULL)
2509     ret = g_strdup (enum_value->value_nick);
2510   else
2511     ret = g_strdup_printf ("unknown (value %d)", value);
2512   g_type_class_unref (klass);
2513   return ret;
2514 }
2515 
2516 /* ---------------------------------------------------------------------------------------------------- */
2517 
2518 static void
write_message_print_transport_debug(gssize bytes_written,MessageToWriteData * data)2519 write_message_print_transport_debug (gssize bytes_written,
2520                                      MessageToWriteData *data)
2521 {
2522   if (G_LIKELY (!_g_dbus_debug_transport ()))
2523     goto out;
2524 
2525   _g_dbus_debug_print_lock ();
2526   g_print ("========================================================================\n"
2527            "GDBus-debug:Transport:\n"
2528            "  >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2529            "       size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2530            bytes_written,
2531            g_dbus_message_get_serial (data->message),
2532            data->blob_size,
2533            data->total_written,
2534            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2535   _g_dbus_debug_print_unlock ();
2536  out:
2537   ;
2538 }
2539 
2540 /* ---------------------------------------------------------------------------------------------------- */
2541 
2542 static void
read_message_print_transport_debug(gssize bytes_read,GDBusWorker * worker)2543 read_message_print_transport_debug (gssize bytes_read,
2544                                     GDBusWorker *worker)
2545 {
2546   gsize size;
2547   gint32 serial;
2548   gint32 message_length;
2549 
2550   if (G_LIKELY (!_g_dbus_debug_transport ()))
2551     goto out;
2552 
2553   size = bytes_read + worker->read_buffer_cur_size;
2554   serial = 0;
2555   message_length = 0;
2556   if (size >= 16)
2557     message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2558   if (size >= 1)
2559     {
2560       switch (worker->read_buffer[0])
2561         {
2562         case 'l':
2563           if (size >= 12)
2564             serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2565           break;
2566         case 'B':
2567           if (size >= 12)
2568             serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2569           break;
2570         default:
2571           /* an error will be set elsewhere if this happens */
2572           goto out;
2573         }
2574     }
2575 
2576     _g_dbus_debug_print_lock ();
2577   g_print ("========================================================================\n"
2578            "GDBus-debug:Transport:\n"
2579            "  <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2580            "       size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2581            bytes_read,
2582            serial,
2583            message_length,
2584            worker->read_buffer_cur_size,
2585            g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2586   _g_dbus_debug_print_unlock ();
2587  out:
2588   ;
2589 }
2590 
2591 /* ---------------------------------------------------------------------------------------------------- */
2592 
2593 gboolean
_g_signal_accumulator_false_handled(GSignalInvocationHint * ihint,GValue * return_accu,const GValue * handler_return,gpointer dummy)2594 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2595                                      GValue                *return_accu,
2596                                      const GValue          *handler_return,
2597                                      gpointer               dummy)
2598 {
2599   gboolean continue_emission;
2600   gboolean signal_return;
2601 
2602   signal_return = g_value_get_boolean (handler_return);
2603   g_value_set_boolean (return_accu, signal_return);
2604   continue_emission = signal_return;
2605 
2606   return continue_emission;
2607 }
2608 
2609 /* ---------------------------------------------------------------------------------------------------- */
2610 
2611 static void
append_nibble(GString * s,gint val)2612 append_nibble (GString *s, gint val)
2613 {
2614   g_string_append_c (s, val >= 10 ? ('a' + val - 10) : ('0' + val));
2615 }
2616 
2617 /* ---------------------------------------------------------------------------------------------------- */
2618 
2619 gchar *
_g_dbus_hexencode(const gchar * str,gsize str_len)2620 _g_dbus_hexencode (const gchar *str,
2621                    gsize        str_len)
2622 {
2623   gsize n;
2624   GString *s;
2625 
2626   s = g_string_new (NULL);
2627   for (n = 0; n < str_len; n++)
2628     {
2629       gint val;
2630       gint upper_nibble;
2631       gint lower_nibble;
2632 
2633       val = ((const guchar *) str)[n];
2634       upper_nibble = val >> 4;
2635       lower_nibble = val & 0x0f;
2636 
2637       append_nibble (s, upper_nibble);
2638       append_nibble (s, lower_nibble);
2639     }
2640 
2641   return g_string_free (s, FALSE);
2642 }
2643