nexmon – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | /* GDBus - GLib D-Bus Library |
2 | * |
||
3 | * Copyright (C) 2008-2010 Red Hat, Inc. |
||
4 | * |
||
5 | * This library is free software; you can redistribute it and/or |
||
6 | * modify it under the terms of the GNU Lesser General Public |
||
7 | * License as published by the Free Software Foundation; either |
||
8 | * version 2 of the License, or (at your option) any later version. |
||
9 | * |
||
10 | * This library is distributed in the hope that it will be useful, |
||
11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
||
13 | * Lesser General Public License for more details. |
||
14 | * |
||
15 | * You should have received a copy of the GNU Lesser General |
||
16 | * Public License along with this library; if not, see <http://www.gnu.org/licenses/>. |
||
17 | * |
||
18 | * Author: David Zeuthen <davidz@redhat.com> |
||
19 | */ |
||
20 | |||
21 | #include "config.h" |
||
22 | |||
23 | #include <stdlib.h> |
||
24 | #include <string.h> |
||
25 | |||
26 | #include "giotypes.h" |
||
27 | #include "gsocket.h" |
||
28 | #include "gdbusprivate.h" |
||
29 | #include "gdbusmessage.h" |
||
30 | #include "gdbuserror.h" |
||
31 | #include "gdbusintrospection.h" |
||
32 | #include "gtask.h" |
||
33 | #include "ginputstream.h" |
||
34 | #include "gmemoryinputstream.h" |
||
35 | #include "giostream.h" |
||
36 | #include "glib/gstdio.h" |
||
37 | #include "gsocketcontrolmessage.h" |
||
38 | #include "gsocketconnection.h" |
||
39 | #include "gsocketoutputstream.h" |
||
40 | |||
41 | #ifdef G_OS_UNIX |
||
42 | #include "gunixfdmessage.h" |
||
43 | #include "gunixconnection.h" |
||
44 | #include "gunixcredentialsmessage.h" |
||
45 | #endif |
||
46 | |||
47 | #ifdef G_OS_WIN32 |
||
48 | #include <windows.h> |
||
49 | #endif |
||
50 | |||
51 | #include "glibintl.h" |
||
52 | |||
53 | static gboolean _g_dbus_worker_do_initial_read (gpointer data); |
||
54 | static void schedule_pending_close (GDBusWorker *worker); |
||
55 | |||
56 | /* ---------------------------------------------------------------------------------------------------- */ |
||
57 | |||
58 | gchar * |
||
59 | _g_dbus_hexdump (const gchar *data, gsize len, guint indent) |
||
60 | { |
||
61 | guint n, m; |
||
62 | GString *ret; |
||
63 | |||
64 | ret = g_string_new (NULL); |
||
65 | |||
66 | for (n = 0; n < len; n += 16) |
||
67 | { |
||
68 | g_string_append_printf (ret, "%*s%04x: ", indent, "", n); |
||
69 | |||
70 | for (m = n; m < n + 16; m++) |
||
71 | { |
||
72 | if (m > n && (m%4) == 0) |
||
73 | g_string_append_c (ret, ' '); |
||
74 | if (m < len) |
||
75 | g_string_append_printf (ret, "%02x ", (guchar) data[m]); |
||
76 | else |
||
77 | g_string_append (ret, " "); |
||
78 | } |
||
79 | |||
80 | g_string_append (ret, " "); |
||
81 | |||
82 | for (m = n; m < len && m < n + 16; m++) |
||
83 | g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.'); |
||
84 | |||
85 | g_string_append_c (ret, '\n'); |
||
86 | } |
||
87 | |||
88 | return g_string_free (ret, FALSE); |
||
89 | } |
||
90 | |||
91 | /* ---------------------------------------------------------------------------------------------------- */ |
||
92 | |||
93 | /* Unfortunately ancillary messages are discarded when reading from a |
||
94 | * socket using the GSocketInputStream abstraction. So we provide a |
||
95 | * very GInputStream-ish API that uses GSocket in this case (very |
||
96 | * similar to GSocketInputStream). |
||
97 | */ |
||
98 | |||
99 | typedef struct |
||
100 | { |
||
101 | void *buffer; |
||
102 | gsize count; |
||
103 | |||
104 | GSocketControlMessage ***messages; |
||
105 | gint *num_messages; |
||
106 | } ReadWithControlData; |
||
107 | |||
108 | static void |
||
109 | read_with_control_data_free (ReadWithControlData *data) |
||
110 | { |
||
111 | g_slice_free (ReadWithControlData, data); |
||
112 | } |
||
113 | |||
114 | static gboolean |
||
115 | _g_socket_read_with_control_messages_ready (GSocket *socket, |
||
116 | GIOCondition condition, |
||
117 | gpointer user_data) |
||
118 | { |
||
119 | GTask *task = user_data; |
||
120 | ReadWithControlData *data = g_task_get_task_data (task); |
||
121 | GError *error; |
||
122 | gssize result; |
||
123 | GInputVector vector; |
||
124 | |||
125 | error = NULL; |
||
126 | vector.buffer = data->buffer; |
||
127 | vector.size = data->count; |
||
128 | result = g_socket_receive_message (socket, |
||
129 | NULL, /* address */ |
||
130 | &vector, |
||
131 | 1, |
||
132 | data->messages, |
||
133 | data->num_messages, |
||
134 | NULL, |
||
135 | g_task_get_cancellable (task), |
||
136 | &error); |
||
137 | |||
138 | if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) |
||
139 | { |
||
140 | g_error_free (error); |
||
141 | return TRUE; |
||
142 | } |
||
143 | |||
144 | g_assert (result >= 0 || error != NULL); |
||
145 | if (result >= 0) |
||
146 | g_task_return_int (task, result); |
||
147 | else |
||
148 | g_task_return_error (task, error); |
||
149 | g_object_unref (task); |
||
150 | |||
151 | return FALSE; |
||
152 | } |
||
153 | |||
154 | static void |
||
155 | _g_socket_read_with_control_messages (GSocket *socket, |
||
156 | void *buffer, |
||
157 | gsize count, |
||
158 | GSocketControlMessage ***messages, |
||
159 | gint *num_messages, |
||
160 | gint io_priority, |
||
161 | GCancellable *cancellable, |
||
162 | GAsyncReadyCallback callback, |
||
163 | gpointer user_data) |
||
164 | { |
||
165 | GTask *task; |
||
166 | ReadWithControlData *data; |
||
167 | GSource *source; |
||
168 | |||
169 | data = g_slice_new0 (ReadWithControlData); |
||
170 | data->buffer = buffer; |
||
171 | data->count = count; |
||
172 | data->messages = messages; |
||
173 | data->num_messages = num_messages; |
||
174 | |||
175 | task = g_task_new (socket, cancellable, callback, user_data); |
||
176 | g_task_set_task_data (task, data, (GDestroyNotify) read_with_control_data_free); |
||
177 | |||
178 | if (g_socket_condition_check (socket, G_IO_IN)) |
||
179 | { |
||
180 | if (!_g_socket_read_with_control_messages_ready (socket, G_IO_IN, task)) |
||
181 | return; |
||
182 | } |
||
183 | |||
184 | source = g_socket_create_source (socket, |
||
185 | G_IO_IN | G_IO_HUP | G_IO_ERR, |
||
186 | cancellable); |
||
187 | g_task_attach_source (task, source, (GSourceFunc) _g_socket_read_with_control_messages_ready); |
||
188 | g_source_unref (source); |
||
189 | } |
||
190 | |||
191 | static gssize |
||
192 | _g_socket_read_with_control_messages_finish (GSocket *socket, |
||
193 | GAsyncResult *result, |
||
194 | GError **error) |
||
195 | { |
||
196 | g_return_val_if_fail (G_IS_SOCKET (socket), -1); |
||
197 | g_return_val_if_fail (g_task_is_valid (result, socket), -1); |
||
198 | |||
199 | return g_task_propagate_int (G_TASK (result), error); |
||
200 | } |
||
201 | |||
202 | /* ---------------------------------------------------------------------------------------------------- */ |
||
203 | |||
204 | /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */ |
||
205 | |||
206 | static GPtrArray *ensured_classes = NULL; |
||
207 | |||
208 | static void |
||
209 | ensure_type (GType gtype) |
||
210 | { |
||
211 | g_ptr_array_add (ensured_classes, g_type_class_ref (gtype)); |
||
212 | } |
||
213 | |||
214 | static void |
||
215 | release_required_types (void) |
||
216 | { |
||
217 | g_ptr_array_foreach (ensured_classes, (GFunc) g_type_class_unref, NULL); |
||
218 | g_ptr_array_unref (ensured_classes); |
||
219 | ensured_classes = NULL; |
||
220 | } |
||
221 | |||
222 | static void |
||
223 | ensure_required_types (void) |
||
224 | { |
||
225 | g_assert (ensured_classes == NULL); |
||
226 | ensured_classes = g_ptr_array_new (); |
||
227 | ensure_type (G_TYPE_TASK); |
||
228 | ensure_type (G_TYPE_MEMORY_INPUT_STREAM); |
||
229 | } |
||
230 | /* ---------------------------------------------------------------------------------------------------- */ |
||
231 | |||
232 | typedef struct |
||
233 | { |
||
234 | volatile gint refcount; |
||
235 | GThread *thread; |
||
236 | GMainContext *context; |
||
237 | GMainLoop *loop; |
||
238 | } SharedThreadData; |
||
239 | |||
240 | static gpointer |
||
241 | gdbus_shared_thread_func (gpointer user_data) |
||
242 | { |
||
243 | SharedThreadData *data = user_data; |
||
244 | |||
245 | g_main_context_push_thread_default (data->context); |
||
246 | g_main_loop_run (data->loop); |
||
247 | g_main_context_pop_thread_default (data->context); |
||
248 | |||
249 | release_required_types (); |
||
250 | |||
251 | return NULL; |
||
252 | } |
||
253 | |||
254 | /* ---------------------------------------------------------------------------------------------------- */ |
||
255 | |||
256 | static SharedThreadData * |
||
257 | _g_dbus_shared_thread_ref (void) |
||
258 | { |
||
259 | static gsize shared_thread_data = 0; |
||
260 | SharedThreadData *ret; |
||
261 | |||
262 | if (g_once_init_enter (&shared_thread_data)) |
||
263 | { |
||
264 | SharedThreadData *data; |
||
265 | |||
266 | /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */ |
||
267 | ensure_required_types (); |
||
268 | |||
269 | data = g_new0 (SharedThreadData, 1); |
||
270 | data->refcount = 0; |
||
271 | |||
272 | data->context = g_main_context_new (); |
||
273 | data->loop = g_main_loop_new (data->context, FALSE); |
||
274 | data->thread = g_thread_new ("gdbus", |
||
275 | gdbus_shared_thread_func, |
||
276 | data); |
||
277 | /* We can cast between gsize and gpointer safely */ |
||
278 | g_once_init_leave (&shared_thread_data, (gsize) data); |
||
279 | } |
||
280 | |||
281 | ret = (SharedThreadData*) shared_thread_data; |
||
282 | g_atomic_int_inc (&ret->refcount); |
||
283 | return ret; |
||
284 | } |
||
285 | |||
286 | static void |
||
287 | _g_dbus_shared_thread_unref (SharedThreadData *data) |
||
288 | { |
||
289 | /* TODO: actually destroy the shared thread here */ |
||
290 | #if 0 |
||
291 | g_assert (data != NULL); |
||
292 | if (g_atomic_int_dec_and_test (&data->refcount)) |
||
293 | { |
||
294 | g_main_loop_quit (data->loop); |
||
295 | //g_thread_join (data->thread); |
||
296 | g_main_loop_unref (data->loop); |
||
297 | g_main_context_unref (data->context); |
||
298 | } |
||
299 | #endif |
||
300 | } |
||
301 | |||
302 | /* ---------------------------------------------------------------------------------------------------- */ |
||
303 | |||
/* Which asynchronous output operation, if any, is in flight on a worker */
typedef enum
{
  PENDING_NONE = 0,  /* nothing in flight */
  PENDING_WRITE,     /* an async write */
  PENDING_FLUSH,     /* an async flush */
  PENDING_CLOSE      /* an async close */
} OutputPending;
||
310 | |||
311 | struct GDBusWorker |
||
312 | { |
||
313 | volatile gint ref_count; |
||
314 | |||
315 | SharedThreadData *shared_thread_data; |
||
316 | |||
317 | /* really a boolean, but GLib 2.28 lacks atomic boolean ops */ |
||
318 | volatile gint stopped; |
||
319 | |||
320 | /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently |
||
321 | * only affects messages received from the other peer (since GDBusServer is the |
||
322 | * only user) - we might want it to affect messages sent to the other peer too? |
||
323 | */ |
||
324 | gboolean frozen; |
||
325 | GDBusCapabilityFlags capabilities; |
||
326 | GQueue *received_messages_while_frozen; |
||
327 | |||
328 | GIOStream *stream; |
||
329 | GCancellable *cancellable; |
||
330 | GDBusWorkerMessageReceivedCallback message_received_callback; |
||
331 | GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback; |
||
332 | GDBusWorkerDisconnectedCallback disconnected_callback; |
||
333 | gpointer user_data; |
||
334 | |||
335 | /* if not NULL, stream is GSocketConnection */ |
||
336 | GSocket *socket; |
||
337 | |||
338 | /* used for reading */ |
||
339 | GMutex read_lock; |
||
340 | gchar *read_buffer; |
||
341 | gsize read_buffer_allocated_size; |
||
342 | gsize read_buffer_cur_size; |
||
343 | gsize read_buffer_bytes_wanted; |
||
344 | GUnixFDList *read_fd_list; |
||
345 | GSocketControlMessage **read_ancillary_messages; |
||
346 | gint read_num_ancillary_messages; |
||
347 | |||
348 | /* Whether an async write, flush or close, or none of those, is pending. |
||
349 | * Only the worker thread may change its value, and only with the write_lock. |
||
350 | * Other threads may read its value when holding the write_lock. |
||
351 | * The worker thread may read its value at any time. |
||
352 | */ |
||
353 | OutputPending output_pending; |
||
354 | /* used for writing */ |
||
355 | GMutex write_lock; |
||
356 | /* queue of MessageToWriteData, protected by write_lock */ |
||
357 | GQueue *write_queue; |
||
358 | /* protected by write_lock */ |
||
359 | guint64 write_num_messages_written; |
||
360 | /* number of messages we'd written out last time we flushed; |
||
361 | * protected by write_lock |
||
362 | */ |
||
363 | guint64 write_num_messages_flushed; |
||
364 | /* list of FlushData, protected by write_lock */ |
||
365 | GList *write_pending_flushes; |
||
366 | /* list of CloseData, protected by write_lock */ |
||
367 | GList *pending_close_attempts; |
||
368 | /* no lock - only used from the worker thread */ |
||
369 | gboolean close_expected; |
||
370 | }; |
||
371 | |||
372 | static void _g_dbus_worker_unref (GDBusWorker *worker); |
||
373 | |||
374 | /* ---------------------------------------------------------------------------------------------------- */ |
||
375 | |||
376 | typedef struct |
||
377 | { |
||
378 | GMutex mutex; |
||
379 | GCond cond; |
||
380 | guint64 number_to_wait_for; |
||
381 | GError *error; |
||
382 | } FlushData; |
||
383 | |||
384 | struct _MessageToWriteData ; |
||
385 | typedef struct _MessageToWriteData MessageToWriteData; |
||
386 | |||
387 | static void message_to_write_data_free (MessageToWriteData *data); |
||
388 | |||
389 | static void read_message_print_transport_debug (gssize bytes_read, |
||
390 | GDBusWorker *worker); |
||
391 | |||
392 | static void write_message_print_transport_debug (gssize bytes_written, |
||
393 | MessageToWriteData *data); |
||
394 | |||
395 | typedef struct { |
||
396 | GDBusWorker *worker; |
||
397 | GTask *task; |
||
398 | } CloseData; |
||
399 | |||
400 | static void close_data_free (CloseData *close_data) |
||
401 | { |
||
402 | g_clear_object (&close_data->task); |
||
403 | |||
404 | _g_dbus_worker_unref (close_data->worker); |
||
405 | g_slice_free (CloseData, close_data); |
||
406 | } |
||
407 | |||
408 | /* ---------------------------------------------------------------------------------------------------- */ |
||
409 | |||
410 | static GDBusWorker * |
||
411 | _g_dbus_worker_ref (GDBusWorker *worker) |
||
412 | { |
||
413 | g_atomic_int_inc (&worker->ref_count); |
||
414 | return worker; |
||
415 | } |
||
416 | |||
417 | static void |
||
418 | _g_dbus_worker_unref (GDBusWorker *worker) |
||
419 | { |
||
420 | if (g_atomic_int_dec_and_test (&worker->ref_count)) |
||
421 | { |
||
422 | g_assert (worker->write_pending_flushes == NULL); |
||
423 | |||
424 | _g_dbus_shared_thread_unref (worker->shared_thread_data); |
||
425 | |||
426 | g_object_unref (worker->stream); |
||
427 | |||
428 | g_mutex_clear (&worker->read_lock); |
||
429 | g_object_unref (worker->cancellable); |
||
430 | if (worker->read_fd_list != NULL) |
||
431 | g_object_unref (worker->read_fd_list); |
||
432 | |||
433 | g_queue_free_full (worker->received_messages_while_frozen, (GDestroyNotify) g_object_unref); |
||
434 | g_mutex_clear (&worker->write_lock); |
||
435 | g_queue_free_full (worker->write_queue, (GDestroyNotify) message_to_write_data_free); |
||
436 | g_free (worker->read_buffer); |
||
437 | |||
438 | g_free (worker); |
||
439 | } |
||
440 | } |
||
441 | |||
442 | static void |
||
443 | _g_dbus_worker_emit_disconnected (GDBusWorker *worker, |
||
444 | gboolean remote_peer_vanished, |
||
445 | GError *error) |
||
446 | { |
||
447 | if (!g_atomic_int_get (&worker->stopped)) |
||
448 | worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data); |
||
449 | } |
||
450 | |||
451 | static void |
||
452 | _g_dbus_worker_emit_message_received (GDBusWorker *worker, |
||
453 | GDBusMessage *message) |
||
454 | { |
||
455 | if (!g_atomic_int_get (&worker->stopped)) |
||
456 | worker->message_received_callback (worker, message, worker->user_data); |
||
457 | } |
||
458 | |||
459 | static GDBusMessage * |
||
460 | _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker, |
||
461 | GDBusMessage *message) |
||
462 | { |
||
463 | GDBusMessage *ret; |
||
464 | if (!g_atomic_int_get (&worker->stopped)) |
||
465 | ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data); |
||
466 | else |
||
467 | ret = message; |
||
468 | return ret; |
||
469 | } |
||
470 | |||
471 | /* can only be called from private thread with read-lock held - takes ownership of @message */ |
||
472 | static void |
||
473 | _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker, |
||
474 | GDBusMessage *message) |
||
475 | { |
||
476 | if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0) |
||
477 | { |
||
478 | /* queue up */ |
||
479 | g_queue_push_tail (worker->received_messages_while_frozen, message); |
||
480 | } |
||
481 | else |
||
482 | { |
||
483 | /* not frozen, nor anything in queue */ |
||
484 | _g_dbus_worker_emit_message_received (worker, message); |
||
485 | g_object_unref (message); |
||
486 | } |
||
487 | } |
||
488 | |||
489 | /* called in private thread shared by all GDBusConnection instances (without read-lock held) */ |
||
490 | static gboolean |
||
491 | unfreeze_in_idle_cb (gpointer user_data) |
||
492 | { |
||
493 | GDBusWorker *worker = user_data; |
||
494 | GDBusMessage *message; |
||
495 | |||
496 | g_mutex_lock (&worker->read_lock); |
||
497 | if (worker->frozen) |
||
498 | { |
||
499 | while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL) |
||
500 | { |
||
501 | _g_dbus_worker_emit_message_received (worker, message); |
||
502 | g_object_unref (message); |
||
503 | } |
||
504 | worker->frozen = FALSE; |
||
505 | } |
||
506 | else |
||
507 | { |
||
508 | g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0); |
||
509 | } |
||
510 | g_mutex_unlock (&worker->read_lock); |
||
511 | return FALSE; |
||
512 | } |
||
513 | |||
514 | /* can be called from any thread */ |
||
515 | void |
||
516 | _g_dbus_worker_unfreeze (GDBusWorker *worker) |
||
517 | { |
||
518 | GSource *idle_source; |
||
519 | idle_source = g_idle_source_new (); |
||
520 | g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); |
||
521 | g_source_set_callback (idle_source, |
||
522 | unfreeze_in_idle_cb, |
||
523 | _g_dbus_worker_ref (worker), |
||
524 | (GDestroyNotify) _g_dbus_worker_unref); |
||
525 | g_source_set_name (idle_source, "[gio] unfreeze_in_idle_cb"); |
||
526 | g_source_attach (idle_source, worker->shared_thread_data->context); |
||
527 | g_source_unref (idle_source); |
||
528 | } |
||
529 | |||
530 | /* ---------------------------------------------------------------------------------------------------- */ |
||
531 | |||
532 | static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker); |
||
533 | |||
534 | /* called in private thread shared by all GDBusConnection instances (without read-lock held) */ |
||
535 | static void |
||
536 | _g_dbus_worker_do_read_cb (GInputStream *input_stream, |
||
537 | GAsyncResult *res, |
||
538 | gpointer user_data) |
||
539 | { |
||
540 | GDBusWorker *worker = user_data; |
||
541 | GError *error; |
||
542 | gssize bytes_read; |
||
543 | |||
544 | g_mutex_lock (&worker->read_lock); |
||
545 | |||
546 | /* If already stopped, don't even process the reply */ |
||
547 | if (g_atomic_int_get (&worker->stopped)) |
||
548 | goto out; |
||
549 | |||
550 | error = NULL; |
||
551 | if (worker->socket == NULL) |
||
552 | bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream), |
||
553 | res, |
||
554 | &error); |
||
555 | else |
||
556 | bytes_read = _g_socket_read_with_control_messages_finish (worker->socket, |
||
557 | res, |
||
558 | &error); |
||
559 | if (worker->read_num_ancillary_messages > 0) |
||
560 | { |
||
561 | gint n; |
||
562 | for (n = 0; n < worker->read_num_ancillary_messages; n++) |
||
563 | { |
||
564 | GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]); |
||
565 | |||
566 | if (FALSE) |
||
567 | { |
||
568 | } |
||
569 | #ifdef G_OS_UNIX |
||
570 | else if (G_IS_UNIX_FD_MESSAGE (control_message)) |
||
571 | { |
||
572 | GUnixFDMessage *fd_message; |
||
573 | gint *fds; |
||
574 | gint num_fds; |
||
575 | |||
576 | fd_message = G_UNIX_FD_MESSAGE (control_message); |
||
577 | fds = g_unix_fd_message_steal_fds (fd_message, &num_fds); |
||
578 | if (worker->read_fd_list == NULL) |
||
579 | { |
||
580 | worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds); |
||
581 | } |
||
582 | else |
||
583 | { |
||
584 | gint n; |
||
585 | for (n = 0; n < num_fds; n++) |
||
586 | { |
||
587 | /* TODO: really want a append_steal() */ |
||
588 | g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL); |
||
589 | (void) g_close (fds[n], NULL); |
||
590 | } |
||
591 | } |
||
592 | g_free (fds); |
||
593 | } |
||
594 | else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message)) |
||
595 | { |
||
596 | /* do nothing */ |
||
597 | } |
||
598 | #endif |
||
599 | else |
||
600 | { |
||
601 | if (error == NULL) |
||
602 | { |
||
603 | g_set_error (&error, |
||
604 | G_IO_ERROR, |
||
605 | G_IO_ERROR_FAILED, |
||
606 | "Unexpected ancillary message of type %s received from peer", |
||
607 | g_type_name (G_TYPE_FROM_INSTANCE (control_message))); |
||
608 | _g_dbus_worker_emit_disconnected (worker, TRUE, error); |
||
609 | g_error_free (error); |
||
610 | g_object_unref (control_message); |
||
611 | n++; |
||
612 | while (n < worker->read_num_ancillary_messages) |
||
613 | g_object_unref (worker->read_ancillary_messages[n++]); |
||
614 | g_free (worker->read_ancillary_messages); |
||
615 | goto out; |
||
616 | } |
||
617 | } |
||
618 | g_object_unref (control_message); |
||
619 | } |
||
620 | g_free (worker->read_ancillary_messages); |
||
621 | } |
||
622 | |||
623 | if (bytes_read == -1) |
||
624 | { |
||
625 | if (G_UNLIKELY (_g_dbus_debug_transport ())) |
||
626 | { |
||
627 | _g_dbus_debug_print_lock (); |
||
628 | g_print ("========================================================================\n" |
||
629 | "GDBus-debug:Transport:\n" |
||
630 | " ---- READ ERROR on stream of type %s:\n" |
||
631 | " ---- %s %d: %s\n", |
||
632 | g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream))), |
||
633 | g_quark_to_string (error->domain), error->code, |
||
634 | error->message); |
||
635 | _g_dbus_debug_print_unlock (); |
||
636 | } |
||
637 | |||
638 | /* Every async read that uses this callback uses worker->cancellable |
||
639 | * as its GCancellable. worker->cancellable gets cancelled if and only |
||
640 | * if the GDBusConnection tells us to close (either via |
||
641 | * _g_dbus_worker_stop, which is called on last-unref, or directly), |
||
642 | * so a cancelled read must mean our connection was closed locally. |
||
643 | * |
||
644 | * If we're closing, other errors are possible - notably, |
||
645 | * G_IO_ERROR_CLOSED can be seen if we close the stream with an async |
||
646 | * read in-flight. It seems sensible to treat all read errors during |
||
647 | * closing as an expected thing that doesn't trip exit-on-close. |
||
648 | * |
||
649 | * Because close_expected can't be set until we get into the worker |
||
650 | * thread, but the cancellable is signalled sooner (from another |
||
651 | * thread), we do still need to check the error. |
||
652 | */ |
||
653 | if (worker->close_expected || |
||
654 | g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) |
||
655 | _g_dbus_worker_emit_disconnected (worker, FALSE, NULL); |
||
656 | else |
||
657 | _g_dbus_worker_emit_disconnected (worker, TRUE, error); |
||
658 | |||
659 | g_error_free (error); |
||
660 | goto out; |
||
661 | } |
||
662 | |||
663 | #if 0 |
||
664 | g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p", |
||
665 | (gint) bytes_read, |
||
666 | g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))), |
||
667 | g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))), |
||
668 | g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)), |
||
669 | G_IO_IN | G_IO_OUT | G_IO_HUP), |
||
670 | worker->stream, |
||
671 | worker); |
||
672 | #endif |
||
673 | |||
674 | /* TODO: hmm, hmm... */ |
||
675 | if (bytes_read == 0) |
||
676 | { |
||
677 | g_set_error (&error, |
||
678 | G_IO_ERROR, |
||
679 | G_IO_ERROR_FAILED, |
||
680 | "Underlying GIOStream returned 0 bytes on an async read"); |
||
681 | _g_dbus_worker_emit_disconnected (worker, TRUE, error); |
||
682 | g_error_free (error); |
||
683 | goto out; |
||
684 | } |
||
685 | |||
686 | read_message_print_transport_debug (bytes_read, worker); |
||
687 | |||
688 | worker->read_buffer_cur_size += bytes_read; |
||
689 | if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size) |
||
690 | { |
||
691 | /* OK, got what we asked for! */ |
||
692 | if (worker->read_buffer_bytes_wanted == 16) |
||
693 | { |
||
694 | gssize message_len; |
||
695 | /* OK, got the header - determine how many more bytes are needed */ |
||
696 | error = NULL; |
||
697 | message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, |
||
698 | 16, |
||
699 | &error); |
||
700 | if (message_len == -1) |
||
701 | { |
||
702 | g_warning ("_g_dbus_worker_do_read_cb: error determining bytes needed: %s", error->message); |
||
703 | _g_dbus_worker_emit_disconnected (worker, FALSE, error); |
||
704 | g_error_free (error); |
||
705 | goto out; |
||
706 | } |
||
707 | |||
708 | worker->read_buffer_bytes_wanted = message_len; |
||
709 | _g_dbus_worker_do_read_unlocked (worker); |
||
710 | } |
||
711 | else |
||
712 | { |
||
713 | GDBusMessage *message; |
||
714 | error = NULL; |
||
715 | |||
716 | /* TODO: use connection->priv->auth to decode the message */ |
||
717 | |||
718 | message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer, |
||
719 | worker->read_buffer_cur_size, |
||
720 | worker->capabilities, |
||
721 | &error); |
||
722 | if (message == NULL) |
||
723 | { |
||
724 | gchar *s; |
||
725 | s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); |
||
726 | g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT " bytes\n" |
||
727 | "The error is: %s\n" |
||
728 | "The payload is as follows:\n" |
||
729 | "%s\n", |
||
730 | worker->read_buffer_cur_size, |
||
731 | error->message, |
||
732 | s); |
||
733 | g_free (s); |
||
734 | _g_dbus_worker_emit_disconnected (worker, FALSE, error); |
||
735 | g_error_free (error); |
||
736 | goto out; |
||
737 | } |
||
738 | |||
739 | #ifdef G_OS_UNIX |
||
740 | if (worker->read_fd_list != NULL) |
||
741 | { |
||
742 | g_dbus_message_set_unix_fd_list (message, worker->read_fd_list); |
||
743 | g_object_unref (worker->read_fd_list); |
||
744 | worker->read_fd_list = NULL; |
||
745 | } |
||
746 | #endif |
||
747 | |||
748 | if (G_UNLIKELY (_g_dbus_debug_message ())) |
||
749 | { |
||
750 | gchar *s; |
||
751 | _g_dbus_debug_print_lock (); |
||
752 | g_print ("========================================================================\n" |
||
753 | "GDBus-debug:Message:\n" |
||
754 | " <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", |
||
755 | worker->read_buffer_cur_size); |
||
756 | s = g_dbus_message_print (message, 2); |
||
757 | g_print ("%s", s); |
||
758 | g_free (s); |
||
759 | if (G_UNLIKELY (_g_dbus_debug_payload ())) |
||
760 | { |
||
761 | s = _g_dbus_hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); |
||
762 | g_print ("%s\n", s); |
||
763 | g_free (s); |
||
764 | } |
||
765 | _g_dbus_debug_print_unlock (); |
||
766 | } |
||
767 | |||
768 | /* yay, got a message, go deliver it */ |
||
769 | _g_dbus_worker_queue_or_deliver_received_message (worker, message); |
||
770 | |||
771 | /* start reading another message! */ |
||
772 | worker->read_buffer_bytes_wanted = 0; |
||
773 | worker->read_buffer_cur_size = 0; |
||
774 | _g_dbus_worker_do_read_unlocked (worker); |
||
775 | } |
||
776 | } |
||
777 | else |
||
778 | { |
||
779 | /* didn't get all the bytes we requested - so repeat the request... */ |
||
780 | _g_dbus_worker_do_read_unlocked (worker); |
||
781 | } |
||
782 | |||
783 | out: |
||
784 | g_mutex_unlock (&worker->read_lock); |
||
785 | |||
786 | /* gives up the reference acquired when calling g_input_stream_read_async() */ |
||
787 | _g_dbus_worker_unref (worker); |
||
788 | |||
789 | /* check if there is any pending close */ |
||
790 | schedule_pending_close (worker); |
||
791 | } |
||
792 | |||
793 | /* called in private thread shared by all GDBusConnection instances (with read-lock held) */ |
||
794 | static void |
||
795 | _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) |
||
796 | { |
||
797 | /* Note that we do need to keep trying to read even if close_expected is |
||
798 | * true, because only failing a read causes us to signal 'closed'. |
||
799 | */ |
||
800 | |||
801 | /* if bytes_wanted is zero, it means start reading a message */ |
||
802 | if (worker->read_buffer_bytes_wanted == 0) |
||
803 | { |
||
804 | worker->read_buffer_cur_size = 0; |
||
805 | worker->read_buffer_bytes_wanted = 16; |
||
806 | } |
||
807 | |||
808 | /* ensure we have a (big enough) buffer */ |
||
809 | if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size) |
||
810 | { |
||
811 | /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */ |
||
812 | worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096); |
||
813 | worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size); |
||
814 | } |
||
815 | |||
816 | if (worker->socket == NULL) |
||
817 | g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream), |
||
818 | worker->read_buffer + worker->read_buffer_cur_size, |
||
819 | worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size, |
||
820 | G_PRIORITY_DEFAULT, |
||
821 | worker->cancellable, |
||
822 | (GAsyncReadyCallback) _g_dbus_worker_do_read_cb, |
||
823 | _g_dbus_worker_ref (worker)); |
||
824 | else |
||
825 | { |
||
826 | worker->read_ancillary_messages = NULL; |
||
827 | worker->read_num_ancillary_messages = 0; |
||
828 | _g_socket_read_with_control_messages (worker->socket, |
||
829 | worker->read_buffer + worker->read_buffer_cur_size, |
||
830 | worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size, |
||
831 | &worker->read_ancillary_messages, |
||
832 | &worker->read_num_ancillary_messages, |
||
833 | G_PRIORITY_DEFAULT, |
||
834 | worker->cancellable, |
||
835 | (GAsyncReadyCallback) _g_dbus_worker_do_read_cb, |
||
836 | _g_dbus_worker_ref (worker)); |
||
837 | } |
||
838 | } |
||
839 | |||
840 | /* called in private thread shared by all GDBusConnection instances (without read-lock held) */ |
||
841 | static gboolean |
||
842 | _g_dbus_worker_do_initial_read (gpointer data) |
||
843 | { |
||
844 | GDBusWorker *worker = data; |
||
845 | g_mutex_lock (&worker->read_lock); |
||
846 | _g_dbus_worker_do_read_unlocked (worker); |
||
847 | g_mutex_unlock (&worker->read_lock); |
||
848 | return FALSE; |
||
849 | } |
||
850 | |||
851 | /* ---------------------------------------------------------------------------------------------------- */ |
||
852 | |||
853 | struct _MessageToWriteData |
||
854 | { |
||
855 | GDBusWorker *worker; |
||
856 | GDBusMessage *message; |
||
857 | gchar *blob; |
||
858 | gsize blob_size; |
||
859 | |||
860 | gsize total_written; |
||
861 | GTask *task; |
||
862 | }; |
||
863 | |||
864 | static void |
||
865 | message_to_write_data_free (MessageToWriteData *data) |
||
866 | { |
||
867 | _g_dbus_worker_unref (data->worker); |
||
868 | if (data->message) |
||
869 | g_object_unref (data->message); |
||
870 | g_free (data->blob); |
||
871 | g_slice_free (MessageToWriteData, data); |
||
872 | } |
||
873 | |||
874 | /* ---------------------------------------------------------------------------------------------------- */ |
||
875 | |||
876 | static void write_message_continue_writing (MessageToWriteData *data); |
||
877 | |||
878 | /* called in private thread shared by all GDBusConnection instances |
||
879 | * |
||
880 | * write-lock is not held on entry |
||
881 | * output_pending is PENDING_WRITE on entry |
||
882 | */ |
||
883 | static void |
||
884 | write_message_async_cb (GObject *source_object, |
||
885 | GAsyncResult *res, |
||
886 | gpointer user_data) |
||
887 | { |
||
888 | MessageToWriteData *data = user_data; |
||
889 | GTask *task; |
||
890 | gssize bytes_written; |
||
891 | GError *error; |
||
892 | |||
893 | /* Note: we can't access data->task after calling g_task_return_* () because the |
||
894 | * callback can free @data and we're not completing in idle. So use a copy of the pointer. |
||
895 | */ |
||
896 | task = data->task; |
||
897 | |||
898 | error = NULL; |
||
899 | bytes_written = g_output_stream_write_finish (G_OUTPUT_STREAM (source_object), |
||
900 | res, |
||
901 | &error); |
||
902 | if (bytes_written == -1) |
||
903 | { |
||
904 | g_task_return_error (task, error); |
||
905 | g_object_unref (task); |
||
906 | goto out; |
||
907 | } |
||
908 | g_assert (bytes_written > 0); /* zero is never returned */ |
||
909 | |||
910 | write_message_print_transport_debug (bytes_written, data); |
||
911 | |||
912 | data->total_written += bytes_written; |
||
913 | g_assert (data->total_written <= data->blob_size); |
||
914 | if (data->total_written == data->blob_size) |
||
915 | { |
||
916 | g_task_return_boolean (task, TRUE); |
||
917 | g_object_unref (task); |
||
918 | goto out; |
||
919 | } |
||
920 | |||
921 | write_message_continue_writing (data); |
||
922 | |||
923 | out: |
||
924 | ; |
||
925 | } |
||
926 | |||
/* called in private thread shared by all GDBusConnection instances
 *
 * write-lock is not held on entry
 * output_pending is PENDING_WRITE on entry
 *
 * GSource callback installed by write_message_continue_writing() after
 * the socket reported that a send would block; it simply retries the
 * write. @socket and @condition are not inspected.
 */
#ifdef G_OS_UNIX
static gboolean
on_socket_ready (GSocket      *socket,
                 GIOCondition  condition,
                 gpointer      user_data)
{
  write_message_continue_writing ((MessageToWriteData *) user_data);

  return FALSE; /* remove source */
}
#endif
||
943 | |||
944 | /* called in private thread shared by all GDBusConnection instances |
||
945 | * |
||
946 | * write-lock is not held on entry |
||
947 | * output_pending is PENDING_WRITE on entry |
||
948 | */ |
||
949 | static void |
||
950 | write_message_continue_writing (MessageToWriteData *data) |
||
951 | { |
||
952 | GOutputStream *ostream; |
||
953 | #ifdef G_OS_UNIX |
||
954 | GTask *task; |
||
955 | GUnixFDList *fd_list; |
||
956 | #endif |
||
957 | |||
958 | #ifdef G_OS_UNIX |
||
959 | /* Note: we can't access data->task after calling g_task_return_* () because the |
||
960 | * callback can free @data and we're not completing in idle. So use a copy of the pointer. |
||
961 | */ |
||
962 | task = data->task; |
||
963 | #endif |
||
964 | |||
965 | ostream = g_io_stream_get_output_stream (data->worker->stream); |
||
966 | #ifdef G_OS_UNIX |
||
967 | fd_list = g_dbus_message_get_unix_fd_list (data->message); |
||
968 | #endif |
||
969 | |||
970 | g_assert (!g_output_stream_has_pending (ostream)); |
||
971 | g_assert_cmpint (data->total_written, <, data->blob_size); |
||
972 | |||
973 | if (FALSE) |
||
974 | { |
||
975 | } |
||
976 | #ifdef G_OS_UNIX |
||
977 | else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0) |
||
978 | { |
||
979 | GOutputVector vector; |
||
980 | GSocketControlMessage *control_message; |
||
981 | gssize bytes_written; |
||
982 | GError *error; |
||
983 | |||
984 | vector.buffer = data->blob; |
||
985 | vector.size = data->blob_size; |
||
986 | |||
987 | control_message = NULL; |
||
988 | if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0) |
||
989 | { |
||
990 | if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) |
||
991 | { |
||
992 | g_task_return_new_error (task, |
||
993 | G_IO_ERROR, |
||
994 | G_IO_ERROR_FAILED, |
||
995 | "Tried sending a file descriptor but remote peer does not support this capability"); |
||
996 | g_object_unref (task); |
||
997 | goto out; |
||
998 | } |
||
999 | control_message = g_unix_fd_message_new_with_fd_list (fd_list); |
||
1000 | } |
||
1001 | |||
1002 | error = NULL; |
||
1003 | bytes_written = g_socket_send_message (data->worker->socket, |
||
1004 | NULL, /* address */ |
||
1005 | &vector, |
||
1006 | 1, |
||
1007 | control_message != NULL ? &control_message : NULL, |
||
1008 | control_message != NULL ? 1 : 0, |
||
1009 | G_SOCKET_MSG_NONE, |
||
1010 | data->worker->cancellable, |
||
1011 | &error); |
||
1012 | if (control_message != NULL) |
||
1013 | g_object_unref (control_message); |
||
1014 | |||
1015 | if (bytes_written == -1) |
||
1016 | { |
||
1017 | /* Handle WOULD_BLOCK by waiting until there's room in the buffer */ |
||
1018 | if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) |
||
1019 | { |
||
1020 | GSource *source; |
||
1021 | source = g_socket_create_source (data->worker->socket, |
||
1022 | G_IO_OUT | G_IO_HUP | G_IO_ERR, |
||
1023 | data->worker->cancellable); |
||
1024 | g_source_set_callback (source, |
||
1025 | (GSourceFunc) on_socket_ready, |
||
1026 | data, |
||
1027 | NULL); /* GDestroyNotify */ |
||
1028 | g_source_attach (source, g_main_context_get_thread_default ()); |
||
1029 | g_source_unref (source); |
||
1030 | g_error_free (error); |
||
1031 | goto out; |
||
1032 | } |
||
1033 | g_task_return_error (task, error); |
||
1034 | g_object_unref (task); |
||
1035 | goto out; |
||
1036 | } |
||
1037 | g_assert (bytes_written > 0); /* zero is never returned */ |
||
1038 | |||
1039 | write_message_print_transport_debug (bytes_written, data); |
||
1040 | |||
1041 | data->total_written += bytes_written; |
||
1042 | g_assert (data->total_written <= data->blob_size); |
||
1043 | if (data->total_written == data->blob_size) |
||
1044 | { |
||
1045 | g_task_return_boolean (task, TRUE); |
||
1046 | g_object_unref (task); |
||
1047 | goto out; |
||
1048 | } |
||
1049 | |||
1050 | write_message_continue_writing (data); |
||
1051 | } |
||
1052 | #endif |
||
1053 | else |
||
1054 | { |
||
1055 | #ifdef G_OS_UNIX |
||
1056 | if (fd_list != NULL) |
||
1057 | { |
||
1058 | g_task_return_new_error (task, |
||
1059 | G_IO_ERROR, |
||
1060 | G_IO_ERROR_FAILED, |
||
1061 | "Tried sending a file descriptor on unsupported stream of type %s", |
||
1062 | g_type_name (G_TYPE_FROM_INSTANCE (ostream))); |
||
1063 | g_object_unref (task); |
||
1064 | goto out; |
||
1065 | } |
||
1066 | #endif |
||
1067 | |||
1068 | g_output_stream_write_async (ostream, |
||
1069 | (const gchar *) data->blob + data->total_written, |
||
1070 | data->blob_size - data->total_written, |
||
1071 | G_PRIORITY_DEFAULT, |
||
1072 | data->worker->cancellable, |
||
1073 | write_message_async_cb, |
||
1074 | data); |
||
1075 | } |
||
1076 | #ifdef G_OS_UNIX |
||
1077 | out: |
||
1078 | #endif |
||
1079 | ; |
||
1080 | } |
||
1081 | |||
1082 | /* called in private thread shared by all GDBusConnection instances |
||
1083 | * |
||
1084 | * write-lock is not held on entry |
||
1085 | * output_pending is PENDING_WRITE on entry |
||
1086 | */ |
||
1087 | static void |
||
1088 | write_message_async (GDBusWorker *worker, |
||
1089 | MessageToWriteData *data, |
||
1090 | GAsyncReadyCallback callback, |
||
1091 | gpointer user_data) |
||
1092 | { |
||
1093 | data->task = g_task_new (NULL, NULL, callback, user_data); |
||
1094 | data->total_written = 0; |
||
1095 | write_message_continue_writing (data); |
||
1096 | } |
||
1097 | |||
1098 | /* called in private thread shared by all GDBusConnection instances (with write-lock held) */ |
||
1099 | static gboolean |
||
1100 | write_message_finish (GAsyncResult *res, |
||
1101 | GError **error) |
||
1102 | { |
||
1103 | g_return_val_if_fail (g_task_is_valid (res, NULL), FALSE); |
||
1104 | |||
1105 | return g_task_propagate_boolean (G_TASK (res), error); |
||
1106 | } |
||
1107 | /* ---------------------------------------------------------------------------------------------------- */ |
||
1108 | |||
1109 | static void continue_writing (GDBusWorker *worker); |
||
1110 | |||
1111 | typedef struct |
||
1112 | { |
||
1113 | GDBusWorker *worker; |
||
1114 | GList *flushers; |
||
1115 | } FlushAsyncData; |
||
1116 | |||
1117 | static void |
||
1118 | flush_data_list_complete (const GList *flushers, |
||
1119 | const GError *error) |
||
1120 | { |
||
1121 | const GList *l; |
||
1122 | |||
1123 | for (l = flushers; l != NULL; l = l->next) |
||
1124 | { |
||
1125 | FlushData *f = l->data; |
||
1126 | |||
1127 | f->error = error != NULL ? g_error_copy (error) : NULL; |
||
1128 | |||
1129 | g_mutex_lock (&f->mutex); |
||
1130 | g_cond_signal (&f->cond); |
||
1131 | g_mutex_unlock (&f->mutex); |
||
1132 | } |
||
1133 | } |
||
1134 | |||
1135 | /* called in private thread shared by all GDBusConnection instances |
||
1136 | * |
||
1137 | * write-lock is not held on entry |
||
1138 | * output_pending is PENDING_FLUSH on entry |
||
1139 | */ |
||
1140 | static void |
||
1141 | ostream_flush_cb (GObject *source_object, |
||
1142 | GAsyncResult *res, |
||
1143 | gpointer user_data) |
||
1144 | { |
||
1145 | FlushAsyncData *data = user_data; |
||
1146 | GError *error; |
||
1147 | |||
1148 | error = NULL; |
||
1149 | g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object), |
||
1150 | res, |
||
1151 | &error); |
||
1152 | |||
1153 | if (error == NULL) |
||
1154 | { |
||
1155 | if (G_UNLIKELY (_g_dbus_debug_transport ())) |
||
1156 | { |
||
1157 | _g_dbus_debug_print_lock (); |
||
1158 | g_print ("========================================================================\n" |
||
1159 | "GDBus-debug:Transport:\n" |
||
1160 | " ---- FLUSHED stream of type %s\n", |
||
1161 | g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream)))); |
||
1162 | _g_dbus_debug_print_unlock (); |
||
1163 | } |
||
1164 | } |
||
1165 | |||
1166 | g_assert (data->flushers != NULL); |
||
1167 | flush_data_list_complete (data->flushers, error); |
||
1168 | g_list_free (data->flushers); |
||
1169 | |||
1170 | if (error != NULL) |
||
1171 | g_error_free (error); |
||
1172 | |||
1173 | /* Make sure we tell folks that we don't have additional |
||
1174 | flushes pending */ |
||
1175 | g_mutex_lock (&data->worker->write_lock); |
||
1176 | data->worker->write_num_messages_flushed = data->worker->write_num_messages_written; |
||
1177 | g_assert (data->worker->output_pending == PENDING_FLUSH); |
||
1178 | data->worker->output_pending = PENDING_NONE; |
||
1179 | g_mutex_unlock (&data->worker->write_lock); |
||
1180 | |||
1181 | /* OK, cool, finally kick off the next write */ |
||
1182 | continue_writing (data->worker); |
||
1183 | |||
1184 | _g_dbus_worker_unref (data->worker); |
||
1185 | g_free (data); |
||
1186 | } |
||
1187 | |||
1188 | /* called in private thread shared by all GDBusConnection instances |
||
1189 | * |
||
1190 | * write-lock is not held on entry |
||
1191 | * output_pending is PENDING_FLUSH on entry |
||
1192 | */ |
||
1193 | static void |
||
1194 | start_flush (FlushAsyncData *data) |
||
1195 | { |
||
1196 | g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream), |
||
1197 | G_PRIORITY_DEFAULT, |
||
1198 | data->worker->cancellable, |
||
1199 | ostream_flush_cb, |
||
1200 | data); |
||
1201 | } |
||
1202 | |||
1203 | /* called in private thread shared by all GDBusConnection instances |
||
1204 | * |
||
1205 | * write-lock is held on entry |
||
1206 | * output_pending is PENDING_NONE on entry |
||
1207 | */ |
||
1208 | static void |
||
1209 | message_written_unlocked (GDBusWorker *worker, |
||
1210 | MessageToWriteData *message_data) |
||
1211 | { |
||
1212 | if (G_UNLIKELY (_g_dbus_debug_message ())) |
||
1213 | { |
||
1214 | gchar *s; |
||
1215 | _g_dbus_debug_print_lock (); |
||
1216 | g_print ("========================================================================\n" |
||
1217 | "GDBus-debug:Message:\n" |
||
1218 | " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", |
||
1219 | message_data->blob_size); |
||
1220 | s = g_dbus_message_print (message_data->message, 2); |
||
1221 | g_print ("%s", s); |
||
1222 | g_free (s); |
||
1223 | if (G_UNLIKELY (_g_dbus_debug_payload ())) |
||
1224 | { |
||
1225 | s = _g_dbus_hexdump (message_data->blob, message_data->blob_size, 2); |
||
1226 | g_print ("%s\n", s); |
||
1227 | g_free (s); |
||
1228 | } |
||
1229 | _g_dbus_debug_print_unlock (); |
||
1230 | } |
||
1231 | |||
1232 | worker->write_num_messages_written += 1; |
||
1233 | } |
||
1234 | |||
1235 | /* called in private thread shared by all GDBusConnection instances |
||
1236 | * |
||
1237 | * write-lock is held on entry |
||
1238 | * output_pending is PENDING_NONE on entry |
||
1239 | * |
||
1240 | * Returns: non-%NULL, setting @output_pending, if we need to flush now |
||
1241 | */ |
||
1242 | static FlushAsyncData * |
||
1243 | prepare_flush_unlocked (GDBusWorker *worker) |
||
1244 | { |
||
1245 | GList *l; |
||
1246 | GList *ll; |
||
1247 | GList *flushers; |
||
1248 | |||
1249 | flushers = NULL; |
||
1250 | for (l = worker->write_pending_flushes; l != NULL; l = ll) |
||
1251 | { |
||
1252 | FlushData *f = l->data; |
||
1253 | ll = l->next; |
||
1254 | |||
1255 | if (f->number_to_wait_for == worker->write_num_messages_written) |
||
1256 | { |
||
1257 | flushers = g_list_append (flushers, f); |
||
1258 | worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l); |
||
1259 | } |
||
1260 | } |
||
1261 | if (flushers != NULL) |
||
1262 | { |
||
1263 | g_assert (worker->output_pending == PENDING_NONE); |
||
1264 | worker->output_pending = PENDING_FLUSH; |
||
1265 | } |
||
1266 | |||
1267 | if (flushers != NULL) |
||
1268 | { |
||
1269 | FlushAsyncData *data; |
||
1270 | |||
1271 | data = g_new0 (FlushAsyncData, 1); |
||
1272 | data->worker = _g_dbus_worker_ref (worker); |
||
1273 | data->flushers = flushers; |
||
1274 | return data; |
||
1275 | } |
||
1276 | |||
1277 | return NULL; |
||
1278 | } |
||
1279 | |||
1280 | /* called in private thread shared by all GDBusConnection instances |
||
1281 | * |
||
1282 | * write-lock is not held on entry |
||
1283 | * output_pending is PENDING_WRITE on entry |
||
1284 | */ |
||
1285 | static void |
||
1286 | write_message_cb (GObject *source_object, |
||
1287 | GAsyncResult *res, |
||
1288 | gpointer user_data) |
||
1289 | { |
||
1290 | MessageToWriteData *data = user_data; |
||
1291 | GError *error; |
||
1292 | |||
1293 | g_mutex_lock (&data->worker->write_lock); |
||
1294 | g_assert (data->worker->output_pending == PENDING_WRITE); |
||
1295 | data->worker->output_pending = PENDING_NONE; |
||
1296 | |||
1297 | error = NULL; |
||
1298 | if (!write_message_finish (res, &error)) |
||
1299 | { |
||
1300 | g_mutex_unlock (&data->worker->write_lock); |
||
1301 | |||
1302 | /* TODO: handle */ |
||
1303 | _g_dbus_worker_emit_disconnected (data->worker, TRUE, error); |
||
1304 | g_error_free (error); |
||
1305 | |||
1306 | g_mutex_lock (&data->worker->write_lock); |
||
1307 | } |
||
1308 | |||
1309 | message_written_unlocked (data->worker, data); |
||
1310 | |||
1311 | g_mutex_unlock (&data->worker->write_lock); |
||
1312 | |||
1313 | continue_writing (data->worker); |
||
1314 | |||
1315 | message_to_write_data_free (data); |
||
1316 | } |
||
1317 | |||
1318 | /* called in private thread shared by all GDBusConnection instances |
||
1319 | * |
||
1320 | * write-lock is not held on entry |
||
1321 | * output_pending is PENDING_CLOSE on entry |
||
1322 | */ |
||
1323 | static void |
||
1324 | iostream_close_cb (GObject *source_object, |
||
1325 | GAsyncResult *res, |
||
1326 | gpointer user_data) |
||
1327 | { |
||
1328 | GDBusWorker *worker = user_data; |
||
1329 | GError *error = NULL; |
||
1330 | GList *pending_close_attempts, *pending_flush_attempts; |
||
1331 | GQueue *send_queue; |
||
1332 | |||
1333 | g_io_stream_close_finish (worker->stream, res, &error); |
||
1334 | |||
1335 | g_mutex_lock (&worker->write_lock); |
||
1336 | |||
1337 | pending_close_attempts = worker->pending_close_attempts; |
||
1338 | worker->pending_close_attempts = NULL; |
||
1339 | |||
1340 | pending_flush_attempts = worker->write_pending_flushes; |
||
1341 | worker->write_pending_flushes = NULL; |
||
1342 | |||
1343 | send_queue = worker->write_queue; |
||
1344 | worker->write_queue = g_queue_new (); |
||
1345 | |||
1346 | g_assert (worker->output_pending == PENDING_CLOSE); |
||
1347 | worker->output_pending = PENDING_NONE; |
||
1348 | |||
1349 | g_mutex_unlock (&worker->write_lock); |
||
1350 | |||
1351 | while (pending_close_attempts != NULL) |
||
1352 | { |
||
1353 | CloseData *close_data = pending_close_attempts->data; |
||
1354 | |||
1355 | pending_close_attempts = g_list_delete_link (pending_close_attempts, |
||
1356 | pending_close_attempts); |
||
1357 | |||
1358 | if (close_data->task != NULL) |
||
1359 | { |
||
1360 | if (error != NULL) |
||
1361 | g_task_return_error (close_data->task, g_error_copy (error)); |
||
1362 | else |
||
1363 | g_task_return_boolean (close_data->task, TRUE); |
||
1364 | } |
||
1365 | |||
1366 | close_data_free (close_data); |
||
1367 | } |
||
1368 | |||
1369 | g_clear_error (&error); |
||
1370 | |||
1371 | /* all messages queued for sending are discarded */ |
||
1372 | g_queue_free_full (send_queue, (GDestroyNotify) message_to_write_data_free); |
||
1373 | /* all queued flushes fail */ |
||
1374 | error = g_error_new (G_IO_ERROR, G_IO_ERROR_CANCELLED, |
||
1375 | _("Operation was cancelled")); |
||
1376 | flush_data_list_complete (pending_flush_attempts, error); |
||
1377 | g_list_free (pending_flush_attempts); |
||
1378 | g_clear_error (&error); |
||
1379 | |||
1380 | _g_dbus_worker_unref (worker); |
||
1381 | } |
||
1382 | |||
1383 | /* called in private thread shared by all GDBusConnection instances |
||
1384 | * |
||
1385 | * write-lock is not held on entry |
||
1386 | * output_pending must be PENDING_NONE on entry |
||
1387 | */ |
||
1388 | static void |
||
1389 | continue_writing (GDBusWorker *worker) |
||
1390 | { |
||
1391 | MessageToWriteData *data; |
||
1392 | FlushAsyncData *flush_async_data; |
||
1393 | |||
1394 | write_next: |
||
1395 | /* we mustn't try to write two things at once */ |
||
1396 | g_assert (worker->output_pending == PENDING_NONE); |
||
1397 | |||
1398 | g_mutex_lock (&worker->write_lock); |
||
1399 | |||
1400 | data = NULL; |
||
1401 | flush_async_data = NULL; |
||
1402 | |||
1403 | /* if we want to close the connection, that takes precedence */ |
||
1404 | if (worker->pending_close_attempts != NULL) |
||
1405 | { |
||
1406 | GInputStream *input = g_io_stream_get_input_stream (worker->stream); |
||
1407 | |||
1408 | if (!g_input_stream_has_pending (input)) |
||
1409 | { |
||
1410 | worker->close_expected = TRUE; |
||
1411 | worker->output_pending = PENDING_CLOSE; |
||
1412 | |||
1413 | g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT, |
||
1414 | NULL, iostream_close_cb, |
||
1415 | _g_dbus_worker_ref (worker)); |
||
1416 | } |
||
1417 | } |
||
1418 | else |
||
1419 | { |
||
1420 | flush_async_data = prepare_flush_unlocked (worker); |
||
1421 | |||
1422 | if (flush_async_data == NULL) |
||
1423 | { |
||
1424 | data = g_queue_pop_head (worker->write_queue); |
||
1425 | |||
1426 | if (data != NULL) |
||
1427 | worker->output_pending = PENDING_WRITE; |
||
1428 | } |
||
1429 | } |
||
1430 | |||
1431 | g_mutex_unlock (&worker->write_lock); |
||
1432 | |||
1433 | /* Note that write_lock is only used for protecting the @write_queue |
||
1434 | * and @output_pending fields of the GDBusWorker struct ... which we |
||
1435 | * need to modify from arbitrary threads in _g_dbus_worker_send_message(). |
||
1436 | * |
||
1437 | * Therefore, it's fine to drop it here when calling back into user |
||
1438 | * code and then writing the message out onto the GIOStream since this |
||
1439 | * function only runs on the worker thread. |
||
1440 | */ |
||
1441 | |||
1442 | if (flush_async_data != NULL) |
||
1443 | { |
||
1444 | start_flush (flush_async_data); |
||
1445 | g_assert (data == NULL); |
||
1446 | } |
||
1447 | else if (data != NULL) |
||
1448 | { |
||
1449 | GDBusMessage *old_message; |
||
1450 | guchar *new_blob; |
||
1451 | gsize new_blob_size; |
||
1452 | GError *error; |
||
1453 | |||
1454 | old_message = data->message; |
||
1455 | data->message = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message); |
||
1456 | if (data->message == old_message) |
||
1457 | { |
||
1458 | /* filters had no effect - do nothing */ |
||
1459 | } |
||
1460 | else if (data->message == NULL) |
||
1461 | { |
||
1462 | /* filters dropped message */ |
||
1463 | g_mutex_lock (&worker->write_lock); |
||
1464 | worker->output_pending = PENDING_NONE; |
||
1465 | g_mutex_unlock (&worker->write_lock); |
||
1466 | message_to_write_data_free (data); |
||
1467 | goto write_next; |
||
1468 | } |
||
1469 | else |
||
1470 | { |
||
1471 | /* filters altered the message -> reencode */ |
||
1472 | error = NULL; |
||
1473 | new_blob = g_dbus_message_to_blob (data->message, |
||
1474 | &new_blob_size, |
||
1475 | worker->capabilities, |
||
1476 | &error); |
||
1477 | if (new_blob == NULL) |
||
1478 | { |
||
1479 | /* if filter make the GDBusMessage unencodeable, just complain on stderr and send |
||
1480 | * the old message instead |
||
1481 | */ |
||
1482 | g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s", |
||
1483 | g_dbus_message_get_serial (data->message), |
||
1484 | error->message); |
||
1485 | g_error_free (error); |
||
1486 | } |
||
1487 | else |
||
1488 | { |
||
1489 | g_free (data->blob); |
||
1490 | data->blob = (gchar *) new_blob; |
||
1491 | data->blob_size = new_blob_size; |
||
1492 | } |
||
1493 | } |
||
1494 | |||
1495 | write_message_async (worker, |
||
1496 | data, |
||
1497 | write_message_cb, |
||
1498 | data); |
||
1499 | } |
||
1500 | } |
||
1501 | |||
1502 | /* called in private thread shared by all GDBusConnection instances |
||
1503 | * |
||
1504 | * write-lock is not held on entry |
||
1505 | * output_pending may be anything |
||
1506 | */ |
||
1507 | static gboolean |
||
1508 | continue_writing_in_idle_cb (gpointer user_data) |
||
1509 | { |
||
1510 | GDBusWorker *worker = user_data; |
||
1511 | |||
1512 | /* Because this is the worker thread, we can read this struct member |
||
1513 | * without holding the lock: no other thread ever modifies it. |
||
1514 | */ |
||
1515 | if (worker->output_pending == PENDING_NONE) |
||
1516 | continue_writing (worker); |
||
1517 | |||
1518 | return FALSE; |
||
1519 | } |
||
1520 | |||
1521 | /* |
||
1522 | * @write_data: (transfer full) (allow-none): |
||
1523 | * @flush_data: (transfer full) (allow-none): |
||
1524 | * @close_data: (transfer full) (allow-none): |
||
1525 | * |
||
1526 | * Can be called from any thread |
||
1527 | * |
||
1528 | * write_lock is held on entry |
||
1529 | * output_pending may be anything |
||
1530 | */ |
||
1531 | static void |
||
1532 | schedule_writing_unlocked (GDBusWorker *worker, |
||
1533 | MessageToWriteData *write_data, |
||
1534 | FlushData *flush_data, |
||
1535 | CloseData *close_data) |
||
1536 | { |
||
1537 | if (write_data != NULL) |
||
1538 | g_queue_push_tail (worker->write_queue, write_data); |
||
1539 | |||
1540 | if (flush_data != NULL) |
||
1541 | worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, flush_data); |
||
1542 | |||
1543 | if (close_data != NULL) |
||
1544 | worker->pending_close_attempts = g_list_prepend (worker->pending_close_attempts, |
||
1545 | close_data); |
||
1546 | |||
1547 | /* If we had output pending, the next bit of output will happen |
||
1548 | * automatically when it finishes, so we only need to do this |
||
1549 | * if nothing was pending. |
||
1550 | * |
||
1551 | * The idle callback will re-check that output_pending is still |
||
1552 | * PENDING_NONE, to guard against output starting before the idle. |
||
1553 | */ |
||
1554 | if (worker->output_pending == PENDING_NONE) |
||
1555 | { |
||
1556 | GSource *idle_source; |
||
1557 | idle_source = g_idle_source_new (); |
||
1558 | g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); |
||
1559 | g_source_set_callback (idle_source, |
||
1560 | continue_writing_in_idle_cb, |
||
1561 | _g_dbus_worker_ref (worker), |
||
1562 | (GDestroyNotify) _g_dbus_worker_unref); |
||
1563 | g_source_set_name (idle_source, "[gio] continue_writing_in_idle_cb"); |
||
1564 | g_source_attach (idle_source, worker->shared_thread_data->context); |
||
1565 | g_source_unref (idle_source); |
||
1566 | } |
||
1567 | } |
||
1568 | |||
1569 | static void |
||
1570 | schedule_pending_close (GDBusWorker *worker) |
||
1571 | { |
||
1572 | if (!worker->pending_close_attempts) |
||
1573 | return; |
||
1574 | |||
1575 | schedule_writing_unlocked (worker, NULL, NULL, NULL); |
||
1576 | } |
||
1577 | |||
1578 | /* ---------------------------------------------------------------------------------------------------- */ |
||
1579 | |||
1580 | /* can be called from any thread - steals blob |
||
1581 | * |
||
1582 | * write_lock is not held on entry |
||
1583 | * output_pending may be anything |
||
1584 | */ |
||
1585 | void |
||
1586 | _g_dbus_worker_send_message (GDBusWorker *worker, |
||
1587 | GDBusMessage *message, |
||
1588 | gchar *blob, |
||
1589 | gsize blob_len) |
||
1590 | { |
||
1591 | MessageToWriteData *data; |
||
1592 | |||
1593 | g_return_if_fail (G_IS_DBUS_MESSAGE (message)); |
||
1594 | g_return_if_fail (blob != NULL); |
||
1595 | g_return_if_fail (blob_len > 16); |
||
1596 | |||
1597 | data = g_slice_new0 (MessageToWriteData); |
||
1598 | data->worker = _g_dbus_worker_ref (worker); |
||
1599 | data->message = g_object_ref (message); |
||
1600 | data->blob = blob; /* steal! */ |
||
1601 | data->blob_size = blob_len; |
||
1602 | |||
1603 | g_mutex_lock (&worker->write_lock); |
||
1604 | schedule_writing_unlocked (worker, data, NULL, NULL); |
||
1605 | g_mutex_unlock (&worker->write_lock); |
||
1606 | } |
||
1607 | |||
1608 | /* ---------------------------------------------------------------------------------------------------- */ |
||
1609 | |||
1610 | GDBusWorker * |
||
1611 | _g_dbus_worker_new (GIOStream *stream, |
||
1612 | GDBusCapabilityFlags capabilities, |
||
1613 | gboolean initially_frozen, |
||
1614 | GDBusWorkerMessageReceivedCallback message_received_callback, |
||
1615 | GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback, |
||
1616 | GDBusWorkerDisconnectedCallback disconnected_callback, |
||
1617 | gpointer user_data) |
||
1618 | { |
||
1619 | GDBusWorker *worker; |
||
1620 | GSource *idle_source; |
||
1621 | |||
1622 | g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL); |
||
1623 | g_return_val_if_fail (message_received_callback != NULL, NULL); |
||
1624 | g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL); |
||
1625 | g_return_val_if_fail (disconnected_callback != NULL, NULL); |
||
1626 | |||
1627 | worker = g_new0 (GDBusWorker, 1); |
||
1628 | worker->ref_count = 1; |
||
1629 | |||
1630 | g_mutex_init (&worker->read_lock); |
||
1631 | worker->message_received_callback = message_received_callback; |
||
1632 | worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback; |
||
1633 | worker->disconnected_callback = disconnected_callback; |
||
1634 | worker->user_data = user_data; |
||
1635 | worker->stream = g_object_ref (stream); |
||
1636 | worker->capabilities = capabilities; |
||
1637 | worker->cancellable = g_cancellable_new (); |
||
1638 | worker->output_pending = PENDING_NONE; |
||
1639 | |||
1640 | worker->frozen = initially_frozen; |
||
1641 | worker->received_messages_while_frozen = g_queue_new (); |
||
1642 | |||
1643 | g_mutex_init (&worker->write_lock); |
||
1644 | worker->write_queue = g_queue_new (); |
||
1645 | |||
1646 | if (G_IS_SOCKET_CONNECTION (worker->stream)) |
||
1647 | worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)); |
||
1648 | |||
1649 | worker->shared_thread_data = _g_dbus_shared_thread_ref (); |
||
1650 | |||
1651 | /* begin reading */ |
||
1652 | idle_source = g_idle_source_new (); |
||
1653 | g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); |
||
1654 | g_source_set_callback (idle_source, |
||
1655 | _g_dbus_worker_do_initial_read, |
||
1656 | _g_dbus_worker_ref (worker), |
||
1657 | (GDestroyNotify) _g_dbus_worker_unref); |
||
1658 | g_source_set_name (idle_source, "[gio] _g_dbus_worker_do_initial_read"); |
||
1659 | g_source_attach (idle_source, worker->shared_thread_data->context); |
||
1660 | g_source_unref (idle_source); |
||
1661 | |||
1662 | return worker; |
||
1663 | } |
||
1664 | |||
1665 | /* ---------------------------------------------------------------------------------------------------- */ |
||
1666 | |||
1667 | /* can be called from any thread |
||
1668 | * |
||
1669 | * write_lock is not held on entry |
||
1670 | * output_pending may be anything |
||
1671 | */ |
||
1672 | void |
||
1673 | _g_dbus_worker_close (GDBusWorker *worker, |
||
1674 | GTask *task) |
||
1675 | { |
||
1676 | CloseData *close_data; |
||
1677 | |||
1678 | close_data = g_slice_new0 (CloseData); |
||
1679 | close_data->worker = _g_dbus_worker_ref (worker); |
||
1680 | close_data->task = (task == NULL ? NULL : g_object_ref (task)); |
||
1681 | |||
1682 | /* Don't set worker->close_expected here - we're in the wrong thread. |
||
1683 | * It'll be set before the actual close happens. |
||
1684 | */ |
||
1685 | g_cancellable_cancel (worker->cancellable); |
||
1686 | g_mutex_lock (&worker->write_lock); |
||
1687 | schedule_writing_unlocked (worker, NULL, NULL, close_data); |
||
1688 | g_mutex_unlock (&worker->write_lock); |
||
1689 | } |
||
1690 | |||
1691 | /* This can be called from any thread - frees worker. Note that |
||
1692 | * callbacks might still happen if called from another thread than the |
||
1693 | * worker - use your own synchronization primitive in the callbacks. |
||
1694 | * |
||
1695 | * write_lock is not held on entry |
||
1696 | * output_pending may be anything |
||
1697 | */ |
||
1698 | void |
||
1699 | _g_dbus_worker_stop (GDBusWorker *worker) |
||
1700 | { |
||
1701 | g_atomic_int_set (&worker->stopped, TRUE); |
||
1702 | |||
1703 | /* Cancel any pending operations and schedule a close of the underlying I/O |
||
1704 | * stream in the worker thread |
||
1705 | */ |
||
1706 | _g_dbus_worker_close (worker, NULL); |
||
1707 | |||
1708 | /* _g_dbus_worker_close holds a ref until after an idle in the worker |
||
1709 | * thread has run, so we no longer need to unref in an idle like in |
||
1710 | * commit 322e25b535 |
||
1711 | */ |
||
1712 | _g_dbus_worker_unref (worker); |
||
1713 | } |
||
1714 | |||
1715 | /* ---------------------------------------------------------------------------------------------------- */ |
||
1716 | |||
1717 | /* can be called from any thread (except the worker thread) - blocks |
||
1718 | * calling thread until all queued outgoing messages are written and |
||
1719 | * the transport has been flushed |
||
1720 | * |
||
1721 | * write_lock is not held on entry |
||
1722 | * output_pending may be anything |
||
1723 | */ |
||
1724 | gboolean |
||
1725 | _g_dbus_worker_flush_sync (GDBusWorker *worker, |
||
1726 | GCancellable *cancellable, |
||
1727 | GError **error) |
||
1728 | { |
||
1729 | gboolean ret; |
||
1730 | FlushData *data; |
||
1731 | guint64 pending_writes; |
||
1732 | |||
1733 | data = NULL; |
||
1734 | ret = TRUE; |
||
1735 | |||
1736 | g_mutex_lock (&worker->write_lock); |
||
1737 | |||
1738 | /* if the queue is empty, no write is in-flight and we haven't written |
||
1739 | * anything since the last flush, then there's nothing to wait for |
||
1740 | */ |
||
1741 | pending_writes = g_queue_get_length (worker->write_queue); |
||
1742 | |||
1743 | /* if a write is in-flight, we shouldn't be satisfied until the first |
||
1744 | * flush operation that follows it |
||
1745 | */ |
||
1746 | if (worker->output_pending == PENDING_WRITE) |
||
1747 | pending_writes += 1; |
||
1748 | |||
1749 | if (pending_writes > 0 || |
||
1750 | worker->write_num_messages_written != worker->write_num_messages_flushed) |
||
1751 | { |
||
1752 | data = g_new0 (FlushData, 1); |
||
1753 | g_mutex_init (&data->mutex); |
||
1754 | g_cond_init (&data->cond); |
||
1755 | data->number_to_wait_for = worker->write_num_messages_written + pending_writes; |
||
1756 | g_mutex_lock (&data->mutex); |
||
1757 | |||
1758 | schedule_writing_unlocked (worker, NULL, data, NULL); |
||
1759 | } |
||
1760 | g_mutex_unlock (&worker->write_lock); |
||
1761 | |||
1762 | if (data != NULL) |
||
1763 | { |
||
1764 | g_cond_wait (&data->cond, &data->mutex); |
||
1765 | g_mutex_unlock (&data->mutex); |
||
1766 | |||
1767 | /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */ |
||
1768 | g_cond_clear (&data->cond); |
||
1769 | g_mutex_clear (&data->mutex); |
||
1770 | if (data->error != NULL) |
||
1771 | { |
||
1772 | ret = FALSE; |
||
1773 | g_propagate_error (error, data->error); |
||
1774 | } |
||
1775 | g_free (data); |
||
1776 | } |
||
1777 | |||
1778 | return ret; |
||
1779 | } |
||
1780 | |||
1781 | /* ---------------------------------------------------------------------------------------------------- */ |
||
1782 | |||
1783 | #define G_DBUS_DEBUG_AUTHENTICATION (1<<0) |
||
1784 | #define G_DBUS_DEBUG_TRANSPORT (1<<1) |
||
1785 | #define G_DBUS_DEBUG_MESSAGE (1<<2) |
||
1786 | #define G_DBUS_DEBUG_PAYLOAD (1<<3) |
||
1787 | #define G_DBUS_DEBUG_CALL (1<<4) |
||
1788 | #define G_DBUS_DEBUG_SIGNAL (1<<5) |
||
1789 | #define G_DBUS_DEBUG_INCOMING (1<<6) |
||
1790 | #define G_DBUS_DEBUG_RETURN (1<<7) |
||
1791 | #define G_DBUS_DEBUG_EMISSION (1<<8) |
||
1792 | #define G_DBUS_DEBUG_ADDRESS (1<<9) |
||
1793 | |||
1794 | static gint _gdbus_debug_flags = 0; |
||
1795 | |||
1796 | gboolean |
||
1797 | _g_dbus_debug_authentication (void) |
||
1798 | { |
||
1799 | _g_dbus_initialize (); |
||
1800 | return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0; |
||
1801 | } |
||
1802 | |||
1803 | gboolean |
||
1804 | _g_dbus_debug_transport (void) |
||
1805 | { |
||
1806 | _g_dbus_initialize (); |
||
1807 | return (_gdbus_debug_flags & G_DBUS_DEBUG_TRANSPORT) != 0; |
||
1808 | } |
||
1809 | |||
1810 | gboolean |
||
1811 | _g_dbus_debug_message (void) |
||
1812 | { |
||
1813 | _g_dbus_initialize (); |
||
1814 | return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0; |
||
1815 | } |
||
1816 | |||
1817 | gboolean |
||
1818 | _g_dbus_debug_payload (void) |
||
1819 | { |
||
1820 | _g_dbus_initialize (); |
||
1821 | return (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) != 0; |
||
1822 | } |
||
1823 | |||
1824 | gboolean |
||
1825 | _g_dbus_debug_call (void) |
||
1826 | { |
||
1827 | _g_dbus_initialize (); |
||
1828 | return (_gdbus_debug_flags & G_DBUS_DEBUG_CALL) != 0; |
||
1829 | } |
||
1830 | |||
1831 | gboolean |
||
1832 | _g_dbus_debug_signal (void) |
||
1833 | { |
||
1834 | _g_dbus_initialize (); |
||
1835 | return (_gdbus_debug_flags & G_DBUS_DEBUG_SIGNAL) != 0; |
||
1836 | } |
||
1837 | |||
1838 | gboolean |
||
1839 | _g_dbus_debug_incoming (void) |
||
1840 | { |
||
1841 | _g_dbus_initialize (); |
||
1842 | return (_gdbus_debug_flags & G_DBUS_DEBUG_INCOMING) != 0; |
||
1843 | } |
||
1844 | |||
1845 | gboolean |
||
1846 | _g_dbus_debug_return (void) |
||
1847 | { |
||
1848 | _g_dbus_initialize (); |
||
1849 | return (_gdbus_debug_flags & G_DBUS_DEBUG_RETURN) != 0; |
||
1850 | } |
||
1851 | |||
1852 | gboolean |
||
1853 | _g_dbus_debug_emission (void) |
||
1854 | { |
||
1855 | _g_dbus_initialize (); |
||
1856 | return (_gdbus_debug_flags & G_DBUS_DEBUG_EMISSION) != 0; |
||
1857 | } |
||
1858 | |||
1859 | gboolean |
||
1860 | _g_dbus_debug_address (void) |
||
1861 | { |
||
1862 | _g_dbus_initialize (); |
||
1863 | return (_gdbus_debug_flags & G_DBUS_DEBUG_ADDRESS) != 0; |
||
1864 | } |
||
1865 | |||
1866 | G_LOCK_DEFINE_STATIC (print_lock); |
||
1867 | |||
1868 | void |
||
1869 | _g_dbus_debug_print_lock (void) |
||
1870 | { |
||
1871 | G_LOCK (print_lock); |
||
1872 | } |
||
1873 | |||
1874 | void |
||
1875 | _g_dbus_debug_print_unlock (void) |
||
1876 | { |
||
1877 | G_UNLOCK (print_lock); |
||
1878 | } |
||
1879 | |||
1880 | /** |
||
1881 | * _g_dbus_initialize: |
||
1882 | * |
||
1883 | * Does various one-time init things such as |
||
1884 | * |
||
1885 | * - registering the G_DBUS_ERROR error domain |
||
1886 | * - parses the G_DBUS_DEBUG environment variable |
||
1887 | */ |
||
1888 | void |
||
1889 | _g_dbus_initialize (void) |
||
1890 | { |
||
1891 | static volatile gsize initialized = 0; |
||
1892 | |||
1893 | if (g_once_init_enter (&initialized)) |
||
1894 | { |
||
1895 | volatile GQuark g_dbus_error_domain; |
||
1896 | const gchar *debug; |
||
1897 | |||
1898 | g_dbus_error_domain = G_DBUS_ERROR; |
||
1899 | (g_dbus_error_domain); /* To avoid -Wunused-but-set-variable */ |
||
1900 | |||
1901 | debug = g_getenv ("G_DBUS_DEBUG"); |
||
1902 | if (debug != NULL) |
||
1903 | { |
||
1904 | const GDebugKey keys[] = { |
||
1905 | { "authentication", G_DBUS_DEBUG_AUTHENTICATION }, |
||
1906 | { "transport", G_DBUS_DEBUG_TRANSPORT }, |
||
1907 | { "message", G_DBUS_DEBUG_MESSAGE }, |
||
1908 | { "payload", G_DBUS_DEBUG_PAYLOAD }, |
||
1909 | { "call", G_DBUS_DEBUG_CALL }, |
||
1910 | { "signal", G_DBUS_DEBUG_SIGNAL }, |
||
1911 | { "incoming", G_DBUS_DEBUG_INCOMING }, |
||
1912 | { "return", G_DBUS_DEBUG_RETURN }, |
||
1913 | { "emission", G_DBUS_DEBUG_EMISSION }, |
||
1914 | { "address", G_DBUS_DEBUG_ADDRESS } |
||
1915 | }; |
||
1916 | |||
1917 | _gdbus_debug_flags = g_parse_debug_string (debug, keys, G_N_ELEMENTS (keys)); |
||
1918 | if (_gdbus_debug_flags & G_DBUS_DEBUG_PAYLOAD) |
||
1919 | _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE; |
||
1920 | } |
||
1921 | |||
1922 | g_once_init_leave (&initialized, 1); |
||
1923 | } |
||
1924 | } |
||
1925 | |||
1926 | /* ---------------------------------------------------------------------------------------------------- */ |
||
1927 | |||
1928 | GVariantType * |
||
1929 | _g_dbus_compute_complete_signature (GDBusArgInfo **args) |
||
1930 | { |
||
1931 | const GVariantType *arg_types[256]; |
||
1932 | guint n; |
||
1933 | |||
1934 | if (args) |
||
1935 | for (n = 0; args[n] != NULL; n++) |
||
1936 | { |
||
1937 | /* DBus places a hard limit of 255 on signature length. |
||
1938 | * therefore number of args must be less than 256. |
||
1939 | */ |
||
1940 | g_assert (n < 256); |
||
1941 | |||
1942 | arg_types[n] = G_VARIANT_TYPE (args[n]->signature); |
||
1943 | |||
1944 | if G_UNLIKELY (arg_types[n] == NULL) |
||
1945 | return NULL; |
||
1946 | } |
||
1947 | else |
||
1948 | n = 0; |
||
1949 | |||
1950 | return g_variant_type_new_tuple (arg_types, n); |
||
1951 | } |
||
1952 | |||
1953 | /* ---------------------------------------------------------------------------------------------------- */ |
||
1954 | |||
#ifdef G_OS_WIN32

extern BOOL WINAPI ConvertSidToStringSidA (PSID Sid, LPSTR *StringSid);

/* Returns the string form of the SID of the user owning the current
 * process token, as a newly-allocated copy made with g_strdup(), or NULL
 * on failure. Every failure is reported through g_warning().
 */
gchar *
_g_dbus_win32_get_user_sid (void)
{
  HANDLE h;
  TOKEN_USER *user;
  DWORD token_information_len;
  PSID psid;
  gchar *sid;
  gchar *ret;

  ret = NULL;
  user = NULL;
  h = INVALID_HANDLE_VALUE;

  if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY, &h))
    {
      g_warning ("OpenProcessToken failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* Get length of buffer: this probe is expected to fail with
   * ERROR_INSUFFICIENT_BUFFER; any other error is a real failure.
   */
  token_information_len = 0;
  if (!GetTokenInformation (h, TokenUser, NULL, 0, &token_information_len))
    {
      if (GetLastError () != ERROR_INSUFFICIENT_BUFFER)
        {
          g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
          goto out;
        }
    }

  user = g_malloc (token_information_len);
  if (!GetTokenInformation (h, TokenUser, user, token_information_len, &token_information_len))
    {
      g_warning ("GetTokenInformation() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  psid = user->User.Sid;
  if (!IsValidSid (psid))
    {
      g_warning ("Invalid SID");
      goto out;
    }

  /* A conversion failure is distinct from an invalid SID: report which
   * call failed together with its error code, like the calls above.
   */
  if (!ConvertSidToStringSidA (psid, &sid))
    {
      g_warning ("ConvertSidToStringSidA() failed with error code %d", (gint) GetLastError ());
      goto out;
    }

  /* Copy into GLib-owned memory; the system string is released with LocalFree() */
  ret = g_strdup (sid);
  LocalFree (sid);

out:
  g_free (user);
  if (h != INVALID_HANDLE_VALUE)
    CloseHandle (h);
  return ret;
}
#endif
2019 | |||
2020 | /* ---------------------------------------------------------------------------------------------------- */ |
||
2021 | |||
2022 | gchar * |
||
2023 | _g_dbus_get_machine_id (GError **error) |
||
2024 | { |
||
2025 | #ifdef G_OS_WIN32 |
||
2026 | HW_PROFILE_INFOA info; |
||
2027 | char *src, *dest, *res; |
||
2028 | int i; |
||
2029 | |||
2030 | if (!GetCurrentHwProfileA (&info)) |
||
2031 | { |
||
2032 | char *message = g_win32_error_message (GetLastError ()); |
||
2033 | g_set_error (error, |
||
2034 | G_IO_ERROR, |
||
2035 | G_IO_ERROR_FAILED, |
||
2036 | _("Unable to get Hardware profile: %s"), message); |
||
2037 | g_free (message); |
||
2038 | return NULL; |
||
2039 | } |
||
2040 | |||
2041 | /* Form: {12340001-4980-1920-6788-123456789012} */ |
||
2042 | src = &info.szHwProfileGuid[0]; |
||
2043 | |||
2044 | res = g_malloc (32+1); |
||
2045 | dest = res; |
||
2046 | |||
2047 | src++; /* Skip { */ |
||
2048 | for (i = 0; i < 8; i++) |
||
2049 | *dest++ = *src++; |
||
2050 | src++; /* Skip - */ |
||
2051 | for (i = 0; i < 4; i++) |
||
2052 | *dest++ = *src++; |
||
2053 | src++; /* Skip - */ |
||
2054 | for (i = 0; i < 4; i++) |
||
2055 | *dest++ = *src++; |
||
2056 | src++; /* Skip - */ |
||
2057 | for (i = 0; i < 4; i++) |
||
2058 | *dest++ = *src++; |
||
2059 | src++; /* Skip - */ |
||
2060 | for (i = 0; i < 12; i++) |
||
2061 | *dest++ = *src++; |
||
2062 | *dest = 0; |
||
2063 | |||
2064 | return res; |
||
2065 | #else |
||
2066 | gchar *ret; |
||
2067 | GError *first_error; |
||
2068 | /* TODO: use PACKAGE_LOCALSTATEDIR ? */ |
||
2069 | ret = NULL; |
||
2070 | first_error = NULL; |
||
2071 | if (!g_file_get_contents ("/var/lib/dbus/machine-id", |
||
2072 | &ret, |
||
2073 | NULL, |
||
2074 | &first_error) && |
||
2075 | !g_file_get_contents ("/etc/machine-id", |
||
2076 | &ret, |
||
2077 | NULL, |
||
2078 | NULL)) |
||
2079 | { |
||
2080 | g_propagate_prefixed_error (error, first_error, |
||
2081 | _("Unable to load /var/lib/dbus/machine-id or /etc/machine-id: ")); |
||
2082 | } |
||
2083 | else |
||
2084 | { |
||
2085 | /* ignore the error from the first try, if any */ |
||
2086 | g_clear_error (&first_error); |
||
2087 | /* TODO: validate value */ |
||
2088 | g_strstrip (ret); |
||
2089 | } |
||
2090 | return ret; |
||
2091 | #endif |
||
2092 | } |
||
2093 | |||
2094 | /* ---------------------------------------------------------------------------------------------------- */ |
||
2095 | |||
2096 | gchar * |
||
2097 | _g_dbus_enum_to_string (GType enum_type, gint value) |
||
2098 | { |
||
2099 | gchar *ret; |
||
2100 | GEnumClass *klass; |
||
2101 | GEnumValue *enum_value; |
||
2102 | |||
2103 | klass = g_type_class_ref (enum_type); |
||
2104 | enum_value = g_enum_get_value (klass, value); |
||
2105 | if (enum_value != NULL) |
||
2106 | ret = g_strdup (enum_value->value_nick); |
||
2107 | else |
||
2108 | ret = g_strdup_printf ("unknown (value %d)", value); |
||
2109 | g_type_class_unref (klass); |
||
2110 | return ret; |
||
2111 | } |
||
2112 | |||
2113 | /* ---------------------------------------------------------------------------------------------------- */ |
||
2114 | |||
2115 | static void |
||
2116 | write_message_print_transport_debug (gssize bytes_written, |
||
2117 | MessageToWriteData *data) |
||
2118 | { |
||
2119 | if (G_LIKELY (!_g_dbus_debug_transport ())) |
||
2120 | goto out; |
||
2121 | |||
2122 | _g_dbus_debug_print_lock (); |
||
2123 | g_print ("========================================================================\n" |
||
2124 | "GDBus-debug:Transport:\n" |
||
2125 | " >>>> WROTE %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n" |
||
2126 | " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n", |
||
2127 | bytes_written, |
||
2128 | g_dbus_message_get_serial (data->message), |
||
2129 | data->blob_size, |
||
2130 | data->total_written, |
||
2131 | g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream)))); |
||
2132 | _g_dbus_debug_print_unlock (); |
||
2133 | out: |
||
2134 | ; |
||
2135 | } |
||
2136 | |||
2137 | /* ---------------------------------------------------------------------------------------------------- */ |
||
2138 | |||
2139 | static void |
||
2140 | read_message_print_transport_debug (gssize bytes_read, |
||
2141 | GDBusWorker *worker) |
||
2142 | { |
||
2143 | gsize size; |
||
2144 | gint32 serial; |
||
2145 | gint32 message_length; |
||
2146 | |||
2147 | if (G_LIKELY (!_g_dbus_debug_transport ())) |
||
2148 | goto out; |
||
2149 | |||
2150 | size = bytes_read + worker->read_buffer_cur_size; |
||
2151 | serial = 0; |
||
2152 | message_length = 0; |
||
2153 | if (size >= 16) |
||
2154 | message_length = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, size, NULL); |
||
2155 | if (size >= 1) |
||
2156 | { |
||
2157 | switch (worker->read_buffer[0]) |
||
2158 | { |
||
2159 | case 'l': |
||
2160 | if (size >= 12) |
||
2161 | serial = GUINT32_FROM_LE (((guint32 *) worker->read_buffer)[2]); |
||
2162 | break; |
||
2163 | case 'B': |
||
2164 | if (size >= 12) |
||
2165 | serial = GUINT32_FROM_BE (((guint32 *) worker->read_buffer)[2]); |
||
2166 | break; |
||
2167 | default: |
||
2168 | /* an error will be set elsewhere if this happens */ |
||
2169 | goto out; |
||
2170 | } |
||
2171 | } |
||
2172 | |||
2173 | _g_dbus_debug_print_lock (); |
||
2174 | g_print ("========================================================================\n" |
||
2175 | "GDBus-debug:Transport:\n" |
||
2176 | " <<<< READ %" G_GSSIZE_FORMAT " bytes of message with serial %d and\n" |
||
2177 | " size %d to offset %" G_GSIZE_FORMAT " from a %s\n", |
||
2178 | bytes_read, |
||
2179 | serial, |
||
2180 | message_length, |
||
2181 | worker->read_buffer_cur_size, |
||
2182 | g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream)))); |
||
2183 | _g_dbus_debug_print_unlock (); |
||
2184 | out: |
||
2185 | ; |
||
2186 | } |
||
2187 | |||
2188 | /* ---------------------------------------------------------------------------------------------------- */ |
||
2189 | |||
2190 | gboolean |
||
2191 | _g_signal_accumulator_false_handled (GSignalInvocationHint *ihint, |
||
2192 | GValue *return_accu, |
||
2193 | const GValue *handler_return, |
||
2194 | gpointer dummy) |
||
2195 | { |
||
2196 | gboolean continue_emission; |
||
2197 | gboolean signal_return; |
||
2198 | |||
2199 | signal_return = g_value_get_boolean (handler_return); |
||
2200 | g_value_set_boolean (return_accu, signal_return); |
||
2201 | continue_emission = signal_return; |
||
2202 | |||
2203 | return continue_emission; |
||
2204 | } |