nexmon – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /* GDBus - GLib D-Bus Library
2 *
3 * Copyright (C) 2008-2010 Red Hat, Inc.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
14 *
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17 *
18 * Author: David Zeuthen <davidz@redhat.com>
19 */
20  
21 #include "config.h"
22  
23 #include <stdlib.h>
24 #include <string.h>
25  
26 #include "giotypes.h"
27 #include "gsocket.h"
28 #include "gdbusprivate.h"
29 #include "gdbusmessage.h"
30 #include "gdbuserror.h"
31 #include "gdbusintrospection.h"
32 #include "gtask.h"
33 #include "ginputstream.h"
34 #include "gmemoryinputstream.h"
35 #include "giostream.h"
36 #include "glib/gstdio.h"
37 #include "gsocketcontrolmessage.h"
38 #include "gsocketconnection.h"
39 #include "gsocketoutputstream.h"
40  
41 #ifdef G_OS_UNIX
42 #include "gunixfdmessage.h"
43 #include "gunixconnection.h"
44 #include "gunixcredentialsmessage.h"
45 #endif
46  
47 #ifdef G_OS_WIN32
48 #include <windows.h>
49 #endif
50  
51 #include "glibintl.h"
52  
53 static gboolean _g_dbus_worker_do_initial_read (gpointer data);
54 static void schedule_pending_close (GDBusWorker *worker);
55  
56 /* ---------------------------------------------------------------------------------------------------- */
57  
58 gchar *
59 _g_dbus_hexdump (const gchar *data, gsize len, guint indent)
60 {
61 guint n, m;
62 GString *ret;
63  
64 ret = g_string_new (NULL);
65  
66 for (n = 0; n < len; n += 16)
67 {
68 g_string_append_printf (ret, "%*s%04x: ", indent, "", n);
69  
70 for (m = n; m < n + 16; m++)
71 {
72 if (m > n && (m%4) == 0)
73 g_string_append_c (ret, ' ');
74 if (m < len)
75 g_string_append_printf (ret, "%02x ", (guchar) data[m]);
76 else
77 g_string_append (ret, " ");
78 }
79  
80 g_string_append (ret, " ");
81  
82 for (m = n; m < len && m < n + 16; m++)
83 g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.');
84  
85 g_string_append_c (ret, '\n');
86 }
87  
88 return g_string_free (ret, FALSE);
89 }
90  
91 /* ---------------------------------------------------------------------------------------------------- */
92  
93 /* Unfortunately ancillary messages are discarded when reading from a
94 * socket using the GSocketInputStream abstraction. So we provide a
95 * very GInputStream-ish API that uses GSocket in this case (very
96 * similar to GSocketInputStream).
97 */
98  
99 typedef struct
100 {
101 void *buffer;
102 gsize count;
103  
104 GSocketControlMessage ***messages;
105 gint *num_messages;
106 } ReadWithControlData;
107  
108 static void
109 read_with_control_data_free (ReadWithControlData *data)
110 {
111 g_slice_free (ReadWithControlData, data);
112 }
113  
114 static gboolean
115 _g_socket_read_with_control_messages_ready (GSocket *socket,
116 GIOCondition condition,
117 gpointer user_data)
118 {
119 GTask *task = user_data;
120 ReadWithControlData *data = g_task_get_task_data (task);
121 GError *error;
122 gssize result;
123 GInputVector vector;
124  
125 error = NULL;
126 vector.buffer = data->buffer;
127 vector.size = data->count;
128 result = g_socket_receive_message (socket,
129 NULL, /* address */
130 &vector,
131 1,
132 data->messages,
133 data->num_messages,
134 NULL,
135 g_task_get_cancellable (task),
136 &error);
137  
138 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
139 {
140 g_error_free (error);
141 return TRUE;
142 }
143  
144 g_assert (result >= 0 || error != NULL);
145 if (result >= 0)
146 g_task_return_int (task, result);
147 else
148 g_task_return_error (task, error);
149 g_object_unref (task);
150  
151 return FALSE;
152 }
153  
154 static void
155 _g_socket_read_with_control_messages (GSocket *socket,
156 void *buffer,
157 gsize count,
158 GSocketControlMessage ***messages,
159 gint *num_messages,
160 gint io_priority,
161 GCancellable *cancellable,
162 GAsyncReadyCallback callback,
163 gpointer user_data)
164 {
165 GTask *task;
166 ReadWithControlData *data;
167 GSource *source;
168  
169 data = g_slice_new0 (ReadWithControlData);
170 data->buffer = buffer;
171 data->count = count;
172 data->messages = messages;
173 data->num_messages = num_messages;
174  
175 task = g_task_new (socket, cancellable, callback, user_data);
176 g_task_set_task_data (task, data, (GDestroyNotify) read_with_control_data_free);
177  
178 if (g_socket_condition_check (socket, G_IO_IN))
179 {
180 if (!_g_socket_read_with_control_messages_ready (socket, G_IO_IN, task))
181 return;
182 }
183  
184 source = g_socket_create_source (socket,
185 G_IO_IN | G_IO_HUP | G_IO_ERR,
186 cancellable);
187 g_task_attach_source (task, source, (GSourceFunc) _g_socket_read_with_control_messages_ready);
188 g_source_unref (source);
189 }
190  
191 static gssize
192 _g_socket_read_with_control_messages_finish (GSocket *socket,
193 GAsyncResult *result,
194 GError **error)
195 {
196 g_return_val_if_fail (G_IS_SOCKET (socket), -1);
197 g_return_val_if_fail (g_task_is_valid (result, socket), -1);
198  
199 return g_task_propagate_int (G_TASK (result), error);
200 }
201  
202 /* ---------------------------------------------------------------------------------------------------- */
203  
204 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
205  
206 static GPtrArray *ensured_classes = NULL;
207  
208 static void
209 ensure_type (GType gtype)
210 {
211 g_ptr_array_add (ensured_classes, g_type_class_ref (gtype));
212 }
213  
214 static void
215 release_required_types (void)
216 {
217 g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL);
218 g_ptr_array_unref (ensured_classes);
219 ensured_classes = NULL;
220 }
221  
222 static void
223 ensure_required_types (void)
224 {
225 g_assert (ensured_classes == NULL);
226 ensured_classes = g_ptr_array_new ();
227 ensure_type (G_TYPE_TASK);
228 ensure_type (G_TYPE_MEMORY_INPUT_STREAM);
229 }
230 /* ---------------------------------------------------------------------------------------------------- */
231  
232 typedef struct
233 {
234 volatile gint refcount;
235 GThread *thread;
236 GMainContext *context;
237 GMainLoop *loop;
238 } SharedThreadData;
239  
240 static gpointer
241 gdbus_shared_thread_func (gpointer user_data)
242 {
243 SharedThreadData *data = user_data;
244  
245 g_main_context_push_thread_default (data->context);
246 g_main_loop_run (data->loop);
247 g_main_context_pop_thread_default (data->context);
248  
249 release_required_types ();
250  
251 return NULL;
252 }
253  
254 /* ---------------------------------------------------------------------------------------------------- */
255  
256 static SharedThreadData *
257 _g_dbus_shared_thread_ref (void)
258 {
259 static gsize shared_thread_data = 0;
260 SharedThreadData *ret;
261  
262 if (g_once_init_enter (&shared_thread_data))
263 {
264 SharedThreadData *data;
265  
266 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
267 ensure_required_types ();
268  
269 data = g_new0 (SharedThreadData, 1);
270 data->refcount = 0;
271  
272 data->context = g_main_context_new ();
273 data->loop = g_main_loop_new (data->context, FALSE);
274 data->thread = g_thread_new ("gdbus",
275 gdbus_shared_thread_func,
276 data);
277 /* We can cast between gsize and gpointer safely */
278 g_once_init_leave (&shared_thread_data, (gsize) data);
279 }
280  
281 ret = (SharedThreadData*) shared_thread_data;
282 g_atomic_int_inc (&ret->refcount);
283 return ret;
284 }
285  
286 static void
287 _g_dbus_shared_thread_unref (SharedThreadData *data)
288 {
289 /* TODO: actually destroy the shared thread here */
290 #if 0
291 g_assert (data != NULL);
292 if (g_atomic_int_dec_and_test (&data->refcount))
293 {
294 g_main_loop_quit (data->loop);
295 //g_thread_join (data->thread);
296 g_main_loop_unref (data->loop);
297 g_main_context_unref (data->context);
298 }
299 #endif
300 }
301  
302 /* ---------------------------------------------------------------------------------------------------- */
303  
304 typedef enum {
305 PENDING_NONE = 0,
306 PENDING_WRITE,
307 PENDING_FLUSH,
308 PENDING_CLOSE
309 } OutputPending;
310  
311 struct GDBusWorker
312 {
313 volatile gint ref_count;
314  
315 SharedThreadData *shared_thread_data;
316  
317 /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
318 volatile gint stopped;
319  
320 /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
321 * only affects messages received from the other peer (since GDBusServer is the
322 * only user) - we might want it to affect messages sent to the other peer too?
323 */
324 gboolean frozen;
325 GDBusCapabilityFlags capabilities;
326 GQueue *received_messages_while_frozen;
327  
328 GIOStream *stream;
329 GCancellable *cancellable;
330 GDBusWorkerMessageReceivedCallback message_received_callback;
331 GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
332 GDBusWorkerDisconnectedCallback disconnected_callback;
333 gpointer user_data;
334  
335 /* if not NULL, stream is GSocketConnection */
336 GSocket *socket;
337  
338 /* used for reading */
339 GMutex read_lock;
340 gchar *read_buffer;
341 gsize read_buffer_allocated_size;
342 gsize read_buffer_cur_size;
343 gsize read_buffer_bytes_wanted;
344 GUnixFDList *read_fd_list;
345 GSocketControlMessage **read_ancillary_messages;
346 gint read_num_ancillary_messages;
347  
348 /* Whether an async write, flush or close, or none of those, is pending.
349 * Only the worker thread may change its value, and only with the write_lock.
350 * Other threads may read its value when holding the write_lock.
351 * The worker thread may read its value at any time.
352 */
353 OutputPending output_pending;
354 /* used for writing */
355 GMutex write_lock;
356 /* queue of MessageToWriteData, protected by write_lock */
357 GQueue *write_queue;
358 /* protected by write_lock */
359 guint64 write_num_messages_written;
360 /* number of messages we'd written out last time we flushed;
361 * protected by write_lock
362 */
363 guint64 write_num_messages_flushed;
364 /* list of FlushData, protected by write_lock */
365 GList *write_pending_flushes;
366 /* list of CloseData, protected by write_lock */
367 GList *pending_close_attempts;
368 /* no lock - only used from the worker thread */
369 gboolean close_expected;
370 };
371  
372 static void _g_dbus_worker_unref (GDBusWorker *worker);
373  
374 /* ---------------------------------------------------------------------------------------------------- */
375  
376 typedef struct
377 {
378 GMutex mutex;
379 GCond cond;
380 guint64 number_to_wait_for;
381 GError *error;
382 } FlushData;
383  
384 struct _MessageToWriteData ;
385 typedef struct _MessageToWriteData MessageToWriteData;
386  
387 static void message_to_write_data_free (MessageToWriteData *data);
388  
389 static void read_message_print_transport_debug (gssize bytes_read,
390 GDBusWorker *worker);
391  
392 static void write_message_print_transport_debug (gssize bytes_written,
393 MessageToWriteData *data);
394  
395 typedef struct {
396 GDBusWorker *worker;
397 GTask *task;
398 } CloseData;
399  
400 static void close_data_free (CloseData *close_data)
401 {
402 g_clear_object (&close_data->task);
403  
404 _g_dbus_worker_unref (close_data->worker);
405 g_slice_free (CloseData, close_data);
406 }
407  
408 /* ---------------------------------------------------------------------------------------------------- */
409  
410 static GDBusWorker *
411 _g_dbus_worker_ref (GDBusWorker *worker)
412 {
413 g_atomic_int_inc (&worker->ref_count);
414 return worker;
415 }
416  
417 static void
418 _g_dbus_worker_unref (GDBusWorker *worker)
419 {
420 if (g_atomic_int_dec_and_test (&worker->ref_count))
421 {
422 g_assert (worker->write_pending_flushes == NULL);
423  
424 _g_dbus_shared_thread_unref (worker->shared_thread_data);
425  
426 g_object_unref (worker->stream);
427  
428 g_mutex_clear (&worker->read_lock);
429 g_object_unref (worker->cancellable);
430 if (worker->read_fd_list != NULL)
431 g_object_unref (worker->read_fd_list);
432  
433 g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref);
434 g_mutex_clear (&worker->write_lock);
435 g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free);
436 g_free (worker->read_buffer);
437  
438 g_free (worker);
439 }
440 }
441  
442 static void
443 _g_dbus_worker_emit_disconnected (GDBusWorker *worker,
444 gboolean remote_peer_vanished,
445 GError *error)
446 {
447 if (!g_atomic_int_get (&worker->stopped))
448 worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data);
449 }
450  
451 static void
452 _g_dbus_worker_emit_message_received (GDBusWorker *worker,
453 GDBusMessage *message)
454 {
455 if (!g_atomic_int_get (&worker->stopped))
456 worker->message_received_callback (worker, message, worker->user_data);
457 }
458  
459 static GDBusMessage *
460 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
461 GDBusMessage *message)
462 {
463 GDBusMessage *ret;
464 if (!g_atomic_int_get (&worker->stopped))
465 ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
466 else
467 ret = message;
468 return ret;
469 }
470  
471 /* can only be called from private thread with read-lock held - takes ownership of @message */
472 static void
473 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker,
474 GDBusMessage *message)
475 {
476 if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
477 {
478 /* queue up */
479 g_queue_push_tail (worker->received_messages_while_frozen, message);
480 }
481 else
482 {
483 /* not frozen, nor anything in queue */
484 _g_dbus_worker_emit_message_received (worker, message);
485 g_object_unref (message);
486 }
487 }
488  
489 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
490 static gboolean
491 unfreeze_in_idle_cb (gpointer user_data)
492 {
493 GDBusWorker *worker = user_data;
494 GDBusMessage *message;
495  
496 g_mutex_lock (&worker->read_lock);
497 if (worker->frozen)
498 {
499 while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
500 {
501 _g_dbus_worker_emit_message_received (worker, message);
502 g_object_unref (message);
503 }
504 worker->frozen = FALSE;
505 }
506 else
507 {
508 g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
509 }
510 g_mutex_unlock (&worker->read_lock);
511 return FALSE;
512 }
513  
514 /* can be called from any thread */
515 void
516 _g_dbus_worker_unfreeze (GDBusWorker *worker)
517 {
518 GSource *idle_source;
519 idle_source = g_idle_source_new ();
520 g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
521 g_source_set_callback (idle_source,
522 unfreeze_in_idle_cb,
523 _g_dbus_worker_ref (worker),
524 (GDestroyNotify) _g_dbus_worker_unref);
525 g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb");
526 g_source_attach (idle_source, worker->shared_thread_data->context);
527 g_source_unref (idle_source);
528 }
529  
530 /* ---------------------------------------------------------------------------------------------------- */
531  
532 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
533  
534 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
535 static void
536 _g_dbus_worker_do_read_cb (GInputStream *input_stream,
537 GAsyncResult *res,
538 gpointer user_data)
539 {
540 GDBusWorker *worker = user_data;
541 GError *error;
542 gssize bytes_read;
543  
544 g_mutex_lock (&worker->read_lock);
545  
546 /* If already stopped, don't even process the reply */
547 if (g_atomic_int_get (&worker->stopped))
548 goto out;
549  
550 error = NULL;
551 if (worker->socket == NULL)
552 bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream),
553 res,
554 &error);
555 else
556 bytes_read = _g_socket_read_with_control_messages_finish (worker->socket,
557 res,
558 &error);
559 if (worker->read_num_ancillary_messages > 0)
560 {
561 gint n;
562 for (n = 0; n < worker->read_num_ancillary_messages; n++)
563 {
564 GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]);
565  
566 if (FALSE)
567 {
568 }
569 #ifdef G_OS_UNIX
570 else if (G_IS_UNIX_FD_MESSAGE (control_message))
571 {
572 GUnixFDMessage *fd_message;
573 gint *fds;
574 gint num_fds;
575  
576 fd_message = G_UNIX_FD_MESSAGE (control_message);
577 fds = g_unix_fd_message_steal_fds (fd_message, &num_fds);
578 if (worker->read_fd_list == NULL)
579 {
580 worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds);
581 }
582 else
583 {
584 gint n;
585 for (n = 0; n < num_fds; n++)
586 {
587 /* TODO: really want a append_steal() */
588 g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL);
589 (void) g_close (fds[n], NULL);
590 }
591 }
592 g_free (fds);
593 }
594 else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message))
595 {
596 /* do nothing */
597 }
598 #endif
599 else
600 {
601 if (error == NULL)
602 {
603 g_set_error (&error,
604 G_IO_ERROR,
605 G_IO_ERROR_FAILED,
606 "Unexpected ancillary message of type %s received from peer",
607 g_type_name (G_TYPE_FROM_INSTANCE (control_message)));
608 _g_dbus_worker_emit_disconnected (worker, TRUE, error);
609 g_error_free (error);
610 g_object_unref (control_message);
611 n++;
612 while (n < worker->read_num_ancillary_messages)
613 g_object_unref (worker->read_ancillary_messages[n++]);
614 g_free (worker->read_ancillary_messages);
615 goto out;
616 }
617 }
618 g_object_unref (control_message);
619 }
620 g_free (worker->read_ancillary_messages);
621 }
622  
623 if (bytes_read == -1)
624 {
625 if (G_UNLIKELY (_g_dbus_debug_transport ()))
626 {
627 _g_dbus_debug_print_lock ();
628 g_print ("========================================================================\n"
629 "GDBus-debug:Transport:\n"
630 " ---- READ ERROR on stream of type %s:\n"
631 " ---- %s %d: %s\n",
632 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))),
633 g_quark_to_string (error->domain), error->code,
634 error->message);
635 _g_dbus_debug_print_unlock ();
636 }
637  
638 /* Every async read that uses this callback uses worker->cancellable
639 * as its GCancellable. worker->cancellable gets cancelled if and only
640 * if the GDBusConnection tells us to close (either via
641 * _g_dbus_worker_stop, which is called on last-unref, or directly),
642 * so a cancelled read must mean our connection was closed locally.
643 *
644 * If we're closing, other errors are possible - notably,
645 * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
646 * read in-flight. It seems sensible to treat all read errors during
647 * closing as an expected thing that doesn't trip exit-on-close.
648 *
649 * Because close_expected can't be set until we get into the worker
650 * thread, but the cancellable is signalled sooner (from another
651 * thread), we do still need to check the error.
652 */
653 if (worker->close_expected ||
654 g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
655 _g_dbus_worker_emit_disconnected (worker, FALSE, NULL);
656 else
657 _g_dbus_worker_emit_disconnected (worker, TRUE, error);
658  
659 g_error_free (error);
660 goto out;
661 }
662  
663 #if 0
664 g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
665 (gint) bytes_read,
666 g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
667 g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))),
668 g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)),
669 G_IO_IN | G_IO_OUT | G_IO_HUP),
670 worker->stream,
671 worker);
672 #endif
673  
674 /* TODO: hmm, hmm... */
675 if (bytes_read == 0)
676 {
677 g_set_error (&error,
678 G_IO_ERROR,
679 G_IO_ERROR_FAILED,
680 "Underlying GIOStream returned 0 bytes on an async read");
681 _g_dbus_worker_emit_disconnected (worker, TRUE, error);
682 g_error_free (error);
683 goto out;
684 }
685  
686 read_message_print_transport_debug (bytes_read, worker);
687  
688 worker->read_buffer_cur_size += bytes_read;
689 if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size)
690 {
691 /* OK, got what we asked for! */
692 if (worker->read_buffer_bytes_wanted == 16)
693 {
694 gssize message_len;
695 /* OK, got the header - determine how many more bytes are needed */
696 error = NULL;
697 message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer,
698 16,
699 &error);
700 if (message_len == -1)
701 {
702 g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message);
703 _g_dbus_worker_emit_disconnected (worker, FALSE, error);
704 g_error_free (error);
705 goto out;
706 }
707  
708 worker->read_buffer_bytes_wanted = message_len;
709 _g_dbus_worker_do_read_unlocked (worker);
710 }
711 else
712 {
713 GDBusMessage *message;
714 error = NULL;
715  
716 /* TODO: use connection->priv->auth to decode the message */
717  
718 message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer,
719 worker->read_buffer_cur_size,
720 worker->capabilities,
721 &error);
722 if (message == NULL)
723 {
724 gchar *s;
725 s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
726 g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n"
727 "The error is: %s\n"
728 "The payload is as follows:\n"
729 "%s\n",
730 worker->read_buffer_cur_size,
731 error->message,
732 s);
733 g_free (s);
734 _g_dbus_worker_emit_disconnected (worker, FALSE, error);
735 g_error_free (error);
736 goto out;
737 }
738  
739 #ifdef G_OS_UNIX
740 if (worker->read_fd_list != NULL)
741 {
742 g_dbus_message_set_unix_fd_list (message, worker->read_fd_list);
743 g_object_unref (worker->read_fd_list);
744 worker->read_fd_list = NULL;
745 }
746 #endif
747  
748 if (G_UNLIKELY (_g_dbus_debug_message ()))
749 {
750 gchar *s;
751 _g_dbus_debug_print_lock ();
752 g_print ("========================================================================\n"
753 "GDBus-debug:Message:\n"
754 " <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
755 worker->read_buffer_cur_size);
756 s = g_dbus_message_print (message, 2);
757 g_print ("%s", s);
758 g_free (s);
759 if (G_UNLIKELY (_g_dbus_debug_payload ()))
760 {
761 s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2);
762 g_print ("%s\n", s);
763 g_free (s);
764 }
765 _g_dbus_debug_print_unlock ();
766 }
767  
768 /* yay, got a message, go deliver it */
769 _g_dbus_worker_queue_or_deliver_received_message (worker, message);
770  
771 /* start reading another message! */
772 worker->read_buffer_bytes_wanted = 0;
773 worker->read_buffer_cur_size = 0;
774 _g_dbus_worker_do_read_unlocked (worker);
775 }
776 }
777 else
778 {
779 /* didn't get all the bytes we requested - so repeat the request... */
780 _g_dbus_worker_do_read_unlocked (worker);
781 }
782  
783 out:
784 g_mutex_unlock (&worker->read_lock);
785  
786 /* gives up the reference acquired when calling g_input_stream_read_async() */
787 _g_dbus_worker_unref (worker);
788  
789 /* check if there is any pending close */
790 schedule_pending_close (worker);
791 }
792  
793 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
794 static void
795 _g_dbus_worker_do_read_unlocked (GDBusWorker *worker)
796 {
797 /* Note that we do need to keep trying to read even if close_expected is
798 * true, because only failing a read causes us to signal 'closed'.
799 */
800  
801 /* if bytes_wanted is zero, it means start reading a message */
802 if (worker->read_buffer_bytes_wanted == 0)
803 {
804 worker->read_buffer_cur_size = 0;
805 worker->read_buffer_bytes_wanted = 16;
806 }
807  
808 /* ensure we have a (big enough) buffer */
809 if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size)
810 {
811 /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
812 worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096);
813 worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size);
814 }
815  
816 if (worker->socket == NULL)
817 g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream),
818 worker->read_buffer + worker->read_buffer_cur_size,
819 worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
820 G_PRIORITY_DEFAULT,
821 worker->cancellable,
822 (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
823 _g_dbus_worker_ref (worker));
824 else
825 {
826 worker->read_ancillary_messages = NULL;
827 worker->read_num_ancillary_messages = 0;
828 _g_socket_read_with_control_messages (worker->socket,
829 worker->read_buffer + worker->read_buffer_cur_size,
830 worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size,
831 &worker->read_ancillary_messages,
832 &worker->read_num_ancillary_messages,
833 G_PRIORITY_DEFAULT,
834 worker->cancellable,
835 (GAsyncReadyCallback) _g_dbus_worker_do_read_cb,
836 _g_dbus_worker_ref (worker));
837 }
838 }
839  
840 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
841 static gboolean
842 _g_dbus_worker_do_initial_read (gpointer data)
843 {
844 GDBusWorker *worker = data;
845 g_mutex_lock (&worker->read_lock);
846 _g_dbus_worker_do_read_unlocked (worker);
847 g_mutex_unlock (&worker->read_lock);
848 return FALSE;
849 }
850  
851 /* ---------------------------------------------------------------------------------------------------- */
852  
853 struct _MessageToWriteData
854 {
855 GDBusWorker *worker;
856 GDBusMessage *message;
857 gchar *blob;
858 gsize blob_size;
859  
860 gsize total_written;
861 GTask *task;
862 };
863  
864 static void
865 message_to_write_data_free (MessageToWriteData *data)
866 {
867 _g_dbus_worker_unref (data->worker);
868 if (data->message)
869 g_object_unref (data->message);
870 g_free (data->blob);
871 g_slice_free (MessageToWriteData, data);
872 }
873  
874 /* ---------------------------------------------------------------------------------------------------- */
875  
876 static void write_message_continue_writing (MessageToWriteData *data);
877  
878 /* called in private thread shared by all GDBusConnection instances
879 *
880 * write-lock is not held on entry
881 * output_pending is PENDING_WRITE on entry
882 */
883 static void
884 write_message_async_cb (GObject *source_object,
885 GAsyncResult *res,
886 gpointer user_data)
887 {
888 MessageToWriteData *data = user_data;
889 GTask *task;
890 gssize bytes_written;
891 GError *error;
892  
893 /* Note: we can't access data->task after calling g_task_return_* () because the
894 * callback can free @data and we're not completing in idle. So use a copy of the pointer.
895 */
896 task = data->task;
897  
898 error = NULL;
899 bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object),
900 res,
901 &error);
902 if (bytes_written == -1)
903 {
904 g_task_return_error (task, error);
905 g_object_unref (task);
906 goto out;
907 }
908 g_assert (bytes_written > 0); /* zero is never returned */
909  
910 write_message_print_transport_debug (bytes_written, data);
911  
912 data->total_written += bytes_written;
913 g_assert (data->total_written <= data->blob_size);
914 if (data->total_written == data->blob_size)
915 {
916 g_task_return_boolean (task, TRUE);
917 g_object_unref (task);
918 goto out;
919 }
920  
921 write_message_continue_writing (data);
922  
923 out:
924 ;
925 }
926  
927 /* called in private thread shared by all GDBusConnection instances
928 *
929 * write-lock is not held on entry
930 * output_pending is PENDING_WRITE on entry
931 */
932 #ifdef G_OS_UNIX
933 static gboolean
934 on_socket_ready (GSocket *socket,
935 GIOCondition condition,
936 gpointer user_data)
937 {
938 MessageToWriteData *data = user_data;
939 write_message_continue_writing (data);
940 return FALSE; /* remove source */
941 }
942 #endif
943  
944 /* called in private thread shared by all GDBusConnection instances
945 *
946 * write-lock is not held on entry
947 * output_pending is PENDING_WRITE on entry
948 */
949 static void
950 write_message_continue_writing (MessageToWriteData *data)
951 {
952 GOutputStream *ostream;
953 #ifdef G_OS_UNIX
954 GTask *task;
955 GUnixFDList *fd_list;
956 #endif
957  
958 #ifdef G_OS_UNIX
959 /* Note: we can't access data->task after calling g_task_return_* () because the
960 * callback can free @data and we're not completing in idle. So use a copy of the pointer.
961 */
962 task = data->task;
963 #endif
964  
965 ostream = g_io_stream_get_output_stream (data->worker->stream);
966 #ifdef G_OS_UNIX
967 fd_list = g_dbus_message_get_unix_fd_list (data->message);
968 #endif
969  
970 g_assert (!g_output_stream_has_pending (ostream));
971 g_assert_cmpint (data->total_written, <, data->blob_size);
972  
973 if (FALSE)
974 {
975 }
976 #ifdef G_OS_UNIX
977 else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0)
978 {
979 GOutputVector vector;
980 GSocketControlMessage *control_message;
981 gssize bytes_written;
982 GError *error;
983  
984 vector.buffer = data->blob;
985 vector.size = data->blob_size;
986  
987 control_message = NULL;
988 if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0)
989 {
990 if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING))
991 {
992 g_task_return_new_error (task,
993 G_IO_ERROR,
994 G_IO_ERROR_FAILED,
995 "Tried sending a file descriptor but remote peer does not support this capability");
996 g_object_unref (task);
997 goto out;
998 }
999 control_message = g_unix_fd_message_new_with_fd_list (fd_list);
1000 }
1001  
1002 error = NULL;
1003 bytes_written = g_socket_send_message (data->worker->socket,
1004 NULL, /* address */
1005 &vector,
1006 1,
1007 control_message != NULL ? &control_message : NULL,
1008 control_message != NULL ? 1 : 0,
1009 G_SOCKET_MSG_NONE,
1010 data->worker->cancellable,
1011 &error);
1012 if (control_message != NULL)
1013 g_object_unref (control_message);
1014  
1015 if (bytes_written == -1)
1016 {
1017 /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1018 if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1019 {
1020 GSource *source;
1021 source = g_socket_create_source (data->worker->socket,
1022 G_IO_OUT | G_IO_HUP | G_IO_ERR,
1023 data->worker->cancellable);
1024 g_source_set_callback (source,
1025 (GSourceFunc) on_socket_ready,
1026 data,
1027 NULL); /* GDestroyNotify */
1028 g_source_attach (source, g_main_context_get_thread_default ());
1029 g_source_unref (source);
1030 g_error_free (error);
1031 goto out;
1032 }
1033 g_task_return_error (task, error);
1034 g_object_unref (task);
1035 goto out;
1036 }
1037 g_assert (bytes_written > 0); /* zero is never returned */
1038  
1039 write_message_print_transport_debug (bytes_written, data);
1040  
1041 data->total_written += bytes_written;
1042 g_assert (data->total_written <= data->blob_size);
1043 if (data->total_written == data->blob_size)
1044 {
1045 g_task_return_boolean (task, TRUE);
1046 g_object_unref (task);
1047 goto out;
1048 }
1049  
1050 write_message_continue_writing (data);
1051 }
1052 #endif
1053 else
1054 {
1055 #ifdef G_OS_UNIX
1056 if (fd_list != NULL)
1057 {
1058 g_task_return_new_error (task,
1059 G_IO_ERROR,
1060 G_IO_ERROR_FAILED,
1061 "Tried sending a file descriptor on unsupported stream of type %s",
1062 g_type_name (G_TYPE_FROM_INSTANCE (ostream)));
1063 g_object_unref (task);
1064 goto out;
1065 }
1066 #endif
1067  
1068 g_output_stream_write_async (ostream,
1069 (const gchar *) data->blob + data->total_written,
1070 data->blob_size - data->total_written,
1071 G_PRIORITY_DEFAULT,
1072 data->worker->cancellable,
1073 write_message_async_cb,
1074 data);
1075 }
1076 #ifdef G_OS_UNIX
1077 out:
1078 #endif
1079 ;
1080 }
1081  
1082 /* called in private thread shared by all GDBusConnection instances
1083 *
1084 * write-lock is not held on entry
1085 * output_pending is PENDING_WRITE on entry
1086 */
1087 static void
1088 write_message_async (GDBusWorker *worker,
1089 MessageToWriteData *data,
1090 GAsyncReadyCallback callback,
1091 gpointer user_data)
1092 {
1093 data->task = g_task_new (NULL, NULL, callback, user_data);
1094 data->total_written = 0;
1095 write_message_continue_writing (data);
1096 }
1097  
1098 /* called in private thread shared by all GDBusConnection instances (with write-lock held) */
1099 static gboolean
1100 write_message_finish (GAsyncResult *res,
1101 GError **error)
1102 {
1103 g_return_val_if_fail (g_task_is_valid (res, NULL), FALSE);
1104  
1105 return g_task_propagate_boolean (G_TASK (res), error);
1106 }
1107 /* ---------------------------------------------------------------------------------------------------- */
1108  
1109 static void continue_writing (GDBusWorker *worker);
1110  
1111 typedef struct
1112 {
1113 GDBusWorker *worker;
1114 GList *flushers;
1115 } FlushAsyncData;
1116  
1117 static void
1118 flush_data_list_complete (const GList *flushers,
1119 const GError *error)
1120 {
1121 const GList *l;
1122  
1123 for (l = flushers; l != NULL; l = l->next)
1124 {
1125 FlushData *f = l->data;
1126  
1127 f->error = error != NULL ? g_error_copy (error) : NULL;
1128  
1129 g_mutex_lock (&f->mutex);
1130 g_cond_signal (&f->cond);
1131 g_mutex_unlock (&f->mutex);
1132 }
1133 }
1134  
1135 /* called in private thread shared by all GDBusConnection instances
1136 *
1137 * write-lock is not held on entry
1138 * output_pending is PENDING_FLUSH on entry
1139 */
1140 static void
1141 ostream_flush_cb (GObject *source_object,
1142 GAsyncResult *res,
1143 gpointer user_data)
1144 {
1145 FlushAsyncData *data = user_data;
1146 GError *error;
1147  
1148 error = NULL;
1149 g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
1150 res,
1151 &error);
1152  
1153 if (error == NULL)
1154 {
1155 if (G_UNLIKELY (_g_dbus_debug_transport ()))
1156 {
1157 _g_dbus_debug_print_lock ();
1158 g_print ("========================================================================\n"
1159 "GDBus-debug:Transport:\n"
1160 " ---- FLUSHED stream of type %s\n",
1161 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
1162 _g_dbus_debug_print_unlock ();
1163 }
1164 }
1165  
1166 g_assert (data->flushers != NULL);
1167 flush_data_list_complete (data->flushers, error);
1168 g_list_free (data->flushers);
1169  
1170 if (error != NULL)
1171 g_error_free (error);
1172  
1173 /* Make sure we tell folks that we don't have additional
1174 flushes pending */
1175 g_mutex_lock (&data->worker->write_lock);
1176 data->worker->write_num_messages_flushed = data->worker->write_num_messages_written;
1177 g_assert (data->worker->output_pending == PENDING_FLUSH);
1178 data->worker->output_pending = PENDING_NONE;
1179 g_mutex_unlock (&data->worker->write_lock);
1180  
1181 /* OK, cool, finally kick off the next write */
1182 continue_writing (data->worker);
1183  
1184 _g_dbus_worker_unref (data->worker);
1185 g_free (data);
1186 }
1187  
1188 /* called in private thread shared by all GDBusConnection instances
1189 *
1190 * write-lock is not held on entry
1191 * output_pending is PENDING_FLUSH on entry
1192 */
1193 static void
1194 start_flush (FlushAsyncData *data)
1195 {
1196 g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
1197 G_PRIORITY_DEFAULT,
1198 data->worker->cancellable,
1199 ostream_flush_cb,
1200 data);
1201 }
1202  
1203 /* called in private thread shared by all GDBusConnection instances
1204 *
1205 * write-lock is held on entry
1206 * output_pending is PENDING_NONE on entry
1207 */
1208 static void
1209 message_written_unlocked (GDBusWorker *worker,
1210 MessageToWriteData *message_data)
1211 {
1212 if (G_UNLIKELY (_g_dbus_debug_message ()))
1213 {
1214 gchar *s;
1215 _g_dbus_debug_print_lock ();
1216 g_print ("========================================================================\n"
1217 "GDBus-debug:Message:\n"
1218 " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n",
1219 message_data->blob_size);
1220 s = g_dbus_message_print (message_data->message, 2);
1221 g_print ("%s", s);
1222 g_free (s);
1223 if (G_UNLIKELY (_g_dbus_debug_payload ()))
1224 {
1225 s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2);
1226 g_print ("%s\n", s);
1227 g_free (s);
1228 }
1229 _g_dbus_debug_print_unlock ();
1230 }
1231  
1232 worker->write_num_messages_written += 1;
1233 }
1234  
1235 /* called in private thread shared by all GDBusConnection instances
1236 *
1237 * write-lock is held on entry
1238 * output_pending is PENDING_NONE on entry
1239 *
1240 * Returns: non-%NULL, setting @output_pending, if we need to flush now
1241 */
1242 static FlushAsyncData *
1243 prepare_flush_unlocked (GDBusWorker *worker)
1244 {
1245 GList *l;
1246 GList *ll;
1247 GList *flushers;
1248  
1249 flushers = NULL;
1250 for (l = worker->write_pending_flushes; l != NULL; l = ll)
1251 {
1252 FlushData *f = l->data;
1253 ll = l->next;
1254  
1255 if (f->number_to_wait_for == worker->write_num_messages_written)
1256 {
1257 flushers = g_list_append (flushers, f);
1258 worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
1259 }
1260 }
1261 if (flushers != NULL)
1262 {
1263 g_assert (worker->output_pending == PENDING_NONE);
1264 worker->output_pending = PENDING_FLUSH;
1265 }
1266  
1267 if (flushers != NULL)
1268 {
1269 FlushAsyncData *data;
1270  
1271 data = g_new0 (FlushAsyncData, 1);
1272 data->worker = _g_dbus_worker_ref (worker);
1273 data->flushers = flushers;
1274 return data;
1275 }
1276  
1277 return NULL;
1278 }
1279  
1280 /* called in private thread shared by all GDBusConnection instances
1281 *
1282 * write-lock is not held on entry
1283 * output_pending is PENDING_WRITE on entry
1284 */
1285 static void
1286 write_message_cb (GObject *source_object,
1287 GAsyncResult *res,
1288 gpointer user_data)
1289 {
1290 MessageToWriteData *data = user_data;
1291 GError *error;
1292  
1293 g_mutex_lock (&data->worker->write_lock);
1294 g_assert (data->worker->output_pending == PENDING_WRITE);
1295 data->worker->output_pending = PENDING_NONE;
1296  
1297 error = NULL;
1298 if (!write_message_finish (res, &error))
1299 {
1300 g_mutex_unlock (&data->worker->write_lock);
1301  
1302 /* TODO: handle */
1303 _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
1304 g_error_free (error);
1305  
1306 g_mutex_lock (&data->worker->write_lock);
1307 }
1308  
1309 message_written_unlocked (data->worker, data);
1310  
1311 g_mutex_unlock (&data->worker->write_lock);
1312  
1313 continue_writing (data->worker);
1314  
1315 message_to_write_data_free (data);
1316 }
1317  
1318 /* called in private thread shared by all GDBusConnection instances
1319 *
1320 * write-lock is not held on entry
1321 * output_pending is PENDING_CLOSE on entry
1322 */
1323 static void
1324 iostream_close_cb (GObject *source_object,
1325 GAsyncResult *res,
1326 gpointer user_data)
1327 {
1328 GDBusWorker *worker = user_data;
1329 GError *error = NULL;
1330 GList *pending_close_attempts, *pending_flush_attempts;
1331 GQueue *send_queue;
1332  
1333 g_io_stream_close_finish (worker->stream, res, &error);
1334  
1335 g_mutex_lock (&worker->write_lock);
1336  
1337 pending_close_attempts = worker->pending_close_attempts;
1338 worker->pending_close_attempts = NULL;
1339  
1340 pending_flush_attempts = worker->write_pending_flushes;
1341 worker->write_pending_flushes = NULL;
1342  
1343 send_queue = worker->write_queue;
1344 worker->write_queue = g_queue_new ();
1345  
1346 g_assert (worker->output_pending == PENDING_CLOSE);
1347 worker->output_pending = PENDING_NONE;
1348  
1349 g_mutex_unlock (&worker->write_lock);
1350  
1351 while (pending_close_attempts != NULL)
1352 {
1353 CloseData *close_data = pending_close_attempts->data;
1354  
1355 pending_close_attempts = g_list_delete_link (pending_close_attempts,
1356 pending_close_attempts);
1357  
1358 if (close_data->task != NULL)
1359 {
1360 if (error != NULL)
1361 g_task_return_error (close_data->task, g_error_copy (error));
1362 else
1363 g_task_return_boolean (close_data->task, TRUE);
1364 }
1365  
1366 close_data_free (close_data);
1367 }
1368  
1369 g_clear_error (&error);
1370  
1371 /* all messages queued for sending are discarded */
1372 g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free);
1373 /* all queued flushes fail */
1374 error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED,
1375 _("Operation was cancelled"));
1376 flush_data_list_complete (pending_flush_attempts, error);
1377 g_list_free (pending_flush_attempts);
1378 g_clear_error (&error);
1379  
1380 _g_dbus_worker_unref (worker);
1381 }
1382  
1383 /* called in private thread shared by all GDBusConnection instances
1384 *
1385 * write-lock is not held on entry
1386 * output_pending must be PENDING_NONE on entry
1387 */
1388 static void
1389 continue_writing (GDBusWorker *worker)
1390 {
1391 MessageToWriteData *data;
1392 FlushAsyncData *flush_async_data;
1393  
1394 write_next:
1395 /* we mustn't try to write two things at once */
1396 g_assert (worker->output_pending == PENDING_NONE);
1397  
1398 g_mutex_lock (&worker->write_lock);
1399  
1400 data = NULL;
1401 flush_async_data = NULL;
1402  
1403 /* if we want to close the connection, that takes precedence */
1404 if (worker->pending_close_attempts != NULL)
1405 {
1406 GInputStream *input = g_io_stream_get_input_stream (worker->stream);
1407  
1408 if (!g_input_stream_has_pending (input))
1409 {
1410 worker->close_expected = TRUE;
1411 worker->output_pending = PENDING_CLOSE;
1412  
1413 g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT,
1414 NULL, iostream_close_cb,
1415 _g_dbus_worker_ref (worker));
1416 }
1417 }
1418 else
1419 {
1420 flush_async_data = prepare_flush_unlocked (worker);
1421  
1422 if (flush_async_data == NULL)
1423 {
1424 data = g_queue_pop_head (worker->write_queue);
1425  
1426 if (data != NULL)
1427 worker->output_pending = PENDING_WRITE;
1428 }
1429 }
1430  
1431 g_mutex_unlock (&worker->write_lock);
1432  
1433 /* Note that write_lock is only used for protecting the @write_queue
1434 * and @output_pending fields of the GDBusWorker struct ... which we
1435 * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1436 *
1437 * Therefore, it's fine to drop it here when calling back into user
1438 * code and then writing the message out onto the GIOStream since this
1439 * function only runs on the worker thread.
1440 */
1441  
1442 if (flush_async_data != NULL)
1443 {
1444 start_flush (flush_async_data);
1445 g_assert (data == NULL);
1446 }
1447 else if (data != NULL)
1448 {
1449 GDBusMessage *old_message;
1450 guchar *new_blob;
1451 gsize new_blob_size;
1452 GError *error;
1453  
1454 old_message = data->message;
1455 data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
1456 if (data->message == old_message)
1457 {
1458 /* filters had no effect - do nothing */
1459 }
1460 else if (data->message == NULL)
1461 {
1462 /* filters dropped message */
1463 g_mutex_lock (&worker->write_lock);
1464 worker->output_pending = PENDING_NONE;
1465 g_mutex_unlock (&worker->write_lock);
1466 message_to_write_data_free (data);
1467 goto write_next;
1468 }
1469 else
1470 {
1471 /* filters altered the message -> reencode */
1472 error = NULL;
1473 new_blob = g_dbus_message_to_blob (data->message,
1474 &new_blob_size,
1475 worker->capabilities,
1476 &error);
1477 if (new_blob == NULL)
1478 {
1479 /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1480 * the old message instead
1481 */
1482 g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1483 g_dbus_message_get_serial (data->message),
1484 error->message);
1485 g_error_free (error);
1486 }
1487 else
1488 {
1489 g_free (data->blob);
1490 data->blob = (gchar *) new_blob;
1491 data->blob_size = new_blob_size;
1492 }
1493 }
1494  
1495 write_message_async (worker,
1496 data,
1497 write_message_cb,
1498 data);
1499 }
1500 }
1501  
1502 /* called in private thread shared by all GDBusConnection instances
1503 *
1504 * write-lock is not held on entry
1505 * output_pending may be anything
1506 */
1507 static gboolean
1508 continue_writing_in_idle_cb (gpointer user_data)
1509 {
1510 GDBusWorker *worker = user_data;
1511  
1512 /* Because this is the worker thread, we can read this struct member
1513 * without holding the lock: no other thread ever modifies it.
1514 */
1515 if (worker->output_pending == PENDING_NONE)
1516 continue_writing (worker);
1517  
1518 return FALSE;
1519 }
1520  
1521 /*
1522 * @write_data: (transfer full) (allow-none):
1523 * @flush_data: (transfer full) (allow-none):
1524 * @close_data: (transfer full) (allow-none):
1525 *
1526 * Can be called from any thread
1527 *
1528 * write_lock is held on entry
1529 * output_pending may be anything
1530 */
1531 static void
1532 schedule_writing_unlocked (GDBusWorker *worker,
1533 MessageToWriteData *write_data,
1534 FlushData *flush_data,
1535 CloseData *close_data)
1536 {
1537 if (write_data != NULL)
1538 g_queue_push_tail (worker->write_queue, write_data);
1539  
1540 if (flush_data != NULL)
1541 worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data);
1542  
1543 if (close_data != NULL)
1544 worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts,
1545 close_data);
1546  
1547 /* If we had output pending, the next bit of output will happen
1548 * automatically when it finishes, so we only need to do this
1549 * if nothing was pending.
1550 *
1551 * The idle callback will re-check that output_pending is still
1552 * PENDING_NONE, to guard against output starting before the idle.
1553 */
1554 if (worker->output_pending == PENDING_NONE)
1555 {
1556 GSource *idle_source;
1557 idle_source = g_idle_source_new ();
1558 g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1559 g_source_set_callback (idle_source,
1560 continue_writing_in_idle_cb,
1561 _g_dbus_worker_ref (worker),
1562 (GDestroyNotify) _g_dbus_worker_unref);
1563 g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb");
1564 g_source_attach (idle_source, worker->shared_thread_data->context);
1565 g_source_unref (idle_source);
1566 }
1567 }
1568  
1569 static void
1570 schedule_pending_close (GDBusWorker *worker)
1571 {
1572 if (!worker->pending_close_attempts)
1573 return;
1574  
1575 schedule_writing_unlocked (worker, NULL, NULL, NULL);
1576 }
1577  
1578 /* ---------------------------------------------------------------------------------------------------- */
1579  
1580 /* can be called from any thread - steals blob
1581 *
1582 * write_lock is not held on entry
1583 * output_pending may be anything
1584 */
1585 void
1586 _g_dbus_worker_send_message (GDBusWorker *worker,
1587 GDBusMessage *message,
1588 gchar *blob,
1589 gsize blob_len)
1590 {
1591 MessageToWriteData *data;
1592  
1593 g_return_if_fail (G_IS_DBUS_MESSAGE (message));
1594 g_return_if_fail (blob != NULL);
1595 g_return_if_fail (blob_len > 16);
1596  
1597 data = g_slice_new0 (MessageToWriteData);
1598 data->worker = _g_dbus_worker_ref (worker);
1599 data->message = g_object_ref (message);
1600 data->blob = blob; /* steal! */
1601 data->blob_size = blob_len;
1602  
1603 g_mutex_lock (&worker->write_lock);
1604 schedule_writing_unlocked (worker, data, NULL, NULL);
1605 g_mutex_unlock (&worker->write_lock);
1606 }
1607  
1608 /* ---------------------------------------------------------------------------------------------------- */
1609  
1610 GDBusWorker *
1611 _g_dbus_worker_new (GIOStream *stream,
1612 GDBusCapabilityFlags capabilities,
1613 gboolean initially_frozen,
1614 GDBusWorkerMessageReceivedCallback message_received_callback,
1615 GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
1616 GDBusWorkerDisconnectedCallback disconnected_callback,
1617 gpointer user_data)
1618 {
1619 GDBusWorker *worker;
1620 GSource *idle_source;
1621  
1622 g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
1623 g_return_val_if_fail (message_received_callback != NULL, NULL);
1624 g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
1625 g_return_val_if_fail (disconnected_callback != NULL, NULL);
1626  
1627 worker = g_new0 (GDBusWorker, 1);
1628 worker->ref_count = 1;
1629  
1630 g_mutex_init (&worker->read_lock);
1631 worker->message_received_callback = message_received_callback;
1632 worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
1633 worker->disconnected_callback = disconnected_callback;
1634 worker->user_data = user_data;
1635 worker->stream = g_object_ref (stream);
1636 worker->capabilities = capabilities;
1637 worker->cancellable = g_cancellable_new ();
1638 worker->output_pending = PENDING_NONE;
1639  
1640 worker->frozen = initially_frozen;
1641 worker->received_messages_while_frozen = g_queue_new ();
1642  
1643 g_mutex_init (&worker->write_lock);
1644 worker->write_queue = g_queue_new ();
1645  
1646 if (G_IS_SOCKET_CONNECTION (worker->stream))
1647 worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream));
1648  
1649 worker->shared_thread_data = _g_dbus_shared_thread_ref ();
1650  
1651 /* begin reading */
1652 idle_source = g_idle_source_new ();
1653 g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
1654 g_source_set_callback (idle_source,
1655 _g_dbus_worker_do_initial_read,
1656 _g_dbus_worker_ref (worker),
1657 (GDestroyNotify) _g_dbus_worker_unref);
1658 g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read");
1659 g_source_attach (idle_source, worker->shared_thread_data->context);
1660 g_source_unref (idle_source);
1661  
1662 return worker;
1663 }
1664  
1665 /* ---------------------------------------------------------------------------------------------------- */
1666  
1667 /* can be called from any thread
1668 *
1669 * write_lock is not held on entry
1670 * output_pending may be anything
1671 */
1672 void
1673 _g_dbus_worker_close (GDBusWorker *worker,
1674 GTask *task)
1675 {
1676 CloseData *close_data;
1677  
1678 close_data = g_slice_new0 (CloseData);
1679 close_data->worker = _g_dbus_worker_ref (worker);
1680 close_data->task = (task == NULL ? NULL : g_object_ref (task));
1681  
1682 /* Don't set worker->close_expected here - we're in the wrong thread.
1683 * It'll be set before the actual close happens.
1684 */
1685 g_cancellable_cancel (worker->cancellable);
1686 g_mutex_lock (&worker->write_lock);
1687 schedule_writing_unlocked (worker, NULL, NULL, close_data);
1688 g_mutex_unlock (&worker->write_lock);
1689 }
1690  
1691 /* This can be called from any thread - frees worker. Note that
1692 * callbacks might still happen if called from another thread than the
1693 * worker - use your own synchronization primitive in the callbacks.
1694 *
1695 * write_lock is not held on entry
1696 * output_pending may be anything
1697 */
1698 void
1699 _g_dbus_worker_stop (GDBusWorker *worker)
1700 {
1701 g_atomic_int_set (&worker->stopped, TRUE);
1702  
1703 /* Cancel any pending operations and schedule a close of the underlying I/O
1704 * stream in the worker thread
1705 */
1706 _g_dbus_worker_close (worker, NULL);
1707  
1708 /* _g_dbus_worker_close holds a ref until after an idle in the worker
1709 * thread has run, so we no longer need to unref in an idle like in
1710 * commit 322e25b535
1711 */
1712 _g_dbus_worker_unref (worker);
1713 }
1714  
1715 /* ---------------------------------------------------------------------------------------------------- */
1716  
1717 /* can be called from any thread (except the worker thread) - blocks
1718 * calling thread until all queued outgoing messages are written and
1719 * the transport has been flushed
1720 *
1721 * write_lock is not held on entry
1722 * output_pending may be anything
1723 */
1724 gboolean
1725 _g_dbus_worker_flush_sync (GDBusWorker *worker,
1726 GCancellable *cancellable,
1727 GError **error)
1728 {
1729 gboolean ret;
1730 FlushData *data;
1731 guint64 pending_writes;
1732  
1733 data = NULL;
1734 ret = TRUE;
1735  
1736 g_mutex_lock (&worker->write_lock);
1737  
1738 /* if the queue is empty, no write is in-flight and we haven't written
1739 * anything since the last flush, then there's nothing to wait for
1740 */
1741 pending_writes = g_queue_get_length (worker->write_queue);
1742  
1743 /* if a write is in-flight, we shouldn't be satisfied until the first
1744 * flush operation that follows it
1745 */
1746 if (worker->output_pending == PENDING_WRITE)
1747 pending_writes += 1;
1748  
1749 if (pending_writes > 0 ||
1750 worker->write_num_messages_written != worker->write_num_messages_flushed)
1751 {
1752 data = g_new0 (FlushData, 1);
1753 g_mutex_init (&data->mutex);
1754 g_cond_init (&data->cond);
1755 data->number_to_wait_for = worker->write_num_messages_written + pending_writes;
1756 g_mutex_lock (&data->mutex);
1757  
1758 schedule_writing_unlocked (worker, NULL, data, NULL);
1759 }
1760 g_mutex_unlock (&worker->write_lock);
1761  
1762 if (data != NULL)
1763 {
1764 g_cond_wait (&data->cond, &data->mutex);
1765 g_mutex_unlock (&data->mutex);
1766  
1767 /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1768 g_cond_clear (&data->cond);
1769 g_mutex_clear (&data->mutex);
1770 if (data->error != NULL)
1771 {
1772 ret = FALSE;
1773 g_propagate_error (error, data->error);
1774 }
1775 g_free (data);
1776 }
1777  
1778 return ret;
1779 }
1780  
1781 /* ---------------------------------------------------------------------------------------------------- */
1782  
1783 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1784 #define G_DBUS_DEBUG_TRANSPORT (1<<1)
1785 #define G_DBUS_DEBUG_MESSAGE (1<<2)
1786 #define G_DBUS_DEBUG_PAYLOAD (1<<3)
1787 #define G_DBUS_DEBUG_CALL (1<<4)
1788 #define G_DBUS_DEBUG_SIGNAL (1<<5)
1789 #define G_DBUS_DEBUG_INCOMING (1<<6)
1790 #define G_DBUS_DEBUG_RETURN (1<<7)
1791 #define G_DBUS_DEBUG_EMISSION (1<<8)
1792 #define G_DBUS_DEBUG_ADDRESS (1<<9)
1793  
1794 static gint _gdbus_debug_flags = 0;
1795  
1796 gboolean
1797 _g_dbus_debug_authentication (void)
1798 {
1799 _g_dbus_initialize ();
1800 return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0;
1801 }
1802  
1803 gboolean
1804 _g_dbus_debug_transport (void)
1805 {
1806 _g_dbus_initialize ();
1807 return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0;
1808 }
1809  
1810 gboolean
1811 _g_dbus_debug_message (void)
1812 {
1813 _g_dbus_initialize ();
1814 return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0;
1815 }
1816  
1817 gboolean
1818 _g_dbus_debug_payload (void)
1819 {
1820 _g_dbus_initialize ();
1821 return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0;
1822 }
1823  
1824 gboolean
1825 _g_dbus_debug_call (void)
1826 {
1827 _g_dbus_initialize ();
1828 return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0;
1829 }
1830  
1831 gboolean
1832 _g_dbus_debug_signal (void)
1833 {
1834 _g_dbus_initialize ();
1835 return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0;
1836 }
1837  
1838 gboolean
1839 _g_dbus_debug_incoming (void)
1840 {
1841 _g_dbus_initialize ();
1842 return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0;
1843 }
1844  
1845 gboolean
1846 _g_dbus_debug_return (void)
1847 {
1848 _g_dbus_initialize ();
1849 return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0;
1850 }
1851  
1852 gboolean
1853 _g_dbus_debug_emission (void)
1854 {
1855 _g_dbus_initialize ();
1856 return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0;
1857 }
1858  
1859 gboolean
1860 _g_dbus_debug_address (void)
1861 {
1862 _g_dbus_initialize ();
1863 return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0;
1864 }
1865  
1866 G_LOCK_DEFINE_STATIC (print_lock);
1867  
1868 void
1869 _g_dbus_debug_print_lock (void)
1870 {
1871 G_LOCK (print_lock);
1872 }
1873  
1874 void
1875 _g_dbus_debug_print_unlock (void)
1876 {
1877 G_UNLOCK (print_lock);
1878 }
1879  
1880 /**
1881 * _g_dbus_initialize:
1882 *
1883 * Does various one-time init things such as
1884 *
1885 * - registering the G_DBUS_ERROR error domain
1886 * - parses the G_DBUS_DEBUG environment variable
1887 */
1888 void
1889 _g_dbus_initialize (void)
1890 {
1891 static volatile gsize initialized = 0;
1892  
1893 if (g_once_init_enter (&initialized))
1894 {
1895 volatile GQuark g_dbus_error_domain;
1896 const gchar *debug;
1897  
1898 g_dbus_error_domain = G_DBUS_ERROR;
1899 (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */
1900  
1901 debug = g_getenv ("G_DBUS_DEBUG");
1902 if (debug != NULL)
1903 {
1904 const GDebugKey keys[] = {
1905 { "authentication", G_DBUS_DEBUG_AUTHENTICATION },
1906 { "transport", G_DBUS_DEBUG_TRANSPORT },
1907 { "message", G_DBUS_DEBUG_MESSAGE },
1908 { "payload", G_DBUS_DEBUG_PAYLOAD },
1909 { "call", G_DBUS_DEBUG_CALL },
1910 { "signal", G_DBUS_DEBUG_SIGNAL },
1911 { "incoming", G_DBUS_DEBUG_INCOMING },
1912 { "return", G_DBUS_DEBUG_RETURN },
1913 { "emission", G_DBUS_DEBUG_EMISSION },
1914 { "address", G_DBUS_DEBUG_ADDRESS }
1915 };
1916  
1917 _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys));
1918 if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD)
1919 _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE;
1920 }
1921  
1922 g_once_init_leave (&initialized, 1);
1923 }
1924 }
1925  
1926 /* ---------------------------------------------------------------------------------------------------- */
1927  
1928 GVariantType *
1929 _g_dbus_compute_complete_signature (GDBusArgInfo **args)
1930 {
1931 const GVariantType *arg_types[256];
1932 guint n;
1933  
1934 if (args)
1935 for (n = 0; args[n] != NULL; n++)
1936 {
1937 /* DBus places a hard limit of 255 on signature length.
1938 * therefore number of args must be less than 256.
1939 */
1940 g_assert (n < 256);
1941  
1942 arg_types[n] = G_VARIANT_TYPE (args[n]->signature);
1943  
1944 if G_UNLIKELY (arg_types[n] == NULL)
1945 return NULL;
1946 }
1947 else
1948 n = 0;
1949  
1950 return g_variant_type_new_tuple (arg_types, n);
1951 }
1952  
1953 /* ---------------------------------------------------------------------------------------------------- */
1954  
1955 #ifdef G_OS_WIN32
1956  
1957 extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);
1958  
1959 gchar *
1960 _g_dbus_win32_get_user_sid (void)
1961 {
1962 HANDLE h;
1963 TOKEN_USER *user;
1964 DWORD token_information_len;
1965 PSID psid;
1966 gchar *sid;
1967 gchar *ret;
1968  
1969 ret = NULL;
1970 user = NULL;
1971 h = INVALID_HANDLE_VALUE;
1972  
1973 if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
1974 {
1975 g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
1976 goto out;
1977 }
1978  
1979 /* Get length of buffer */
1980 token_information_len = 0;
1981 if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
1982 {
1983 if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
1984 {
1985 g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1986 goto out;
1987 }
1988 }
1989 user = g_malloc (token_information_len);
1990 if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
1991 {
1992 g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
1993 goto out;
1994 }
1995  
1996 psid = user->User.Sid;
1997 if (!IsValidSid (psid))
1998 {
1999 g_warning ("Invalid SID");
2000 goto out;
2001 }
2002  
2003 if (!ConvertSidToStringSidA (psid, &sid))
2004 {
2005 g_warning ("Invalid SID");
2006 goto out;
2007 }
2008  
2009 ret = g_strdup (sid);
2010 LocalFree (sid);
2011  
2012 out:
2013 g_free (user);
2014 if (h != INVALID_HANDLE_VALUE)
2015 CloseHandle (h);
2016 return ret;
2017 }
2018 #endif
2019  
2020 /* ---------------------------------------------------------------------------------------------------- */
2021  
2022 gchar *
2023 _g_dbus_get_machine_id (GError **error)
2024 {
2025 #ifdef G_OS_WIN32
2026 HW_PROFILE_INFOA info;
2027 char *src, *dest, *res;
2028 int i;
2029  
2030 if (!GetCurrentHwProfileA (&info))
2031 {
2032 char *message = g_win32_error_message (GetLastError ());
2033 g_set_error (error,
2034 G_IO_ERROR,
2035 G_IO_ERROR_FAILED,
2036 _("Unable to get Hardware profile: %s"), message);
2037 g_free (message);
2038 return NULL;
2039 }
2040  
2041 /* Form: {12340001-4980-1920-6788-123456789012} */
2042 src = &info.szHwProfileGuid[0];
2043  
2044 res = g_malloc (32+1);
2045 dest = res;
2046  
2047 src++; /* Skip { */
2048 for (i = 0; i < 8; i++)
2049 *dest++ = *src++;
2050 src++; /* Skip - */
2051 for (i = 0; i < 4; i++)
2052 *dest++ = *src++;
2053 src++; /* Skip - */
2054 for (i = 0; i < 4; i++)
2055 *dest++ = *src++;
2056 src++; /* Skip - */
2057 for (i = 0; i < 4; i++)
2058 *dest++ = *src++;
2059 src++; /* Skip - */
2060 for (i = 0; i < 12; i++)
2061 *dest++ = *src++;
2062 *dest = 0;
2063  
2064 return res;
2065 #else
2066 gchar *ret;
2067 GError *first_error;
2068 /* TODO: use PACKAGE_LOCALSTATEDIR ? */
2069 ret = NULL;
2070 first_error = NULL;
2071 if (!g_file_get_contents ("/var/lib/dbus/machine-id",
2072 &ret,
2073 NULL,
2074 &first_error) &&
2075 !g_file_get_contents ("/etc/machine-id",
2076 &ret,
2077 NULL,
2078 NULL))
2079 {
2080 g_propagate_prefixed_error (error, first_error,
2081 _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: "));
2082 }
2083 else
2084 {
2085 /* ignore the error from the first try, if any */
2086 g_clear_error (&first_error);
2087 /* TODO: validate value */
2088 g_strstrip (ret);
2089 }
2090 return ret;
2091 #endif
2092 }
2093  
2094 /* ---------------------------------------------------------------------------------------------------- */
2095  
2096 gchar *
2097 _g_dbus_enum_to_string (GType enum_type, gint value)
2098 {
2099 gchar *ret;
2100 GEnumClass *klass;
2101 GEnumValue *enum_value;
2102  
2103 klass = g_type_class_ref (enum_type);
2104 enum_value = g_enum_get_value (klass, value);
2105 if (enum_value != NULL)
2106 ret = g_strdup (enum_value->value_nick);
2107 else
2108 ret = g_strdup_printf ("unknown (value %d)", value);
2109 g_type_class_unref (klass);
2110 return ret;
2111 }
2112  
2113 /* ---------------------------------------------------------------------------------------------------- */
2114  
2115 static void
2116 write_message_print_transport_debug (gssize bytes_written,
2117 MessageToWriteData *data)
2118 {
2119 if (G_LIKELY (!_g_dbus_debug_transport ()))
2120 goto out;
2121  
2122 _g_dbus_debug_print_lock ();
2123 g_print ("========================================================================\n"
2124 "GDBus-debug:Transport:\n"
2125 " >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2126 " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n",
2127 bytes_written,
2128 g_dbus_message_get_serial (data->message),
2129 data->blob_size,
2130 data->total_written,
2131 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream))));
2132 _g_dbus_debug_print_unlock ();
2133 out:
2134 ;
2135 }
2136  
2137 /* ---------------------------------------------------------------------------------------------------- */
2138  
2139 static void
2140 read_message_print_transport_debug (gssize bytes_read,
2141 GDBusWorker *worker)
2142 {
2143 gsize size;
2144 gint32 serial;
2145 gint32 message_length;
2146  
2147 if (G_LIKELY (!_g_dbus_debug_transport ()))
2148 goto out;
2149  
2150 size = bytes_read + worker->read_buffer_cur_size;
2151 serial = 0;
2152 message_length = 0;
2153 if (size >= 16)
2154 message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL);
2155 if (size >= 1)
2156 {
2157 switch (worker->read_buffer[0])
2158 {
2159 case 'l':
2160 if (size >= 12)
2161 serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]);
2162 break;
2163 case 'B':
2164 if (size >= 12)
2165 serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]);
2166 break;
2167 default:
2168 /* an error will be set elsewhere if this happens */
2169 goto out;
2170 }
2171 }
2172  
2173 _g_dbus_debug_print_lock ();
2174 g_print ("========================================================================\n"
2175 "GDBus-debug:Transport:\n"
2176 " <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n"
2177 " size %d to offset %" G_GSIZE_FORMAT " from a %s\n",
2178 bytes_read,
2179 serial,
2180 message_length,
2181 worker->read_buffer_cur_size,
2182 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))));
2183 _g_dbus_debug_print_unlock ();
2184 out:
2185 ;
2186 }
2187  
2188 /* ---------------------------------------------------------------------------------------------------- */
2189  
2190 gboolean
2191 _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint,
2192 GValue *return_accu,
2193 const GValue *handler_return,
2194 gpointer dummy)
2195 {
2196 gboolean continue_emission;
2197 gboolean signal_return;
2198  
2199 signal_return = g_value_get_boolean (handler_return);
2200 g_value_set_boolean (return_accu, signal_return);
2201 continue_emission = signal_return;
2202  
2203 return continue_emission;
2204 }