#include "threadpool.h"
#include "array.h"
#include "slotmap.h"

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define MAX_FUTURES 1024

/* Upper bound for the idle back-off of a worker thread, in seconds. */
#define MAX_BACKOFF_SECONDS 32u

/* One worker thread of the pool together with its position in the pool. */
typedef struct Thread {
    pthread_t pthread;
    size_t index;
} Thread;

/* Global state of the pool: its workers and the slot map of pending futures. */
typedef struct ThreadPool {
    /* TODO: add required attributes */
    size_t size;                /* number of successfully started workers */
    Thread *threads;            /* heap array holding the workers */
    smEntry slab[MAX_FUTURES];  /* backing storage handed to smInit() */
    smHeader header;            /* slot map handle returned by smInit() */
} ThreadPool;

ThreadPool threadPool;

/**
 * @brief Predicate passed to smFindEntry(): is the given future still waiting?
 *
 * @param vFuture pointer to a Future, passed as an untyped pointer
 * @return true if the status read was FUT_WAITING
 */
bool isFutureWaiting(void const *vFuture) {
    Future *future = (Future *)vFuture;
    FutureStatus status = atomic_load(&future->status);
    /* This read is racy, but the status is re-checked with a CAS in
     * tryRunningFuture() before the future is actually run. */
    return status == FUT_WAITING;
}

/**
 * @brief Claim a waiting future and run it on the calling thread.
 *
 * The status is moved FUT_WAITING -> FUT_IN_PROGRESS with a compare-and-swap,
 * so only one of several competing threads gets to run the future. The winner
 * removes the future from the slot map, calls future->fn(future) and finally
 * marks it FUT_DONE. The process is terminated if the status was changed by
 * someone else while the function was running.
 *
 * @param future the future to run
 * @return true if this call ran the future, false if it was not (or no
 *         longer) in state FUT_WAITING
 */
bool tryRunningFuture(Future * const future) {
    /* NOTE(review): `expected` must match the value type of future->status
     * (declared in threadpool.h) -- confirm that `char` is correct. */
    char expected = FUT_WAITING;

    /* Losing the CAS can happen if multiple threads found the same Future. */
    if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) {
        return false;
    }

    smDeleteValue(&threadPool.header, future);
    future->fn(future);

    if (atomic_exchange(&future->status, FUT_DONE) != FUT_IN_PROGRESS) {
        /* Not a libc failure, so errno is meaningless here: no perror(). */
        fprintf(stderr, "The future got marked as done by another thread\n");
        exit(-1);
    }
    return true;
}

/* TODO: add internal helper functions */

/**
 * @brief Look for a waiting future in the slot map and try to run it.
 *
 * @return true if a future was found and run by this call, false if no entry
 *         matched or another thread claimed the future first
 */
bool tryDoingWork(void) {
    smEntry *entry = smFindEntry(&threadPool.header, &isFutureWaiting);
    if (entry == NULL) {
        return false;
    }

    Future *fut = (Future *)entry->value;
    /* This will just return false if some other thread was first. */
    return tryRunningFuture(fut);
}

/**
 * @brief Worker function running in the thread pool.
 *
 * Loops forever: runs waiting futures and, when none is found, sleeps with an
 * exponentially growing delay (2, 4, 8, ... seconds, capped at
 * MAX_BACKOFF_SECONDS). The delay is reset once work was done.
 *
 * @param v_index pointer to the size_t index of this worker
 */
_Noreturn void *poolWorkerFunc(void *v_index) {
    size_t index = *(size_t *)v_index;
    printf("Thread %zu started \n", index);

    unsigned int backoffTime = 1;
    while (true) {
        if (tryDoingWork()) {
            backoffTime = 1;
            continue;
        }

        /* Cap the delay so that a long-idle pool still reacts to new work. */
        if (backoffTime < MAX_BACKOFF_SECONDS) {
            backoffTime *= 2;
        }
        fprintf(stderr, "Thread %zu: Found no work sleeping for %u seconds \n", index, backoffTime);
        sleep(backoffTime);
    }
}

/**
 * @brief Initialise the global pool and start its worker threads.
 *
 * threadPool.size only counts workers that were actually started, so
 * tpRelease() may be called after a partial failure.
 *
 * @param size number of worker threads to start
 * @return 0 on success, -1 if the thread array could not be allocated,
 *         otherwise the error number returned by pthread_create()
 */
int tpInit(size_t size) {
    threadPool.size = 0;
    threadPool.threads = calloc(size, sizeof *threadPool.threads);
    if (threadPool.threads == NULL && size > 0) {
        return -1;
    }

    /* The slot map must be ready before the first worker starts polling it. */
    threadPool.header = smInit(threadPool.slab, MAX_FUTURES);

    for (size_t i = 0; i < size; ++i) {
        threadPool.threads[i].index = i;
        /* pthread_create() reports failure with a positive error number. */
        int status = pthread_create(&threadPool.threads[i].pthread, NULL,
                                    poolWorkerFunc, &threadPool.threads[i].index);
        if (status != 0) {
            return status;
        }
        threadPool.size = i + 1;
    }
    return 0;
}

/**
 * @brief Stop all worker threads and free the pool's thread array.
 *
 * Every worker is cancelled first and joined afterwards. A failing join
 * terminates the process with the returned error code as exit status.
 */
void tpRelease(void) {
    for (size_t i = 0; i < threadPool.size; ++i) {
        if (pthread_cancel(threadPool.threads[i].pthread) != 0) {
            fprintf(stderr, "The thread %zu had already exited. \n", threadPool.threads[i].index);
        }
    }

    for (size_t i = 0; i < threadPool.size; ++i) {
        int status = pthread_join(threadPool.threads[i].pthread, NULL);
        if (status != 0) {
            /* pthread_join() returns its error code and does not set errno. */
            fprintf(stderr, "An error occured while joining the threads (error %d)\n", status);
            exit(status);
        }
    }

    free(threadPool.threads);
    threadPool.threads = NULL;
    threadPool.size = 0;
}

/**
 * @brief Submit a future to the pool.
 *
 * While smInsert() reports failure (negative return value), the calling
 * thread works on pending futures itself and then retries the insertion.
 *
 * @param future the future to enqueue
 */
void tpAsync(Future *future) {
    while (smInsert(&threadPool.header, (void *)future) < 0) {
        tryDoingWork();
    }
}

/**
 * @brief Block until the given future is done.
 *
 * If the future is still waiting, the caller tries to run it directly.
 * Until its status becomes FUT_DONE the caller keeps helping with other
 * pending futures instead of sleeping.
 *
 * @param future the future to wait for
 */
void tpAwait(Future *future) {
    if (atomic_load(&future->status) == FUT_WAITING) {
        tryRunningFuture(future);
    }

    while (atomic_load(&future->status) != FUT_DONE) {
        tryDoingWork();
    }
}