From b9771b85d4f543af78465985e6350c0ca57f4c70 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Thu, 11 Jun 2020 23:24:02 +0200 Subject: Broken Mess --- 04_exercise/threadpool.c | 94 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 27 deletions(-) (limited to '04_exercise/threadpool.c') diff --git a/04_exercise/threadpool.c b/04_exercise/threadpool.c index 93b8807..54d4448 100644 --- a/04_exercise/threadpool.c +++ b/04_exercise/threadpool.c @@ -3,13 +3,13 @@ #include "array.h" #include +#include #include #include #include #include -#include -#define MAX_FUTURES 2048 +#define MAX_FUTURES 20 typedef struct Thread { pthread_t pthread; size_t index; @@ -19,30 +19,72 @@ typedef struct ThreadPool { /* TODO: benötigte Attribute hinzufügen */ size_t size; Thread *threads; - Node arena [MAX_FUTURES]; - ArenaList al; - + Node arena[MAX_FUTURES]; + AtomicArenaList al; } ThreadPool; ThreadPool threadPool; +bool isFutureWaiting(void const *vFuture) { + Future *future = (Future *)vFuture; + FutureStatus status = atomic_load(&future->status); + return status == FUT_WAITING; +} + +void tryRunningFuture(Future *future) { + char expected = FUT_WAITING; + if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) { + return; + } + + future->fn(future); + expected = FUT_IN_PROGRESS; + if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_DONE)) { + perror("The future got marked as done by another thread"); + exit(-1); + } +} /* TODO: interne Hilfsfunktionen hinzufügen */ -void * poolWorkerFunc(void * v_index) { - size_t index = * (size_t *) v_index; + +bool tryDoingWork() { + Future *future = alFindLastElem(&threadPool.al, &isFutureWaiting); + if (future == NULL) { + return false; + } + // This will just return if some other thread was first as well + tryRunningFuture(future); + return true; +} +/** @brief worker function running in the threadpool + */ +_Noreturn void *poolWorkerFunc(void *v_index) { + size_t index = *(size_t *)v_index; printf("Thread %zu started \n", index); - return NULL; + unsigned int backoffTime = 1; + while (true) { + if (!tryDoingWork()) { + backoffTime = 2 * backoffTime; + fprintf(stderr, "Thread %zu: Found no work sleeping for %d seconds \n", + index, backoffTime); + sleep(backoffTime); + continue; + } else { + backoffTime = 1; + } + } } int tpInit(size_t size) { threadPool.size = size; arrayInit(threadPool.threads); - alInit(threadPool.arena, MAX_FUTURES); + threadPool.al = alInit(threadPool.arena, MAX_FUTURES); for (size_t i = 0; i < size; ++i) { arrayPush(threadPool.threads); int status; threadPool.threads[i].index = i; if ((status = pthread_create(&threadPool.threads[i].pthread, NULL, - poolWorkerFunc, &threadPool.threads[i].index)) < 0) { + poolWorkerFunc, &threadPool.threads[i].index)) < + 0) { return status; } } @@ -52,7 +94,8 @@ int tpInit(size_t size) { void tpRelease(void) { for (size_t i = 0; i < threadPool.size; ++i) { if (pthread_cancel(threadPool.threads[i].pthread) != 0) { - fprintf(stderr, "The thread %zu had already exited. \n", threadPool.threads[i].index); + fprintf(stderr, "The thread %zu had already exited. \n", + threadPool.threads[i].index); } } for (size_t i = 0; i < threadPool.size; ++i) { @@ -62,30 +105,27 @@ void tpRelease(void) { exit(status); } } + arrayRelease(threadPool.threads); } void tpAsync(Future *future) { - alPush(&threadPool.al, (void *) future); - char expected = FUT_WAITING; - if(!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS )) { - perror("The future has been scheduled before"); - exit(-1); - } - - future->fn(future); - expected = FUT_IN_PROGRESS; - if(!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) { - perror("The future got marked as done by another thread"); + if (alPush(&threadPool.al, (void *)future) < -1) { + perror("Push failed"); exit(-1); } - } void tpAwait(Future *future) { - if(atomic_load(&future->status) == FUT_DONE) { - alRemoveElem(&threadPool.al, (void *) future); + if (atomic_load(&future->status) == FUT_DONE) { + alRemoveElem(&threadPool.al, (void *)future); return; } -} - + if (atomic_load(&future->status) == FUT_WAITING) { + tryRunningFuture(future); + } + while (atomic_load(&future->status) != FUT_DONE) { + tryDoingWork(); + } + alRemoveElem(&threadPool.al, (void *)future); +} -- cgit v1.2.3-54-g00ecf