BadVPN – Rev 1

Subversion Repositories:
Rev:
/**
 * @file BThreadWork.c
 * @author Ambroz Bizjak <ambrop7@gmail.com>
 * 
 * @section LICENSE
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the author nor the
 *    names of its contributors may be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdint.h>
#include <stddef.h>

#ifdef BADVPN_THREADWORK_USE_PTHREAD
    #include <unistd.h>
    #include <errno.h>
    #include <fcntl.h>
#endif

#include <misc/offset.h>
#include <base/BLog.h>

#include <generated/blog_channel_BThreadWork.h>

#include <threadwork/BThreadWork.h>

#ifdef BADVPN_THREADWORK_USE_PTHREAD

static void * dispatcher_thread (struct BThreadWorkDispatcher_thread *t)
{
    BThreadWorkDispatcher *o = t->d;
    
    ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
    
    while (1) {
        // exit if requested
        if (o->cancel) {
            break;
        }
        
        if (LinkedList1_IsEmpty(&o->pending_list)) {
            // wait for event
            ASSERT_FORCE(pthread_cond_wait(&t->new_cond, &o->mutex) == 0)
            continue;
        }
        
        // grab the work
        BThreadWork *w = UPPER_OBJECT(LinkedList1_GetFirst(&o->pending_list), BThreadWork, list_node);
        ASSERT(w->state == BTHREADWORK_STATE_PENDING)
        LinkedList1_Remove(&o->pending_list, &w->list_node);
        t->running_work = w;
        w->state = BTHREADWORK_STATE_RUNNING;
        
        // do the work
        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
        w->work_func(w->work_func_user);
        ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
        
        // release the work
        t->running_work = NULL;
        LinkedList1_Append(&o->finished_list, &w->list_node);
        w->state = BTHREADWORK_STATE_FINISHED;
        ASSERT_FORCE(sem_post(&w->finished_sem) == 0)
        
        // write to pipe
        uint8_t b = 0;
        int res = write(o->pipe[1], &b, sizeof(b));
        if (res < 0) {
            int error = errno;
            ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
        }
    }
    
    ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
    
    return NULL;
}

static void dispatch_job (BThreadWorkDispatcher *o)
{
    ASSERT(o->num_threads > 0)
    
    // lock
    ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
    
    // check for finished job
    if (LinkedList1_IsEmpty(&o->finished_list)) {
        ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
        return;
    }
    
    // grab finished job
    BThreadWork *w = UPPER_OBJECT(LinkedList1_GetFirst(&o->finished_list), BThreadWork, list_node);
    ASSERT(w->state == BTHREADWORK_STATE_FINISHED)
    LinkedList1_Remove(&o->finished_list, &w->list_node);
    
    // schedule more
    if (!LinkedList1_IsEmpty(&o->finished_list)) {
        BPending_Set(&o->more_job);
    }
    
    // set state forgotten
    w->state = BTHREADWORK_STATE_FORGOTTEN;
    
    // unlock
    ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
    
    // call handler
    w->handler_done(w->user);
    return;
}

static void pipe_fd_handler (BThreadWorkDispatcher *o, int events)
{
    ASSERT(o->num_threads > 0)
    DebugObject_Access(&o->d_obj);
    
    // read data from pipe
    uint8_t b[64];
    int res = read(o->pipe[0], b, sizeof(b));
    if (res < 0) {
        int error = errno;
        ASSERT_FORCE(error == EAGAIN || error == EWOULDBLOCK)
    } else {
        ASSERT(res > 0)
    }
    
    dispatch_job(o);
    return;
}

static void more_job_handler (BThreadWorkDispatcher *o)
{
    ASSERT(o->num_threads > 0)
    DebugObject_Access(&o->d_obj);
    
    dispatch_job(o);
    return;
}

static void stop_threads (BThreadWorkDispatcher *o)
{
    // set cancelling
    ASSERT_FORCE(pthread_mutex_lock(&o->mutex) == 0)
    o->cancel = 1;
    ASSERT_FORCE(pthread_mutex_unlock(&o->mutex) == 0)
    
    while (o->num_threads > 0) {
        struct BThreadWorkDispatcher_thread *t = &o->threads[o->num_threads - 1];
        
        // wake up thread
        ASSERT_FORCE(pthread_cond_signal(&t->new_cond) == 0)
        
        // wait for thread to exit
        ASSERT_FORCE(pthread_join(t->thread, NULL) == 0)
        
        // free condition variable
        ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
        
        o->num_threads--;
    }
}

#endif

static void work_job_handler (BThreadWork *o)
{
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    ASSERT(o->d->num_threads == 0)
    #endif
    DebugObject_Access(&o->d_obj);
    
    // do the work
    o->work_func(o->work_func_user);
    
    // call handler
    o->handler_done(o->user);
    return;
}

int BThreadWorkDispatcher_Init (BThreadWorkDispatcher *o, BReactor *reactor, int num_threads_hint)
{
    // init arguments
    o->reactor = reactor;
    
    if (num_threads_hint < 0) {
        num_threads_hint = 2;
    }
    if (num_threads_hint > BTHREADWORK_MAX_THREADS) {
        num_threads_hint = BTHREADWORK_MAX_THREADS;
    }
    
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    
    if (num_threads_hint > 0) {
        // init pending list
        LinkedList1_Init(&o->pending_list);
        
        // init finished list
        LinkedList1_Init(&o->finished_list);
        
        // init mutex
        if (pthread_mutex_init(&o->mutex, NULL) != 0) {
            BLog(BLOG_ERROR, "pthread_mutex_init failed");
            goto fail0;
        }
        
        // init pipe
        if (pipe(o->pipe) < 0) {
            BLog(BLOG_ERROR, "pipe failed");
            goto fail1;
        }
        
        // set read end non-blocking
        if (fcntl(o->pipe[0], F_SETFL, O_NONBLOCK) < 0) {
            BLog(BLOG_ERROR, "fcntl failed");
            goto fail2;
        }
        
        // set write end non-blocking
        if (fcntl(o->pipe[1], F_SETFL, O_NONBLOCK) < 0) {
            BLog(BLOG_ERROR, "fcntl failed");
            goto fail2;
        }
        
        // init BFileDescriptor
        BFileDescriptor_Init(&o->bfd, o->pipe[0], (BFileDescriptor_handler)pipe_fd_handler, o);
        if (!BReactor_AddFileDescriptor(o->reactor, &o->bfd)) {
            BLog(BLOG_ERROR, "BReactor_AddFileDescriptor failed");
            goto fail2;
        }
        BReactor_SetFileDescriptorEvents(o->reactor, &o->bfd, BREACTOR_READ);
        
        // init more job
        BPending_Init(&o->more_job, BReactor_PendingGroup(o->reactor), (BPending_handler)more_job_handler, o);
        
        // set not cancelling
        o->cancel = 0;
        
        // init threads
        o->num_threads = 0;
        for (int i = 0; i < num_threads_hint; i++) {
            struct BThreadWorkDispatcher_thread *t = &o->threads[i];
            
            // set parent pointer
            t->d = o;
            
            // set no running work
            t->running_work = NULL;
            
            // init condition variable
            if (pthread_cond_init(&t->new_cond, NULL) != 0) {
                BLog(BLOG_ERROR, "pthread_cond_init failed");
                goto fail3;
            }
            
            // init thread
            if (pthread_create(&t->thread, NULL, (void * (*) (void *))dispatcher_thread, t) != 0) {
                BLog(BLOG_ERROR, "pthread_create failed");
                ASSERT_FORCE(pthread_cond_destroy(&t->new_cond) == 0)
                goto fail3;
            }
        
            o->num_threads++;
        }
    }
    
    #endif
    
    DebugObject_Init(&o->d_obj);
    DebugCounter_Init(&o->d_ctr);
    return 1;
    
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
fail3:
    stop_threads(o);
    BPending_Free(&o->more_job);
    BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
fail2:
    ASSERT_FORCE(close(o->pipe[0]) == 0)
    ASSERT_FORCE(close(o->pipe[1]) == 0)
fail1:
    ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
fail0:
    return 0;
    #endif
}

void BThreadWorkDispatcher_Free (BThreadWorkDispatcher *o)
{
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    if (o->num_threads > 0) {
        ASSERT(LinkedList1_IsEmpty(&o->pending_list))
        for (int i = 0; i < o->num_threads; i++) { ASSERT(!o->threads[i].running_work) }
        ASSERT(LinkedList1_IsEmpty(&o->finished_list))
    }
    #endif
    DebugObject_Free(&o->d_obj);
    DebugCounter_Free(&o->d_ctr);
    
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    
    if (o->num_threads > 0) {
        // stop threads
        stop_threads(o);
        
        // free more job
        BPending_Free(&o->more_job);
        
        // free BFileDescriptor
        BReactor_RemoveFileDescriptor(o->reactor, &o->bfd);
        
        // free pipe
        ASSERT_FORCE(close(o->pipe[0]) == 0)
        ASSERT_FORCE(close(o->pipe[1]) == 0)
        
        // free mutex
        ASSERT_FORCE(pthread_mutex_destroy(&o->mutex) == 0)
    }
    
    #endif
}

int BThreadWorkDispatcher_UsingThreads (BThreadWorkDispatcher *o)
{
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    return (o->num_threads > 0);
    #else
    return 0;
    #endif
}

void BThreadWork_Init (BThreadWork *o, BThreadWorkDispatcher *d, BThreadWork_handler_done handler_done, void *user, BThreadWork_work_func work_func, void *work_func_user)
{
    DebugObject_Access(&d->d_obj);
    
    // init arguments
    o->d = d;
    o->handler_done = handler_done;
    o->user = user;
    o->work_func = work_func;
    o->work_func_user = work_func_user;
    
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    if (d->num_threads > 0) {
        // set state
        o->state = BTHREADWORK_STATE_PENDING;
        
        // init finished semaphore
        ASSERT_FORCE(sem_init(&o->finished_sem, 0, 0) == 0)
        
        // post work
        ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
        LinkedList1_Append(&d->pending_list, &o->list_node);
        for (int i = 0; i < d->num_threads; i++) {
            if (!d->threads[i].running_work) {
                ASSERT_FORCE(pthread_cond_signal(&d->threads[i].new_cond) == 0)
                break;
            }
        }
        ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
    } else {
    #endif
        // schedule job
        BPending_Init(&o->job, BReactor_PendingGroup(d->reactor), (BPending_handler)work_job_handler, o);
        BPending_Set(&o->job);
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    }
    #endif
    
    DebugObject_Init(&o->d_obj);
    DebugCounter_Increment(&d->d_ctr);
}

void BThreadWork_Free (BThreadWork *o)
{
    BThreadWorkDispatcher *d = o->d;
    DebugObject_Free(&o->d_obj);
    DebugCounter_Decrement(&d->d_ctr);
    
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    if (d->num_threads > 0) {
        ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
        
        switch (o->state) {
            case BTHREADWORK_STATE_PENDING: {
                BLog(BLOG_DEBUG, "remove pending work");
                
                // remove from pending list
                LinkedList1_Remove(&d->pending_list, &o->list_node);
            } break;
            
            case BTHREADWORK_STATE_RUNNING: {
                BLog(BLOG_DEBUG, "remove running work");
                
                // wait for the work to finish running
                ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
                ASSERT_FORCE(sem_wait(&o->finished_sem) == 0)
                ASSERT_FORCE(pthread_mutex_lock(&d->mutex) == 0)
                
                ASSERT(o->state == BTHREADWORK_STATE_FINISHED)
                
                // remove from finished list
                LinkedList1_Remove(&d->finished_list, &o->list_node);
            } break;
            
            case BTHREADWORK_STATE_FINISHED: {
                BLog(BLOG_DEBUG, "remove finished work");
                
                // remove from finished list
                LinkedList1_Remove(&d->finished_list, &o->list_node);
            } break;
            
            case BTHREADWORK_STATE_FORGOTTEN: {
                BLog(BLOG_DEBUG, "remove forgotten work");
            } break;
            
            default:
                ASSERT(0);
        }
        
        ASSERT_FORCE(pthread_mutex_unlock(&d->mutex) == 0)
        
        // free finished semaphore
        ASSERT_FORCE(sem_destroy(&o->finished_sem) == 0)
    } else {
    #endif
        BPending_Free(&o->job);
    #ifdef BADVPN_THREADWORK_USE_PTHREAD
    }
    #endif
}