nexmon – Blame information for rev 1
?pathlinks?
Rev | Author | Line No. | Line |
---|---|---|---|
1 | office | 1 | /* GLIB - Library of useful routines for C programming |
2 | * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald |
||
3 | * |
||
4 | * GThreadPool: thread pool implementation. |
||
5 | * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe |
||
6 | * |
||
7 | * This library is free software; you can redistribute it and/or |
||
8 | * modify it under the terms of the GNU Lesser General Public |
||
9 | * License as published by the Free Software Foundation; either |
||
10 | * version 2 of the License, or (at your option) any later version. |
||
11 | * |
||
12 | * This library is distributed in the hope that it will be useful, |
||
13 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
14 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
||
15 | * Lesser General Public License for more details. |
||
16 | * |
||
17 | * You should have received a copy of the GNU Lesser General Public |
||
18 | * License along with this library; if not, see <http://www.gnu.org/licenses/>. |
||
19 | */ |
||
20 | |||
21 | /* |
||
22 | * MT safe |
||
23 | */ |
||
24 | |||
25 | #include "config.h" |
||
26 | |||
27 | #include "gthreadpool.h" |
||
28 | |||
29 | #include "gasyncqueue.h" |
||
30 | #include "gasyncqueueprivate.h" |
||
31 | #include "gmain.h" |
||
32 | #include "gtestutils.h" |
||
33 | #include "gtimer.h" |
||
34 | |||
35 | /** |
||
36 | * SECTION:thread_pools |
||
37 | * @title: Thread Pools |
||
38 | * @short_description: pools of threads to execute work concurrently |
||
39 | * @see_also: #GThread |
||
40 | * |
||
41 | * Sometimes you wish to asynchronously fork out the execution of work |
||
42 | * and continue working in your own thread. If that will happen often, |
||
43 | * the overhead of starting and destroying a thread each time might be |
||
44 | * too high. In such cases reusing already started threads seems like a |
||
45 | * good idea. And it indeed is, but implementing this can be tedious |
||
46 | * and error-prone. |
||
47 | * |
||
48 | * Therefore GLib provides thread pools for your convenience. An added |
||
49 | * advantage is, that the threads can be shared between the different |
||
50 | * subsystems of your program, when they are using GLib. |
||
51 | * |
||
52 | * To create a new thread pool, you use g_thread_pool_new(). |
||
53 | * It is destroyed by g_thread_pool_free(). |
||
54 | * |
||
55 | * If you want to execute a certain task within a thread pool, |
||
56 | * you call g_thread_pool_push(). |
||
57 | * |
||
58 | * To get the current number of running threads you call |
||
59 | * g_thread_pool_get_num_threads(). To get the number of still |
||
60 | * unprocessed tasks you call g_thread_pool_unprocessed(). To control |
||
61 | * the maximal number of threads for a thread pool, you use |
||
62 | * g_thread_pool_get_max_threads() and g_thread_pool_set_max_threads(). |
||
63 | * |
||
64 | * Finally you can control the number of unused threads, that are kept |
||
65 | * alive by GLib for future use. The current number can be fetched with |
||
66 | * g_thread_pool_get_num_unused_threads(). The maximal number can be |
||
67 | * controlled by g_thread_pool_get_max_unused_threads() and |
||
68 | * g_thread_pool_set_max_unused_threads(). All currently unused threads |
||
69 | * can be stopped by calling g_thread_pool_stop_unused_threads(). |
||
70 | */ |
||
71 | |||
72 | #define DEBUG_MSG(x) |
||
73 | /* #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n"); */ |
||
74 | |||
75 | typedef struct _GRealThreadPool GRealThreadPool; |
||
76 | |||
77 | /** |
||
78 | * GThreadPool: |
||
79 | * @func: the function to execute in the threads of this pool |
||
80 | * @user_data: the user data for the threads of this pool |
||
81 | * @exclusive: are all threads exclusive to this pool |
||
82 | * |
||
83 | * The #GThreadPool struct represents a thread pool. It has three |
||
84 | * public read-only members, but the underlying struct is bigger, |
||
85 | * so you must not copy this struct. |
||
86 | */ |
||
87 | struct _GRealThreadPool |
||
88 | { |
||
89 | GThreadPool pool; |
||
90 | GAsyncQueue *queue; |
||
91 | GCond cond; |
||
92 | gint max_threads; |
||
93 | gint num_threads; |
||
94 | gboolean running; |
||
95 | gboolean immediate; |
||
96 | gboolean waiting; |
||
97 | GCompareDataFunc sort_func; |
||
98 | gpointer sort_user_data; |
||
99 | }; |
||
100 | |||
101 | /* The following is just an address to mark the wakeup order for a |
||
102 | * thread, it could be any address (as long, as it isn't a valid |
||
103 | * GThreadPool address) |
||
104 | */ |
||
105 | static const gpointer wakeup_thread_marker = (gpointer) &g_thread_pool_new; |
||
106 | static gint wakeup_thread_serial = 0; |
||
107 | |||
108 | /* Here all unused threads are waiting */ |
||
109 | static GAsyncQueue *unused_thread_queue = NULL; |
||
110 | static gint unused_threads = 0; |
||
111 | static gint max_unused_threads = 2; |
||
112 | static gint kill_unused_threads = 0; |
||
113 | static guint max_idle_time = 15 * 1000; |
||
114 | |||
115 | static void g_thread_pool_queue_push_unlocked (GRealThreadPool *pool, |
||
116 | gpointer data); |
||
117 | static void g_thread_pool_free_internal (GRealThreadPool *pool); |
||
118 | static gpointer g_thread_pool_thread_proxy (gpointer data); |
||
119 | static gboolean g_thread_pool_start_thread (GRealThreadPool *pool, |
||
120 | GError **error); |
||
121 | static void g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool); |
||
122 | static GRealThreadPool* g_thread_pool_wait_for_new_pool (void); |
||
123 | static gpointer g_thread_pool_wait_for_new_task (GRealThreadPool *pool); |
||
124 | |||
125 | static void |
||
126 | g_thread_pool_queue_push_unlocked (GRealThreadPool *pool, |
||
127 | gpointer data) |
||
128 | { |
||
129 | if (pool->sort_func) |
||
130 | g_async_queue_push_sorted_unlocked (pool->queue, |
||
131 | data, |
||
132 | pool->sort_func, |
||
133 | pool->sort_user_data); |
||
134 | else |
||
135 | g_async_queue_push_unlocked (pool->queue, data); |
||
136 | } |
||
137 | |||
138 | static GRealThreadPool* |
||
139 | g_thread_pool_wait_for_new_pool (void) |
||
140 | { |
||
141 | GRealThreadPool *pool; |
||
142 | gint local_wakeup_thread_serial; |
||
143 | guint local_max_unused_threads; |
||
144 | gint local_max_idle_time; |
||
145 | gint last_wakeup_thread_serial; |
||
146 | gboolean have_relayed_thread_marker = FALSE; |
||
147 | |||
148 | local_max_unused_threads = g_atomic_int_get (&max_unused_threads); |
||
149 | local_max_idle_time = g_atomic_int_get (&max_idle_time); |
||
150 | last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial); |
||
151 | |||
152 | g_atomic_int_inc (&unused_threads); |
||
153 | |||
154 | do |
||
155 | { |
||
156 | if (g_atomic_int_get (&unused_threads) >= local_max_unused_threads) |
||
157 | { |
||
158 | /* If this is a superfluous thread, stop it. */ |
||
159 | pool = NULL; |
||
160 | } |
||
161 | else if (local_max_idle_time > 0) |
||
162 | { |
||
163 | /* If a maximal idle time is given, wait for the given time. */ |
||
164 | DEBUG_MSG (("thread %p waiting in global pool for %f seconds.", |
||
165 | g_thread_self (), local_max_idle_time / 1000.0)); |
||
166 | |||
167 | pool = g_async_queue_timeout_pop (unused_thread_queue, |
||
168 | local_max_idle_time * 1000); |
||
169 | } |
||
170 | else |
||
171 | { |
||
172 | /* If no maximal idle time is given, wait indefinitely. */ |
||
173 | DEBUG_MSG (("thread %p waiting in global pool.", g_thread_self ())); |
||
174 | pool = g_async_queue_pop (unused_thread_queue); |
||
175 | } |
||
176 | |||
177 | if (pool == wakeup_thread_marker) |
||
178 | { |
||
179 | local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial); |
||
180 | if (last_wakeup_thread_serial == local_wakeup_thread_serial) |
||
181 | { |
||
182 | if (!have_relayed_thread_marker) |
||
183 | { |
||
184 | /* If this wakeup marker has been received for |
||
185 | * the second time, relay it. |
||
186 | */ |
||
187 | DEBUG_MSG (("thread %p relaying wakeup message to " |
||
188 | "waiting thread with lower serial.", |
||
189 | g_thread_self ())); |
||
190 | |||
191 | g_async_queue_push (unused_thread_queue, wakeup_thread_marker); |
||
192 | have_relayed_thread_marker = TRUE; |
||
193 | |||
194 | /* If a wakeup marker has been relayed, this thread |
||
195 | * will get out of the way for 100 microseconds to |
||
196 | * avoid receiving this marker again. |
||
197 | */ |
||
198 | g_usleep (100); |
||
199 | } |
||
200 | } |
||
201 | else |
||
202 | { |
||
203 | if (g_atomic_int_add (&kill_unused_threads, -1) > 0) |
||
204 | { |
||
205 | pool = NULL; |
||
206 | break; |
||
207 | } |
||
208 | |||
209 | DEBUG_MSG (("thread %p updating to new limits.", |
||
210 | g_thread_self ())); |
||
211 | |||
212 | local_max_unused_threads = g_atomic_int_get (&max_unused_threads); |
||
213 | local_max_idle_time = g_atomic_int_get (&max_idle_time); |
||
214 | last_wakeup_thread_serial = local_wakeup_thread_serial; |
||
215 | |||
216 | have_relayed_thread_marker = FALSE; |
||
217 | } |
||
218 | } |
||
219 | } |
||
220 | while (pool == wakeup_thread_marker); |
||
221 | |||
222 | g_atomic_int_add (&unused_threads, -1); |
||
223 | |||
224 | return pool; |
||
225 | } |
||
226 | |||
227 | static gpointer |
||
228 | g_thread_pool_wait_for_new_task (GRealThreadPool *pool) |
||
229 | { |
||
230 | gpointer task = NULL; |
||
231 | |||
232 | if (pool->running || (!pool->immediate && |
||
233 | g_async_queue_length_unlocked (pool->queue) > 0)) |
||
234 | { |
||
235 | /* This thread pool is still active. */ |
||
236 | if (pool->num_threads > pool->max_threads && pool->max_threads != -1) |
||
237 | { |
||
238 | /* This is a superfluous thread, so it goes to the global pool. */ |
||
239 | DEBUG_MSG (("superfluous thread %p in pool %p.", |
||
240 | g_thread_self (), pool)); |
||
241 | } |
||
242 | else if (pool->pool.exclusive) |
||
243 | { |
||
244 | /* Exclusive threads stay attached to the pool. */ |
||
245 | task = g_async_queue_pop_unlocked (pool->queue); |
||
246 | |||
247 | DEBUG_MSG (("thread %p in exclusive pool %p waits for task " |
||
248 | "(%d running, %d unprocessed).", |
||
249 | g_thread_self (), pool, pool->num_threads, |
||
250 | g_async_queue_length_unlocked (pool->queue))); |
||
251 | } |
||
252 | else |
||
253 | { |
||
254 | /* A thread will wait for new tasks for at most 1/2 |
||
255 | * second before going to the global pool. |
||
256 | */ |
||
257 | DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task " |
||
258 | "(%d running, %d unprocessed).", |
||
259 | g_thread_self (), pool, pool->num_threads, |
||
260 | g_async_queue_length_unlocked (pool->queue))); |
||
261 | |||
262 | task = g_async_queue_timeout_pop_unlocked (pool->queue, |
||
263 | G_USEC_PER_SEC / 2); |
||
264 | } |
||
265 | } |
||
266 | else |
||
267 | { |
||
268 | /* This thread pool is inactive, it will no longer process tasks. */ |
||
269 | DEBUG_MSG (("pool %p not active, thread %p will go to global pool " |
||
270 | "(running: %s, immediate: %s, len: %d).", |
||
271 | pool, g_thread_self (), |
||
272 | pool->running ? "true" : "false", |
||
273 | pool->immediate ? "true" : "false", |
||
274 | g_async_queue_length_unlocked (pool->queue))); |
||
275 | } |
||
276 | |||
277 | return task; |
||
278 | } |
||
279 | |||
280 | |||
281 | static gpointer |
||
282 | g_thread_pool_thread_proxy (gpointer data) |
||
283 | { |
||
284 | GRealThreadPool *pool; |
||
285 | |||
286 | pool = data; |
||
287 | |||
288 | DEBUG_MSG (("thread %p started for pool %p.", g_thread_self (), pool)); |
||
289 | |||
290 | g_async_queue_lock (pool->queue); |
||
291 | |||
292 | while (TRUE) |
||
293 | { |
||
294 | gpointer task; |
||
295 | |||
296 | task = g_thread_pool_wait_for_new_task (pool); |
||
297 | if (task) |
||
298 | { |
||
299 | if (pool->running || !pool->immediate) |
||
300 | { |
||
301 | /* A task was received and the thread pool is active, |
||
302 | * so execute the function. |
||
303 | */ |
||
304 | g_async_queue_unlock (pool->queue); |
||
305 | DEBUG_MSG (("thread %p in pool %p calling func.", |
||
306 | g_thread_self (), pool)); |
||
307 | pool->pool.func (task, pool->pool.user_data); |
||
308 | g_async_queue_lock (pool->queue); |
||
309 | } |
||
310 | } |
||
311 | else |
||
312 | { |
||
313 | /* No task was received, so this thread goes to the global pool. */ |
||
314 | gboolean free_pool = FALSE; |
||
315 | |||
316 | DEBUG_MSG (("thread %p leaving pool %p for global pool.", |
||
317 | g_thread_self (), pool)); |
||
318 | pool->num_threads--; |
||
319 | |||
320 | if (!pool->running) |
||
321 | { |
||
322 | if (!pool->waiting) |
||
323 | { |
||
324 | if (pool->num_threads == 0) |
||
325 | { |
||
326 | /* If the pool is not running and no other |
||
327 | * thread is waiting for this thread pool to |
||
328 | * finish and this is the last thread of this |
||
329 | * pool, free the pool. |
||
330 | */ |
||
331 | free_pool = TRUE; |
||
332 | } |
||
333 | else |
||
334 | { |
||
335 | /* If the pool is not running and no other |
||
336 | * thread is waiting for this thread pool to |
||
337 | * finish and this is not the last thread of |
||
338 | * this pool and there are no tasks left in the |
||
339 | * queue, wakeup the remaining threads. |
||
340 | */ |
||
341 | if (g_async_queue_length_unlocked (pool->queue) == |
||
342 | - pool->num_threads) |
||
343 | g_thread_pool_wakeup_and_stop_all (pool); |
||
344 | } |
||
345 | } |
||
346 | else if (pool->immediate || |
||
347 | g_async_queue_length_unlocked (pool->queue) <= 0) |
||
348 | { |
||
349 | /* If the pool is not running and another thread is |
||
350 | * waiting for this thread pool to finish and there |
||
351 | * are either no tasks left or the pool shall stop |
||
352 | * immediately, inform the waiting thread of a change |
||
353 | * of the thread pool state. |
||
354 | */ |
||
355 | g_cond_broadcast (&pool->cond); |
||
356 | } |
||
357 | } |
||
358 | |||
359 | g_async_queue_unlock (pool->queue); |
||
360 | |||
361 | if (free_pool) |
||
362 | g_thread_pool_free_internal (pool); |
||
363 | |||
364 | if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL) |
||
365 | break; |
||
366 | |||
367 | g_async_queue_lock (pool->queue); |
||
368 | |||
369 | DEBUG_MSG (("thread %p entering pool %p from global pool.", |
||
370 | g_thread_self (), pool)); |
||
371 | |||
372 | /* pool->num_threads++ is not done here, but in |
||
373 | * g_thread_pool_start_thread to make the new started |
||
374 | * thread known to the pool before itself can do it. |
||
375 | */ |
||
376 | } |
||
377 | } |
||
378 | |||
379 | return NULL; |
||
380 | } |
||
381 | |||
382 | static gboolean |
||
383 | g_thread_pool_start_thread (GRealThreadPool *pool, |
||
384 | GError **error) |
||
385 | { |
||
386 | gboolean success = FALSE; |
||
387 | |||
388 | if (pool->num_threads >= pool->max_threads && pool->max_threads != -1) |
||
389 | /* Enough threads are already running */ |
||
390 | return TRUE; |
||
391 | |||
392 | g_async_queue_lock (unused_thread_queue); |
||
393 | |||
394 | if (g_async_queue_length_unlocked (unused_thread_queue) < 0) |
||
395 | { |
||
396 | g_async_queue_push_unlocked (unused_thread_queue, pool); |
||
397 | success = TRUE; |
||
398 | } |
||
399 | |||
400 | g_async_queue_unlock (unused_thread_queue); |
||
401 | |||
402 | if (!success) |
||
403 | { |
||
404 | GThread *thread; |
||
405 | |||
406 | /* No thread was found, we have to start a new one */ |
||
407 | thread = g_thread_try_new ("pool", g_thread_pool_thread_proxy, pool, error); |
||
408 | |||
409 | if (thread == NULL) |
||
410 | return FALSE; |
||
411 | |||
412 | g_thread_unref (thread); |
||
413 | } |
||
414 | |||
415 | /* See comment in g_thread_pool_thread_proxy as to why this is done |
||
416 | * here and not there |
||
417 | */ |
||
418 | pool->num_threads++; |
||
419 | |||
420 | return TRUE; |
||
421 | } |
||
422 | |||
423 | /** |
||
424 | * g_thread_pool_new: |
||
425 | * @func: a function to execute in the threads of the new thread pool |
||
426 | * @user_data: user data that is handed over to @func every time it |
||
427 | * is called |
||
428 | * @max_threads: the maximal number of threads to execute concurrently |
||
429 | * in the new thread pool, -1 means no limit |
||
430 | * @exclusive: should this thread pool be exclusive? |
||
431 | * @error: return location for error, or %NULL |
||
432 | * |
||
433 | * This function creates a new thread pool. |
||
434 | * |
||
435 | * Whenever you call g_thread_pool_push(), either a new thread is |
||
436 | * created or an unused one is reused. At most @max_threads threads |
||
437 | * are running concurrently for this thread pool. @max_threads = -1 |
||
438 | * allows unlimited threads to be created for this thread pool. The |
||
439 | * newly created or reused thread now executes the function @func |
||
440 | * with the two arguments. The first one is the parameter to |
||
441 | * g_thread_pool_push() and the second one is @user_data. |
||
442 | * |
||
443 | * The parameter @exclusive determines whether the thread pool owns |
||
444 | * all threads exclusive or shares them with other thread pools. |
||
445 | * If @exclusive is %TRUE, @max_threads threads are started |
||
446 | * immediately and they will run exclusively for this thread pool |
||
447 | * until it is destroyed by g_thread_pool_free(). If @exclusive is |
||
448 | * %FALSE, threads are created when needed and shared between all |
||
449 | * non-exclusive thread pools. This implies that @max_threads may |
||
450 | * not be -1 for exclusive thread pools. Besides, exclusive thread |
||
451 | * pools are not affected by g_thread_pool_set_max_idle_time() |
||
452 | * since their threads are never considered idle and returned to the |
||
453 | * global pool. |
||
454 | * |
||
455 | * @error can be %NULL to ignore errors, or non-%NULL to report |
||
456 | * errors. An error can only occur when @exclusive is set to %TRUE |
||
457 | * and not all @max_threads threads could be created. |
||
458 | * See #GThreadError for possible errors that may occur. |
||
459 | * Note, even in case of error a valid #GThreadPool is returned. |
||
460 | * |
||
461 | * Returns: the new #GThreadPool |
||
462 | */ |
||
463 | GThreadPool * |
||
464 | g_thread_pool_new (GFunc func, |
||
465 | gpointer user_data, |
||
466 | gint max_threads, |
||
467 | gboolean exclusive, |
||
468 | GError **error) |
||
469 | { |
||
470 | GRealThreadPool *retval; |
||
471 | G_LOCK_DEFINE_STATIC (init); |
||
472 | |||
473 | g_return_val_if_fail (func, NULL); |
||
474 | g_return_val_if_fail (!exclusive || max_threads != -1, NULL); |
||
475 | g_return_val_if_fail (max_threads >= -1, NULL); |
||
476 | |||
477 | retval = g_new (GRealThreadPool, 1); |
||
478 | |||
479 | retval->pool.func = func; |
||
480 | retval->pool.user_data = user_data; |
||
481 | retval->pool.exclusive = exclusive; |
||
482 | retval->queue = g_async_queue_new (); |
||
483 | g_cond_init (&retval->cond); |
||
484 | retval->max_threads = max_threads; |
||
485 | retval->num_threads = 0; |
||
486 | retval->running = TRUE; |
||
487 | retval->immediate = FALSE; |
||
488 | retval->waiting = FALSE; |
||
489 | retval->sort_func = NULL; |
||
490 | retval->sort_user_data = NULL; |
||
491 | |||
492 | G_LOCK (init); |
||
493 | if (!unused_thread_queue) |
||
494 | unused_thread_queue = g_async_queue_new (); |
||
495 | G_UNLOCK (init); |
||
496 | |||
497 | if (retval->pool.exclusive) |
||
498 | { |
||
499 | g_async_queue_lock (retval->queue); |
||
500 | |||
501 | while (retval->num_threads < retval->max_threads) |
||
502 | { |
||
503 | GError *local_error = NULL; |
||
504 | |||
505 | if (!g_thread_pool_start_thread (retval, &local_error)) |
||
506 | { |
||
507 | g_propagate_error (error, local_error); |
||
508 | break; |
||
509 | } |
||
510 | } |
||
511 | |||
512 | g_async_queue_unlock (retval->queue); |
||
513 | } |
||
514 | |||
515 | return (GThreadPool*) retval; |
||
516 | } |
||
517 | |||
518 | /** |
||
519 | * g_thread_pool_push: |
||
520 | * @pool: a #GThreadPool |
||
521 | * @data: a new task for @pool |
||
522 | * @error: return location for error, or %NULL |
||
523 | * |
||
524 | * Inserts @data into the list of tasks to be executed by @pool. |
||
525 | * |
||
526 | * When the number of currently running threads is lower than the |
||
527 | * maximal allowed number of threads, a new thread is started (or |
||
528 | * reused) with the properties given to g_thread_pool_new(). |
||
529 | * Otherwise, @data stays in the queue until a thread in this pool |
||
530 | * finishes its previous task and processes @data. |
||
531 | * |
||
532 | * @error can be %NULL to ignore errors, or non-%NULL to report |
||
533 | * errors. An error can only occur when a new thread couldn't be |
||
534 | * created. In that case @data is simply appended to the queue of |
||
535 | * work to do. |
||
536 | * |
||
537 | * Before version 2.32, this function did not return a success status. |
||
538 | * |
||
539 | * Returns: %TRUE on success, %FALSE if an error occurred |
||
540 | */ |
||
541 | gboolean |
||
542 | g_thread_pool_push (GThreadPool *pool, |
||
543 | gpointer data, |
||
544 | GError **error) |
||
545 | { |
||
546 | GRealThreadPool *real; |
||
547 | gboolean result; |
||
548 | |||
549 | real = (GRealThreadPool*) pool; |
||
550 | |||
551 | g_return_val_if_fail (real, FALSE); |
||
552 | g_return_val_if_fail (real->running, FALSE); |
||
553 | |||
554 | result = TRUE; |
||
555 | |||
556 | g_async_queue_lock (real->queue); |
||
557 | |||
558 | if (g_async_queue_length_unlocked (real->queue) >= 0) |
||
559 | { |
||
560 | /* No thread is waiting in the queue */ |
||
561 | GError *local_error = NULL; |
||
562 | |||
563 | if (!g_thread_pool_start_thread (real, &local_error)) |
||
564 | { |
||
565 | g_propagate_error (error, local_error); |
||
566 | result = FALSE; |
||
567 | } |
||
568 | } |
||
569 | |||
570 | g_thread_pool_queue_push_unlocked (real, data); |
||
571 | g_async_queue_unlock (real->queue); |
||
572 | |||
573 | return result; |
||
574 | } |
||
575 | |||
576 | /** |
||
577 | * g_thread_pool_set_max_threads: |
||
578 | * @pool: a #GThreadPool |
||
579 | * @max_threads: a new maximal number of threads for @pool, |
||
580 | * or -1 for unlimited |
||
581 | * @error: return location for error, or %NULL |
||
582 | * |
||
583 | * Sets the maximal allowed number of threads for @pool. |
||
584 | * A value of -1 means that the maximal number of threads |
||
585 | * is unlimited. If @pool is an exclusive thread pool, setting |
||
586 | * the maximal number of threads to -1 is not allowed. |
||
587 | * |
||
588 | * Setting @max_threads to 0 means stopping all work for @pool. |
||
589 | * It is effectively frozen until @max_threads is set to a non-zero |
||
590 | * value again. |
||
591 | * |
||
592 | * A thread is never terminated while calling @func, as supplied by |
||
593 | * g_thread_pool_new(). Instead the maximal number of threads only |
||
594 | * has effect for the allocation of new threads in g_thread_pool_push(). |
||
595 | * A new thread is allocated, whenever the number of currently |
||
596 | * running threads in @pool is smaller than the maximal number. |
||
597 | * |
||
598 | * @error can be %NULL to ignore errors, or non-%NULL to report |
||
599 | * errors. An error can only occur when a new thread couldn't be |
||
600 | * created. |
||
601 | * |
||
602 | * Before version 2.32, this function did not return a success status. |
||
603 | * |
||
604 | * Returns: %TRUE on success, %FALSE if an error occurred |
||
605 | */ |
||
606 | gboolean |
||
607 | g_thread_pool_set_max_threads (GThreadPool *pool, |
||
608 | gint max_threads, |
||
609 | GError **error) |
||
610 | { |
||
611 | GRealThreadPool *real; |
||
612 | gint to_start; |
||
613 | gboolean result; |
||
614 | |||
615 | real = (GRealThreadPool*) pool; |
||
616 | |||
617 | g_return_val_if_fail (real, FALSE); |
||
618 | g_return_val_if_fail (real->running, FALSE); |
||
619 | g_return_val_if_fail (!real->pool.exclusive || max_threads != -1, FALSE); |
||
620 | g_return_val_if_fail (max_threads >= -1, FALSE); |
||
621 | |||
622 | result = TRUE; |
||
623 | |||
624 | g_async_queue_lock (real->queue); |
||
625 | |||
626 | real->max_threads = max_threads; |
||
627 | |||
628 | if (pool->exclusive) |
||
629 | to_start = real->max_threads - real->num_threads; |
||
630 | else |
||
631 | to_start = g_async_queue_length_unlocked (real->queue); |
||
632 | |||
633 | for ( ; to_start > 0; to_start--) |
||
634 | { |
||
635 | GError *local_error = NULL; |
||
636 | |||
637 | if (!g_thread_pool_start_thread (real, &local_error)) |
||
638 | { |
||
639 | g_propagate_error (error, local_error); |
||
640 | result = FALSE; |
||
641 | break; |
||
642 | } |
||
643 | } |
||
644 | |||
645 | g_async_queue_unlock (real->queue); |
||
646 | |||
647 | return result; |
||
648 | } |
||
649 | |||
650 | /** |
||
651 | * g_thread_pool_get_max_threads: |
||
652 | * @pool: a #GThreadPool |
||
653 | * |
||
654 | * Returns the maximal number of threads for @pool. |
||
655 | * |
||
656 | * Returns: the maximal number of threads |
||
657 | */ |
||
658 | gint |
||
659 | g_thread_pool_get_max_threads (GThreadPool *pool) |
||
660 | { |
||
661 | GRealThreadPool *real; |
||
662 | gint retval; |
||
663 | |||
664 | real = (GRealThreadPool*) pool; |
||
665 | |||
666 | g_return_val_if_fail (real, 0); |
||
667 | g_return_val_if_fail (real->running, 0); |
||
668 | |||
669 | g_async_queue_lock (real->queue); |
||
670 | retval = real->max_threads; |
||
671 | g_async_queue_unlock (real->queue); |
||
672 | |||
673 | return retval; |
||
674 | } |
||
675 | |||
676 | /** |
||
677 | * g_thread_pool_get_num_threads: |
||
678 | * @pool: a #GThreadPool |
||
679 | * |
||
680 | * Returns the number of threads currently running in @pool. |
||
681 | * |
||
682 | * Returns: the number of threads currently running |
||
683 | */ |
||
684 | guint |
||
685 | g_thread_pool_get_num_threads (GThreadPool *pool) |
||
686 | { |
||
687 | GRealThreadPool *real; |
||
688 | guint retval; |
||
689 | |||
690 | real = (GRealThreadPool*) pool; |
||
691 | |||
692 | g_return_val_if_fail (real, 0); |
||
693 | g_return_val_if_fail (real->running, 0); |
||
694 | |||
695 | g_async_queue_lock (real->queue); |
||
696 | retval = real->num_threads; |
||
697 | g_async_queue_unlock (real->queue); |
||
698 | |||
699 | return retval; |
||
700 | } |
||
701 | |||
702 | /** |
||
703 | * g_thread_pool_unprocessed: |
||
704 | * @pool: a #GThreadPool |
||
705 | * |
||
706 | * Returns the number of tasks still unprocessed in @pool. |
||
707 | * |
||
708 | * Returns: the number of unprocessed tasks |
||
709 | */ |
||
710 | guint |
||
711 | g_thread_pool_unprocessed (GThreadPool *pool) |
||
712 | { |
||
713 | GRealThreadPool *real; |
||
714 | gint unprocessed; |
||
715 | |||
716 | real = (GRealThreadPool*) pool; |
||
717 | |||
718 | g_return_val_if_fail (real, 0); |
||
719 | g_return_val_if_fail (real->running, 0); |
||
720 | |||
721 | unprocessed = g_async_queue_length (real->queue); |
||
722 | |||
723 | return MAX (unprocessed, 0); |
||
724 | } |
||
725 | |||
726 | /** |
||
727 | * g_thread_pool_free: |
||
728 | * @pool: a #GThreadPool |
||
729 | * @immediate: should @pool shut down immediately? |
||
730 | * @wait_: should the function wait for all tasks to be finished? |
||
731 | * |
||
732 | * Frees all resources allocated for @pool. |
||
733 | * |
||
734 | * If @immediate is %TRUE, no new task is processed for @pool. |
||
735 | * Otherwise @pool is not freed before the last task is processed. |
||
736 | * Note however, that no thread of this pool is interrupted while |
||
737 | * processing a task. Instead at least all still running threads |
||
738 | * can finish their tasks before the @pool is freed. |
||
739 | * |
||
740 | * If @wait_ is %TRUE, the functions does not return before all |
||
741 | * tasks to be processed (dependent on @immediate, whether all |
||
742 | * or only the currently running) are ready. |
||
743 | * Otherwise the function returns immediately. |
||
744 | * |
||
745 | * After calling this function @pool must not be used anymore. |
||
746 | */ |
||
747 | void |
||
748 | g_thread_pool_free (GThreadPool *pool, |
||
749 | gboolean immediate, |
||
750 | gboolean wait_) |
||
751 | { |
||
752 | GRealThreadPool *real; |
||
753 | |||
754 | real = (GRealThreadPool*) pool; |
||
755 | |||
756 | g_return_if_fail (real); |
||
757 | g_return_if_fail (real->running); |
||
758 | |||
759 | /* If there's no thread allowed here, there is not much sense in |
||
760 | * not stopping this pool immediately, when it's not empty |
||
761 | */ |
||
762 | g_return_if_fail (immediate || |
||
763 | real->max_threads != 0 || |
||
764 | g_async_queue_length (real->queue) == 0); |
||
765 | |||
766 | g_async_queue_lock (real->queue); |
||
767 | |||
768 | real->running = FALSE; |
||
769 | real->immediate = immediate; |
||
770 | real->waiting = wait_; |
||
771 | |||
772 | if (wait_) |
||
773 | { |
||
774 | while (g_async_queue_length_unlocked (real->queue) != -real->num_threads && |
||
775 | !(immediate && real->num_threads == 0)) |
||
776 | g_cond_wait (&real->cond, _g_async_queue_get_mutex (real->queue)); |
||
777 | } |
||
778 | |||
779 | if (immediate || g_async_queue_length_unlocked (real->queue) == -real->num_threads) |
||
780 | { |
||
781 | /* No thread is currently doing something (and nothing is left |
||
782 | * to process in the queue) |
||
783 | */ |
||
784 | if (real->num_threads == 0) |
||
785 | { |
||
786 | /* No threads left, we clean up */ |
||
787 | g_async_queue_unlock (real->queue); |
||
788 | g_thread_pool_free_internal (real); |
||
789 | return; |
||
790 | } |
||
791 | |||
792 | g_thread_pool_wakeup_and_stop_all (real); |
||
793 | } |
||
794 | |||
795 | /* The last thread should cleanup the pool */ |
||
796 | real->waiting = FALSE; |
||
797 | g_async_queue_unlock (real->queue); |
||
798 | } |
||
799 | |||
800 | static void |
||
801 | g_thread_pool_free_internal (GRealThreadPool* pool) |
||
802 | { |
||
803 | g_return_if_fail (pool); |
||
804 | g_return_if_fail (pool->running == FALSE); |
||
805 | g_return_if_fail (pool->num_threads == 0); |
||
806 | |||
807 | g_async_queue_unref (pool->queue); |
||
808 | g_cond_clear (&pool->cond); |
||
809 | |||
810 | g_free (pool); |
||
811 | } |
||
812 | |||
813 | static void |
||
814 | g_thread_pool_wakeup_and_stop_all (GRealThreadPool *pool) |
||
815 | { |
||
816 | guint i; |
||
817 | |||
818 | g_return_if_fail (pool); |
||
819 | g_return_if_fail (pool->running == FALSE); |
||
820 | g_return_if_fail (pool->num_threads != 0); |
||
821 | |||
822 | pool->immediate = TRUE; |
||
823 | |||
824 | /* |
||
825 | * So here we're sending bogus data to the pool threads, which |
||
826 | * should cause them each to wake up, and check the above |
||
827 | * pool->immediate condition. However we don't want that |
||
828 | * data to be sorted (since it'll crash the sorter). |
||
829 | */ |
||
830 | for (i = 0; i < pool->num_threads; i++) |
||
831 | g_async_queue_push_unlocked (pool->queue, GUINT_TO_POINTER (1)); |
||
832 | } |
||
833 | |||
834 | /** |
||
835 | * g_thread_pool_set_max_unused_threads: |
||
836 | * @max_threads: maximal number of unused threads |
||
837 | * |
||
838 | * Sets the maximal number of unused threads to @max_threads. |
||
839 | * If @max_threads is -1, no limit is imposed on the number |
||
840 | * of unused threads. |
||
841 | * |
||
842 | * The default value is 2. |
||
843 | */ |
||
844 | void |
||
845 | g_thread_pool_set_max_unused_threads (gint max_threads) |
||
846 | { |
||
847 | g_return_if_fail (max_threads >= -1); |
||
848 | |||
849 | g_atomic_int_set (&max_unused_threads, max_threads); |
||
850 | |||
851 | if (max_threads != -1) |
||
852 | { |
||
853 | max_threads -= g_atomic_int_get (&unused_threads); |
||
854 | if (max_threads < 0) |
||
855 | { |
||
856 | g_atomic_int_set (&kill_unused_threads, -max_threads); |
||
857 | g_atomic_int_inc (&wakeup_thread_serial); |
||
858 | |||
859 | g_async_queue_lock (unused_thread_queue); |
||
860 | |||
861 | do |
||
862 | { |
||
863 | g_async_queue_push_unlocked (unused_thread_queue, |
||
864 | wakeup_thread_marker); |
||
865 | } |
||
866 | while (++max_threads); |
||
867 | |||
868 | g_async_queue_unlock (unused_thread_queue); |
||
869 | } |
||
870 | } |
||
871 | } |
||
872 | |||
873 | /** |
||
874 | * g_thread_pool_get_max_unused_threads: |
||
875 | * |
||
876 | * Returns the maximal allowed number of unused threads. |
||
877 | * |
||
878 | * Returns: the maximal number of unused threads |
||
879 | */ |
||
880 | gint |
||
881 | g_thread_pool_get_max_unused_threads (void) |
||
882 | { |
||
883 | return g_atomic_int_get (&max_unused_threads); |
||
884 | } |
||
885 | |||
886 | /** |
||
887 | * g_thread_pool_get_num_unused_threads: |
||
888 | * |
||
889 | * Returns the number of currently unused threads. |
||
890 | * |
||
891 | * Returns: the number of currently unused threads |
||
892 | */ |
||
893 | guint |
||
894 | g_thread_pool_get_num_unused_threads (void) |
||
895 | { |
||
896 | return g_atomic_int_get (&unused_threads); |
||
897 | } |
||
898 | |||
899 | /** |
||
900 | * g_thread_pool_stop_unused_threads: |
||
901 | * |
||
902 | * Stops all currently unused threads. This does not change the |
||
903 | * maximal number of unused threads. This function can be used to |
||
904 | * regularly stop all unused threads e.g. from g_timeout_add(). |
||
905 | */ |
||
906 | void |
||
907 | g_thread_pool_stop_unused_threads (void) |
||
908 | { |
||
909 | guint oldval; |
||
910 | |||
911 | oldval = g_thread_pool_get_max_unused_threads (); |
||
912 | |||
913 | g_thread_pool_set_max_unused_threads (0); |
||
914 | g_thread_pool_set_max_unused_threads (oldval); |
||
915 | } |
||
916 | |||
917 | /** |
||
918 | * g_thread_pool_set_sort_function: |
||
919 | * @pool: a #GThreadPool |
||
920 | * @func: the #GCompareDataFunc used to sort the list of tasks. |
||
921 | * This function is passed two tasks. It should return |
||
922 | * 0 if the order in which they are handled does not matter, |
||
923 | * a negative value if the first task should be processed before |
||
924 | * the second or a positive value if the second task should be |
||
925 | * processed first. |
||
926 | * @user_data: user data passed to @func |
||
927 | * |
||
928 | * Sets the function used to sort the list of tasks. This allows the |
||
929 | * tasks to be processed by a priority determined by @func, and not |
||
930 | * just in the order in which they were added to the pool. |
||
931 | * |
||
932 | * Note, if the maximum number of threads is more than 1, the order |
||
933 | * that threads are executed cannot be guaranteed 100%. Threads are |
||
934 | * scheduled by the operating system and are executed at random. It |
||
935 | * cannot be assumed that threads are executed in the order they are |
||
936 | * created. |
||
937 | * |
||
938 | * Since: 2.10 |
||
939 | */ |
||
940 | void |
||
941 | g_thread_pool_set_sort_function (GThreadPool *pool, |
||
942 | GCompareDataFunc func, |
||
943 | gpointer user_data) |
||
944 | { |
||
945 | GRealThreadPool *real; |
||
946 | |||
947 | real = (GRealThreadPool*) pool; |
||
948 | |||
949 | g_return_if_fail (real); |
||
950 | g_return_if_fail (real->running); |
||
951 | |||
952 | g_async_queue_lock (real->queue); |
||
953 | |||
954 | real->sort_func = func; |
||
955 | real->sort_user_data = user_data; |
||
956 | |||
957 | if (func) |
||
958 | g_async_queue_sort_unlocked (real->queue, |
||
959 | real->sort_func, |
||
960 | real->sort_user_data); |
||
961 | |||
962 | g_async_queue_unlock (real->queue); |
||
963 | } |
||
964 | |||
965 | /** |
||
966 | * g_thread_pool_move_to_front: |
||
967 | * @pool: a #GThreadPool |
||
968 | * @data: an unprocessed item in the pool |
||
969 | * |
||
970 | * Moves the item to the front of the queue of unprocessed |
||
971 | * items, so that it will be processed next. |
||
972 | * |
||
973 | * Returns: %TRUE if the item was found and moved |
||
974 | * |
||
975 | * Since: 2.46 |
||
976 | */ |
||
977 | gboolean |
||
978 | g_thread_pool_move_to_front (GThreadPool *pool, |
||
979 | gpointer data) |
||
980 | { |
||
981 | GRealThreadPool *real = (GRealThreadPool*) pool; |
||
982 | gboolean found; |
||
983 | |||
984 | g_async_queue_lock (real->queue); |
||
985 | |||
986 | found = g_async_queue_remove_unlocked (real->queue, data); |
||
987 | if (found) |
||
988 | g_async_queue_push_front_unlocked (real->queue, data); |
||
989 | |||
990 | g_async_queue_unlock (real->queue); |
||
991 | |||
992 | return found; |
||
993 | } |
||
994 | |||
995 | /** |
||
996 | * g_thread_pool_set_max_idle_time: |
||
997 | * @interval: the maximum @interval (in milliseconds) |
||
998 | * a thread can be idle |
||
999 | * |
||
1000 | * This function will set the maximum @interval that a thread |
||
1001 | * waiting in the pool for new tasks can be idle for before |
||
1002 | * being stopped. This function is similar to calling |
||
1003 | * g_thread_pool_stop_unused_threads() on a regular timeout, |
||
1004 | * except this is done on a per thread basis. |
||
1005 | * |
||
1006 | * By setting @interval to 0, idle threads will not be stopped. |
||
1007 | * |
||
1008 | * The default value is 15000 (15 seconds). |
||
1009 | * |
||
1010 | * Since: 2.10 |
||
1011 | */ |
||
1012 | void |
||
1013 | g_thread_pool_set_max_idle_time (guint interval) |
||
1014 | { |
||
1015 | guint i; |
||
1016 | |||
1017 | g_atomic_int_set (&max_idle_time, interval); |
||
1018 | |||
1019 | i = g_atomic_int_get (&unused_threads); |
||
1020 | if (i > 0) |
||
1021 | { |
||
1022 | g_atomic_int_inc (&wakeup_thread_serial); |
||
1023 | g_async_queue_lock (unused_thread_queue); |
||
1024 | |||
1025 | do |
||
1026 | { |
||
1027 | g_async_queue_push_unlocked (unused_thread_queue, |
||
1028 | wakeup_thread_marker); |
||
1029 | } |
||
1030 | while (--i); |
||
1031 | |||
1032 | g_async_queue_unlock (unused_thread_queue); |
||
1033 | } |
||
1034 | } |
||
1035 | |||
1036 | /** |
||
1037 | * g_thread_pool_get_max_idle_time: |
||
1038 | * |
||
1039 | * This function will return the maximum @interval that a |
||
1040 | * thread will wait in the thread pool for new tasks before |
||
1041 | * being stopped. |
||
1042 | * |
||
1043 | * If this function returns 0, threads waiting in the thread |
||
1044 | * pool for new work are not stopped. |
||
1045 | * |
||
1046 | * Returns: the maximum @interval (milliseconds) to wait |
||
1047 | * for new tasks in the thread pool before stopping the |
||
1048 | * thread |
||
1049 | * |
||
1050 | * Since: 2.10 |
||
1051 | */ |
||
1052 | guint |
||
1053 | g_thread_pool_get_max_idle_time (void) |
||
1054 | { |
||
1055 | return g_atomic_int_get (&max_idle_time); |
||
1056 | } |