From 021afcb8e4aa4fbc7905f527089c415108c86c75 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Thu, 18 Jun 2020 15:04:54 +0200 Subject: Switched to slotmap --- 04_exercise/threadpool.c | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) (limited to '04_exercise/threadpool.c') diff --git a/04_exercise/threadpool.c b/04_exercise/threadpool.c index d4ee8bb..9d73013 100644 --- a/04_exercise/threadpool.c +++ b/04_exercise/threadpool.c @@ -1,8 +1,8 @@ #include "threadpool.h" -#include "arena_list.h" #include "array.h" #include +#include #include #include #include @@ -19,8 +19,8 @@ typedef struct ThreadPool { /* TODO: benötigte Attribute hinzufügen */ size_t size; Thread *threads; - Node arena[MAX_FUTURES]; - AtomicArenaList al; + smEntry slab [MAX_FUTURES]; + smHeader header; } ThreadPool; ThreadPool threadPool; @@ -31,31 +31,32 @@ bool isFutureWaiting(void const *vFuture) { //This is racy but we'll CAS it later to make sure it's safe return status == FUT_WAITING; } - -void tryRunningFuture(Future *future) { +//TODO Document the behaviour of this function +bool tryRunningFuture(Future * const future) { char expected = FUT_WAITING; // This can happen if multiple threads found the same Future if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) { - return; + return false; } - + smDeleteValue(& threadPool.header, future); future->fn(future); - expected = FUT_IN_PROGRESS; - if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_DONE)) { + + if (atomic_exchange(&future->status, FUT_DONE) != FUT_IN_PROGRESS) { perror("The future got marked as done by another thread"); exit(-1); } + return true; } /* TODO: interne Hilfsfunktionen hinzufügen */ bool tryDoingWork() { - Future *future = alFindLastElem(&threadPool.al, &isFutureWaiting); - if (future == NULL) { + smEntry *entry = smFindEntry(&threadPool.header, &isFutureWaiting); + if (entry == NULL) { return false; } + Future * fut = (Future *) entry->value; // This will just return if some other thread was first - tryRunningFuture(future); - return true; + return tryRunningFuture(fut); } /** @brief worker function running in the threadpool */ @@ -78,10 +79,9 @@ _Noreturn void *poolWorkerFunc(void *v_index) { int tpInit(size_t size) { threadPool.size = size; - arrayInit(threadPool.threads); - threadPool.al = alInit(threadPool.arena, MAX_FUTURES); + threadPool.threads = malloc(sizeof(Thread)*size); + threadPool.header = smInit((smEntry *)&threadPool.slab, MAX_FUTURES); for (size_t i = 0; i < size; ++i) { - arrayPush(threadPool.threads); int status; threadPool.threads[i].index = i; if ((status = pthread_create(&threadPool.threads[i].pthread, NULL, @@ -107,13 +107,12 @@ void tpRelease(void) { exit(status); } } - arrayRelease(threadPool.threads); + free(threadPool.threads); } void tpAsync(Future *future) { - if (alPush(&threadPool.al, (void *)future) < -1) { - perror("Push failed"); - exit(-1); + while (smInsert(&threadPool.header, (void *)future) < 0) { + tryDoingWork(); } } @@ -125,5 +124,4 @@ void tpAwait(Future *future) { while (atomic_load(&future->status) != FUT_DONE) { tryDoingWork(); } - alRemoveElem(&threadPool.al, (void *)future); } -- cgit v1.2.3-54-g00ecf