summaryrefslogtreecommitdiffstats
path: root/04_exercise/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to '04_exercise/threadpool.c')
-rw-r--r--04_exercise/threadpool.c94
1 files changed, 67 insertions, 27 deletions
diff --git a/04_exercise/threadpool.c b/04_exercise/threadpool.c
index 93b8807..54d4448 100644
--- a/04_exercise/threadpool.c
+++ b/04_exercise/threadpool.c
@@ -3,13 +3,13 @@
#include "array.h"
#include <pthread.h>
+#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
-#include <stdatomic.h>
-#define MAX_FUTURES 2048
+#define MAX_FUTURES 20
typedef struct Thread {
pthread_t pthread;
size_t index;
@@ -19,30 +19,72 @@ typedef struct ThreadPool {
/* TODO: benötigte Attribute hinzufügen */
size_t size;
Thread *threads;
- Node arena [MAX_FUTURES];
- ArenaList al;
-
+ Node arena[MAX_FUTURES];
+ AtomicArenaList al;
} ThreadPool;
ThreadPool threadPool;
+bool isFutureWaiting(void const *vFuture) {
+ Future *future = (Future *)vFuture;
+ FutureStatus status = atomic_load(&future->status);
+ return status == FUT_WAITING;
+}
+
+void tryRunningFuture(Future *future) {
+ char expected = FUT_WAITING;
+ if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) {
+ return;
+ }
+
+ future->fn(future);
+ expected = FUT_IN_PROGRESS;
+ if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_DONE)) {
+ perror("The future got marked as done by another thread");
+ exit(-1);
+ }
+}
/* TODO: interne Hilfsfunktionen hinzufügen */
-void * poolWorkerFunc(void * v_index) {
- size_t index = * (size_t *) v_index;
+
+bool tryDoingWork() {
+ Future *future = alFindLastElem(&threadPool.al, &isFutureWaiting);
+ if (future == NULL) {
+ return false;
+ }
+ // This will just return if some other thread was first as well
+ tryRunningFuture(future);
+ return true;
+}
+/** @brief worker function running in the threadpool
+ */
+_Noreturn void *poolWorkerFunc(void *v_index) {
+ size_t index = *(size_t *)v_index;
printf("Thread %zu started \n", index);
- return NULL;
+ unsigned int backoffTime = 1;
+ while (true) {
+ if (!tryDoingWork()) {
+ backoffTime = 2 * backoffTime;
+ fprintf(stderr, "Thread %zu: Found no work sleeping for %d seconds \n",
+ index, backoffTime);
+ sleep(backoffTime);
+ continue;
+ } else {
+ backoffTime = 1;
+ }
+ }
}
int tpInit(size_t size) {
threadPool.size = size;
arrayInit(threadPool.threads);
- alInit(threadPool.arena, MAX_FUTURES);
+ threadPool.al = alInit(threadPool.arena, MAX_FUTURES);
for (size_t i = 0; i < size; ++i) {
arrayPush(threadPool.threads);
int status;
threadPool.threads[i].index = i;
if ((status = pthread_create(&threadPool.threads[i].pthread, NULL,
- poolWorkerFunc, &threadPool.threads[i].index)) < 0) {
+ poolWorkerFunc, &threadPool.threads[i].index)) <
+ 0) {
return status;
}
}
@@ -52,7 +94,8 @@ int tpInit(size_t size) {
void tpRelease(void) {
for (size_t i = 0; i < threadPool.size; ++i) {
if (pthread_cancel(threadPool.threads[i].pthread) != 0) {
- fprintf(stderr, "The thread %zu had already exited. \n", threadPool.threads[i].index);
+ fprintf(stderr, "The thread %zu had already exited. \n",
+ threadPool.threads[i].index);
}
}
for (size_t i = 0; i < threadPool.size; ++i) {
@@ -62,30 +105,27 @@ void tpRelease(void) {
exit(status);
}
}
+ arrayRelease(threadPool.threads);
}
void tpAsync(Future *future) {
- alPush(&threadPool.al, (void *) future);
- char expected = FUT_WAITING;
- if(!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS )) {
- perror("The future has been scheduled before");
- exit(-1);
- }
-
- future->fn(future);
- expected = FUT_IN_PROGRESS;
- if(!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) {
- perror("The future got marked as done by another thread");
+ if (alPush(&threadPool.al, (void *)future) < -1) {
+ perror("Push failed");
exit(-1);
}
-
}
void tpAwait(Future *future) {
- if(atomic_load(&future->status) == FUT_DONE) {
- alRemoveElem(&threadPool.al, (void *) future);
+ if (atomic_load(&future->status) == FUT_DONE) {
+ alRemoveElem(&threadPool.al, (void *)future);
return;
}
-}
-
+ if (atomic_load(&future->status) == FUT_WAITING) {
+ tryRunningFuture(future);
+ }
+ while (atomic_load(&future->status) != FUT_DONE) {
+ tryDoingWork();
+ }
+ alRemoveElem(&threadPool.al, (void *)future);
+}