#include "threadpool.h"
#include "arena_list.h"
#include "array.h"
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/* Capacity of the future arena: sizes ThreadPool.arena and is passed to alInit. */
#define MAX_FUTURES 20
/* Bookkeeping for one worker thread of the pool. */
typedef struct Thread {
/* Handle filled in by pthread_create; used for cancel/join in tpRelease. */
pthread_t pthread;
/* Position of this entry in ThreadPool.threads; its address is handed to the
 * worker as the thread argument. */
size_t index;
} Thread;
/* State of the thread pool. */
typedef struct ThreadPool {
/* TODO: add required attributes */
/* Number of worker threads; set in tpInit, iterated over in tpRelease. */
size_t size;
/* Per-worker bookkeeping, managed via arrayInit/arrayPush/arrayRelease. */
Thread *threads;
/* Backing storage handed to alInit for the future list. */
Node arena[MAX_FUTURES];
/* List of submitted futures: pushed to by tpAsync, searched by tryDoingWork,
 * and cleaned up by tpAwait. */
AtomicArenaList al;
} ThreadPool;
/* The single pool instance the tp* functions operate on. */
ThreadPool threadPool;
bool isFutureWaiting(void const *vFuture) {
Future *future = (Future *)vFuture;
FutureStatus status = atomic_load(&future->status);
return status == FUT_WAITING;
}
/**
 * @brief Claim a future and run it on the calling thread, if nobody else has.
 *
 * The status is moved FUT_WAITING -> FUT_IN_PROGRESS with a compare-and-swap,
 * so at most one caller gets to execute future->fn. Callers that lose the
 * race return immediately without doing anything.
 *
 * After fn returns, the status is moved FUT_IN_PROGRESS -> FUT_DONE. If that
 * second swap fails, the status was changed behind our back; this is treated
 * as a fatal invariant violation and the process exits.
 *
 * @param future Future to run; its fn member is called with the future itself.
 */
void tryRunningFuture(Future *future) {
    /* NOTE(review): the type of `expected` must match the value type of
     * future->status for the compare-exchange to be well-defined; confirm
     * against the Future declaration in threadpool.h. */
    char expected = FUT_WAITING;
    if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) {
        /* Someone else already claimed (or finished) this future. */
        return;
    }

    future->fn(future);

    expected = FUT_IN_PROGRESS;
    if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_DONE)) {
        /* Not an errno-style failure, so perror() would append an unrelated
         * (and usually misleading) errno text; print the message directly. */
        fprintf(stderr, "The future got marked as done by another thread\n");
        exit(-1);
    }
}
/* TODO: add internal helper functions */
/**
 * @brief Look for a waiting future in the pool's list and try to run it.
 *
 * @return false if alFindLastElem found no future for which isFutureWaiting
 *         holds; true if one was found. Note that true is also returned when
 *         another thread claimed the future first, in which case
 *         tryRunningFuture simply returns without running anything.
 */
bool tryDoingWork() {
    Future *candidate = alFindLastElem(&threadPool.al, &isFutureWaiting);
    if (candidate != NULL) {
        /* Harmless if we lose the race: the claim is a compare-and-swap. */
        tryRunningFuture(candidate);
        return true;
    }
    return false;
}
/**
 * @brief Worker function running in the thread pool.
 *
 * Loops forever: tries to pick up a waiting future via tryDoingWork and,
 * when none is found, sleeps with an exponentially growing delay (2, 4, 8, ...
 * seconds). The delay is capped so that a long idle period cannot make a
 * worker unresponsive for an unbounded amount of time, and it is reset to its
 * initial value as soon as work was found.
 *
 * The function never returns; tpRelease ends workers via pthread_cancel.
 *
 * @param v_index Pointer to a size_t holding this worker's index; it is read
 *                once at start-up and used only for log output.
 */
_Noreturn void *poolWorkerFunc(void *v_index) {
    /* Upper bound for the idle sleep, in seconds. */
    static const unsigned int maxBackoffSeconds = 32;

    size_t index = *(size_t *)v_index;
    printf("Thread %zu started \n", index);

    unsigned int backoffTime = 1;
    while (true) {
        if (tryDoingWork()) {
            /* Found something: go back to the shortest delay. */
            backoffTime = 1;
            continue;
        }

        if (backoffTime < maxBackoffSeconds) {
            backoffTime = 2 * backoffTime;
        }
        /* %u, not %d: backoffTime is unsigned. */
        fprintf(stderr, "Thread %zu: Found no work sleeping for %u seconds \n",
                index, backoffTime);
        sleep(backoffTime);
    }
}
/**
 * @brief Initialise the global thread pool and start its worker threads.
 *
 * Sets up the future list on top of threadPool.arena, creates one Thread
 * entry per worker and then starts the workers running poolWorkerFunc.
 *
 * threadPool.size always reflects the number of workers that were actually
 * started, so tpRelease only touches valid thread handles even after a
 * partial failure.
 *
 * @param size Number of worker threads to start.
 * @return 0 on success, otherwise the (positive) error number returned by
 *         the failing pthread_create call.
 */
int tpInit(size_t size) {
    threadPool.size = 0;
    arrayInit(threadPool.threads);
    threadPool.al = alInit(threadPool.arena, MAX_FUTURES);

    /* Build the complete array before any thread is started: each worker is
     * handed a pointer into the array, and a later arrayPush might move the
     * storage (assumption - array.h is not visible here), which would leave
     * already-running workers with a dangling argument pointer. */
    for (size_t i = 0; i < size; ++i) {
        arrayPush(threadPool.threads);
        threadPool.threads[i].index = i;
    }

    for (size_t i = 0; i < size; ++i) {
        /* pthread_create reports failure with a non-zero error number, never
         * with a negative value, so compare against 0. */
        int status = pthread_create(&threadPool.threads[i].pthread, NULL,
                                    poolWorkerFunc, &threadPool.threads[i].index);
        if (status != 0) {
            return status;
        }
        threadPool.size = i + 1;
    }
    return 0;
}
/**
 * @brief Stop all worker threads and release the pool's thread array.
 *
 * First requests cancellation of every worker, then joins each of them. A
 * failing pthread_cancel is only logged; a failing pthread_join is fatal and
 * terminates the process with the error number as exit status.
 *
 * After the array has been released, threadPool.size is reset to 0 so that a
 * repeated call does not touch the released thread entries.
 */
void tpRelease(void) {
    for (size_t i = 0; i < threadPool.size; ++i) {
        if (pthread_cancel(threadPool.threads[i].pthread) != 0) {
            fprintf(stderr, "The thread %zu had already exited. \n",
                    threadPool.threads[i].index);
        }
    }

    for (size_t i = 0; i < threadPool.size; ++i) {
        int status = pthread_join(threadPool.threads[i].pthread, NULL);
        if (status != 0) {
            /* pthread functions return the error number instead of setting
             * errno, so perror() would report an unrelated error here. */
            fprintf(stderr, "An error occured while joining the threads: %s\n",
                    strerror(status));
            exit(status);
        }
    }

    arrayRelease(threadPool.threads);
    threadPool.size = 0;
}
/**
 * @brief Submit a future to the pool by pushing it onto the future list.
 *
 * Only alPush results below -1 are treated as fatal: in that case
 * "Push failed" is reported via perror and the process exits.
 *
 * NOTE(review): a result of exactly -1 is accepted silently. Whether that is
 * intentional (e.g. "list full", relying on tpAwait running the future on the
 * awaiting thread) or an off-by-one in the check depends on alPush's contract
 * in arena_list.h - confirm.
 *
 * @param future Future to enqueue.
 */
void tpAsync(Future *future) {
    if (alPush(&threadPool.al, (void *)future) >= -1) {
        return;
    }
    perror("Push failed");
    exit(-1);
}
/**
 * @brief Block until the given future is done, helping with work meanwhile.
 *
 * If the future has not been started yet, the calling thread tries to run it
 * itself. While the future is not FUT_DONE, the caller keeps processing other
 * waiting futures through tryDoingWork instead of sleeping. Once the future
 * is done it is removed from the pool's list.
 *
 * @param future Future to wait for.
 */
void tpAwait(Future *future) {
    if (atomic_load(&future->status) == FUT_WAITING) {
        tryRunningFuture(future);
    }

    for (;;) {
        if (atomic_load(&future->status) == FUT_DONE) {
            break;
        }
        /* Result deliberately ignored: we just re-check the status next. */
        tryDoingWork();
    }

    alRemoveElem(&threadPool.al, (void *)future);
}