BadVPN – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /**
2 * @file BThreadWork.c
3 * @author Ambroz Bizjak <ambrop7@gmail.com>
4 *
5 * @section LICENSE
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the author nor the
15 * names of its contributors may be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29  
30 #include <stdint.h>
31 #include <stddef.h>
32  
33 #ifdef BADVPN_THREADWORK_USE_PTHREAD
34 #include <unistd.h>
35 #include <errno.h>
36 #include <fcntl.h>
37 #endif
38  
39 #include <misc/offset.h>
40 #include <base/BLog.h>
41  
42 #include <generated/blog_channel_BThreadWork.h>
43  
44 #include <threadwork/BThreadWork.h>
45  
46 #ifdef BADVPN_THREADWORK_USE_PTHREAD
47  
48 static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t)
49 {
50 BThreadWorkDispatcher *o = t->d;
51  
52 ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
53  
54 while (1) {
55 // exit if requested
56 if (o->cancel) {
57 break;
58 }
59  
60 if (LinkedList1_IsEmpty(&o->pending_list)) {
61 // wait for event
62 ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0)
63 continue;
64 }
65  
66 // grab the work
67 BThreadWork *w = UPPER_OBJECT(LinkedList1_GetFirst(&o->pending_list), BThreadWork, list_node);
68 ASSERT(w->state == BTHREADWORK_STATE_PENDING)
69 LinkedList1_Remove(&o->pending_list, &w->list_node);
70 t->running_work = w;
71 w->state = BTHREADWORK_STATE_RUNNING;
72  
73 // do the work
74 ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
75 w->work_func(w->work_func_user);
76 ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
77  
78 // release the work
79 t->running_work = NULL;
80 LinkedList1_Append(&o->finished_list, &w->list_node);
81 w->state = BTHREADWORK_STATE_FINISHED;
82 ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
83  
84 // write to pipe
85 uint8_t b = 0;
86 int res = write(o->pipe[1], &b, sizeof(b));
87 if (res < 0) {
88 int error = errno;
89 ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
90 }
91 }
92  
93 ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
94  
95 return NULL;
96 }
97  
98 static void dispatch_job (BThreadWorkDispatcher *o)
99 {
100 ASSERT(o->num_threads > 0)
101  
102 // lock
103 ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
104  
105 // check for finished job
106 if (LinkedList1_IsEmpty(&o->finished_list)) {
107 ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
108 return;
109 }
110  
111 // grab finished job
112 BThreadWork *w = UPPER_OBJECT(LinkedList1_GetFirst(&o->finished_list), BThreadWork, list_node);
113 ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
114 LinkedList1_Remove(&o->finished_list, &w->list_node);
115  
116 // schedule more
117 if (!LinkedList1_IsEmpty(&o->finished_list)) {
118 BPending_Set(&o->more_job);
119 }
120  
121 // set state forgotten
122 w->state = BTHREADWORK_STATE_FORGOTTEN;
123  
124 // unlock
125 ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
126  
127 // call handler
128 w->handler_done(w->user);
129 return;
130 }
131  
132 static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
133 {
134 ASSERT(o->num_threads > 0)
135 DebugObject_Access(&o->d_obj);
136  
137 // read data from pipe
138 uint8_t b[64];
139 int res = read(o->pipe[0], b, sizeof(b));
140 if (res < 0) {
141 int error = errno;
142 ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
143 } else {
144 ASSERT(res > 0)
145 }
146  
147 dispatch_job(o);
148 return;
149 }
150  
151 static void more_job_handler (BThreadWorkDispatcher *o)
152 {
153 ASSERT(o->num_threads > 0)
154 DebugObject_Access(&o->d_obj);
155  
156 dispatch_job(o);
157 return;
158 }
159  
160 static void stop_threads (BThreadWorkDispatcher *o)
161 {
162 // set cancelling
163 ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
164 o->cancel = 1;
165 ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
166  
167 while (o->num_threads > 0) {
168 struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1];
169  
170 // wake up thread
171 ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0)
172  
173 // wait for thread to exit
174 ASSERT_FORCE(pthread_join(t->thread, NULL) == 0)
175  
176 // free condition variable
177 ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
178  
179 o->num_threads--;
180 }
181 }
182  
183 #endif
184  
185 static void work_job_handler (BThreadWork *o)
186 {
187 #ifdef BADVPN_THREADWORK_USE_PTHREAD
188 ASSERT(o->d->num_threads == 0)
189 #endif
190 DebugObject_Access(&o->d_obj);
191  
192 // do the work
193 o->work_func(o->work_func_user);
194  
195 // call handler
196 o->handler_done(o->user);
197 return;
198 }
199  
200 int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
201 {
202 // init arguments
203 o->reactor = reactor;
204  
205 if (num_threads_hint < 0) {
206 num_threads_hint = 2;
207 }
208 if (num_threads_hint > BTHREADWORK_MAX_THREADS) {
209 num_threads_hint = BTHREADWORK_MAX_THREADS;
210 }
211  
212 #ifdef BADVPN_THREADWORK_USE_PTHREAD
213  
214 if (num_threads_hint > 0) {
215 // init pending list
216 LinkedList1_Init(&o->pending_list);
217  
218 // init finished list
219 LinkedList1_Init(&o->finished_list);
220  
221 // init mutex
222 if (pthread_mutex_init(&o->mutex, NULL) != 0) {
223 BLog(BLOG_ERROR, "pthread_mutex_init failed");
224 goto fail0;
225 }
226  
227 // init pipe
228 if (pipe(o->pipe) < 0) {
229 BLog(BLOG_ERROR, "pipe failed");
230 goto fail1;
231 }
232  
233 // set read end non-blocking
234 if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
235 BLog(BLOG_ERROR, "fcntl failed");
236 goto fail2;
237 }
238  
239 // set write end non-blocking
240 if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
241 BLog(BLOG_ERROR, "fcntl failed");
242 goto fail2;
243 }
244  
245 // init BFileDescriptor
246 BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
247 if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
248 BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
249 goto fail2;
250 }
251 BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
252  
253 // init more job
254 BPending_Init(&o->more_job, BReactor_PendingGroup(o->reactor), (BPending_handler)more_job_handler, o);
255  
256 // set not cancelling
257 o->cancel = 0;
258  
259 // init threads
260 o->num_threads = 0;
261 for (int i = 0; i < num_threads_hint; i++) {
262 struct BThreadWorkDispatcher_thread *t = &o->threads[i];
263  
264 // set parent pointer
265 t->d = o;
266  
267 // set no running work
268 t->running_work = NULL;
269  
270 // init condition variable
271 if (pthread_cond_init(&t->new_cond, NULL) != 0) {
272 BLog(BLOG_ERROR, "pthread_cond_init failed");
273 goto fail3;
274 }
275  
276 // init thread
277 if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) {
278 BLog(BLOG_ERROR, "pthread_create failed");
279 ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
280 goto fail3;
281 }
282  
283 o->num_threads++;
284 }
285 }
286  
287 #endif
288  
289 DebugObject_Init(&o->d_obj);
290 DebugCounter_Init(&o->d_ctr);
291 return 1;
292  
293 #ifdef BADVPN_THREADWORK_USE_PTHREAD
294 fail3:
295 stop_threads(o);
296 BPending_Free(&o->more_job);
297 BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
298 fail2:
299 ASSERT_FORCE(close(o->pipe[0]) == 0)
300 ASSERT_FORCE(close(o->pipe[1]) == 0)
301 fail1:
302 ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
303 fail0:
304 return 0;
305 #endif
306 }
307  
308 void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
309 {
310 #ifdef BADVPN_THREADWORK_USE_PTHREAD
311 if (o->num_threads > 0) {
312 ASSERT(LinkedList1_IsEmpty(&o->pending_list))
313 for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) }
314 ASSERT(LinkedList1_IsEmpty(&o->finished_list))
315 }
316 #endif
317 DebugObject_Free(&o->d_obj);
318 DebugCounter_Free(&o->d_ctr);
319  
320 #ifdef BADVPN_THREADWORK_USE_PTHREAD
321  
322 if (o->num_threads > 0) {
323 // stop threads
324 stop_threads(o);
325  
326 // free more job
327 BPending_Free(&o->more_job);
328  
329 // free BFileDescriptor
330 BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
331  
332 // free pipe
333 ASSERT_FORCE(close(o->pipe[0]) == 0)
334 ASSERT_FORCE(close(o->pipe[1]) == 0)
335  
336 // free mutex
337 ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
338 }
339  
340 #endif
341 }
342  
343 int BThreadWorkDispatcher_UsingThreads (BThreadWorkDispatcher *o)
344 {
345 #ifdef BADVPN_THREADWORK_USE_PTHREAD
346 return (o->num_threads > 0);
347 #else
348 return 0;
349 #endif
350 }
351  
352 void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
353 {
354 DebugObject_Access(&d->d_obj);
355  
356 // init arguments
357 o->d = d;
358 o->handler_done = handler_done;
359 o->user = user;
360 o->work_func = work_func;
361 o->work_func_user = work_func_user;
362  
363 #ifdef BADVPN_THREADWORK_USE_PTHREAD
364 if (d->num_threads > 0) {
365 // set state
366 o->state = BTHREADWORK_STATE_PENDING;
367  
368 // init finished semaphore
369 ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
370  
371 // post work
372 ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
373 LinkedList1_Append(&d->pending_list, &o->list_node);
374 for (int i = 0; i < d->num_threads; i++) {
375 if (!d->threads[i].running_work) {
376 ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0)
377 break;
378 }
379 }
380 ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
381 } else {
382 #endif
383 // schedule job
384 BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
385 BPending_Set(&o->job);
386 #ifdef BADVPN_THREADWORK_USE_PTHREAD
387 }
388 #endif
389  
390 DebugObject_Init(&o->d_obj);
391 DebugCounter_Increment(&d->d_ctr);
392 }
393  
394 void BThreadWork_Free (BThreadWork *o)
395 {
396 BThreadWorkDispatcher *d = o->d;
397 DebugObject_Free(&o->d_obj);
398 DebugCounter_Decrement(&d->d_ctr);
399  
400 #ifdef BADVPN_THREADWORK_USE_PTHREAD
401 if (d->num_threads > 0) {
402 ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
403  
404 switch (o->state) {
405 case BTHREADWORK_STATE_PENDING: {
406 BLog(BLOG_DEBUG, "remove pending work");
407  
408 // remove from pending list
409 LinkedList1_Remove(&d->pending_list, &o->list_node);
410 } break;
411  
412 case BTHREADWORK_STATE_RUNNING: {
413 BLog(BLOG_DEBUG, "remove running work");
414  
415 // wait for the work to finish running
416 ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
417 ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
418 ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
419  
420 ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
421  
422 // remove from finished list
423 LinkedList1_Remove(&d->finished_list, &o->list_node);
424 } break;
425  
426 case BTHREADWORK_STATE_FINISHED: {
427 BLog(BLOG_DEBUG, "remove finished work");
428  
429 // remove from finished list
430 LinkedList1_Remove(&d->finished_list, &o->list_node);
431 } break;
432  
433 case BTHREADWORK_STATE_FORGOTTEN: {
434 BLog(BLOG_DEBUG, "remove forgotten work");
435 } break;
436  
437 default:
438 ASSERT(0);
439 }
440  
441 ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
442  
443 // free finished semaphore
444 ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
445 } else {
446 #endif
447 BPending_Free(&o->job);
448 #ifdef BADVPN_THREADWORK_USE_PTHREAD
449 }
450 #endif
451 }