From b9771b85d4f543af78465985e6350c0ca57f4c70 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Thu, 11 Jun 2020 23:24:02 +0200 Subject: Broken Mess --- 04_exercise/CMakeLists.txt | 8 ++- 04_exercise/arena/arena_list.c | 123 ++++++++++++++++++++++++++++------------- 04_exercise/arena/arena_list.h | 32 ++++++++--- 04_exercise/arena/arena_test.c | 14 ++++- 04_exercise/main.c | 31 ++++++----- 04_exercise/threadpool.c | 94 ++++++++++++++++++++++--------- 04_exercise/threadpool.h | 4 +- 7 files changed, 213 insertions(+), 93 deletions(-) diff --git a/04_exercise/CMakeLists.txt b/04_exercise/CMakeLists.txt index d8d1200..8cea647 100644 --- a/04_exercise/CMakeLists.txt +++ b/04_exercise/CMakeLists.txt @@ -10,13 +10,17 @@ target_link_libraries(threadpool INTERFACE warnings) add_executable(fibonacci main.c) -target_compile_options(fibonacci INTERFACE ${PROJECT_WARNINGS}) target_link_libraries(fibonacci PRIVATE threadpool) target_link_libraries(fibonacci INTERFACE warnings) add_library(arena_list arena/arena_list.c) target_include_directories(arena_list PUBLIC arena) target_link_libraries(arena_list INTERFACE warnings) +target_link_libraries(arena_list PRIVATE rwlock) add_executable(arena_test arena/arena_test.c) -target_link_libraries(arena_test PRIVATE arena_list) \ No newline at end of file +target_link_libraries(arena_test PRIVATE arena_list) + +add_library(rwlock rwlock/rwlock.c) +target_include_directories(rwlock PUBLIC rwlock) +target_link_libraries(rwlock INTERFACE warnings) diff --git a/04_exercise/arena/arena_list.c b/04_exercise/arena/arena_list.c index 822f8f0..4becef6 100644 --- a/04_exercise/arena/arena_list.c +++ b/04_exercise/arena/arena_list.c @@ -2,6 +2,12 @@ // Created by stefan on 10.06.20. // #include "arena_list.h" +#include +#include +#include +#include + +#include Node *listPopFront(List *list) { Node *front = list->first; @@ -16,14 +22,30 @@ Node *listPopFront(List *list) { return front; } - if (front->next != NULL) { - list->first->prev = NULL; - front->next = NULL; - } + list->first->prev = NULL; + front->next = NULL; + return front; } -Node *listPopBack(List* list) { return NULL; } +Node *listPopBack(List *list) { + Node *back = list->last; + if (back == NULL) { + return NULL; + } + + list->last = back->prev; + if (back == list->first) { + // Only Node in the list + list->last = NULL; + return back; + } + + list->last->next = NULL; + back->prev = NULL; + + return back; +} void listPushFront(List *list, Node *new) { if (list->first == NULL) { @@ -38,14 +60,14 @@ void listPushFront(List *list, Node *new) { list->first = new; } -void listPushBack(List* list, Node *value) {} +// void listPushBack(List *list, Node *value) {} -ArenaList alInit(Node *arena, size_t size) { - ArenaList al; - al.activeList = (List){.first = NULL, .last = NULL}; - al.freeList = (List){.first = arena, .last = &arena[size - 1]}; - al.arena = arena; - al.size = size; +AtomicArenaList alInit(Node *arena, size_t size) { + AtomicArenaList al = {.activeList = (List){.first = NULL, .last = NULL}, + .freeList = (List){.first = arena, .last = &arena[size - 1]}, + .arena = arena, + .size = size}; + atomic_init(&al.lock, RW_UNLOCKED); arena[0] = (Node){.value = NULL, .prev = NULL, .next = &arena[1]}; for (size_t i = 1; i < size - 1; ++i) { arena[i] = (Node){.value = NULL, .prev = &arena[i - 1], .next = &arena[i + 1]}; @@ -54,48 +76,73 @@ ArenaList alInit(Node *arena, size_t size) { return al; } -int alPush(ArenaList* al, void *value) { +int alPush(AtomicArenaList *al, void *value) { + fprintf(stderr, "Thread %lu: Pushing \n", pthread_self() % 1000); + rwLockWrite(&al->lock); Node *current = listPopFront(&al->freeList); // List is empty if (current == NULL) { + rwUnlockWrite(&al->lock); return -1; } current->value = value; listPushFront(&al->activeList, current); + rwUnlockWrite(&al->lock); return 0; } +int alRemoveElem(AtomicArenaList *al, void *value) { + fprintf(stderr, "Thread %lu: Removing element \n", pthread_self() % 1000); + rwLockWrite(&al->lock); + for (size_t i = 0; i < al->size; ++i) { + if (al->arena[i].value != value) { + continue; + } + Node *node = &al->arena[i]; + if (node == al->activeList.first) { + listPopFront(&al->activeList); + listPushFront(&al->freeList, node); + atomic_char *a = &al->lock; + rwUnlockWrite(a); + return 0; + } + if (node == al->activeList.last) { + listPopBack(&al->activeList); + listPushFront(&al->freeList, node); + rwUnlockWrite(&al->lock); + return 0; + } + // The node is somewhere in the middle + Node *prev = node->prev; + Node *next = node->next; + next->prev = prev; + prev->next = next; -int alRemoveElem(ArenaList* al, void * value){ - for(size_t i = 0; i < al->size; ++i) { - if(al->arena[i].value == value) { - return alRemove(al, &al->arena[i]); - } + listPushFront(&al->freeList, node); + rwUnlockWrite(&al->lock); + return 0; } + rwUnlockWrite(&al->lock); return -1; } -int alRemove(ArenaList* al, Node * node) { - //TODO Should we check that the node is actually in the active list - //Maybe as an assert that gets optimized out - if(node == al->activeList.first) { - listPopFront(&al->activeList); - listPushFront(&al->freeList, node); - return 0; - } - if(node == al->activeList.last) { - listPopBack(&al->activeList); - listPushFront(&al->freeList, node); - return 0; - } - // The node is somewhere in the middle - Node * prev = node ->prev; - Node * next = node ->next; +void *alFindLastElem(AtomicArenaList *al, SearchFunction f) { + fprintf(stderr, "Thread %lu: Finding last element \n", pthread_self() % 1000); - next->prev = prev; - prev->next = next; + rwLockRead(&al->lock); - listPushFront(&al->freeList, node); - return 0; + List const *const actList = &al->activeList; + if (actList->last == NULL) { + rwUnlockRead(&al->lock); + return NULL; + } + for (Node *current = actList->last; current != NULL; current = current->prev) { + if (f(current->value)) { + rwUnlockRead(&al->lock); + return current; + } + } + rwUnlockRead(&al->lock); + return NULL; } diff --git a/04_exercise/arena/arena_list.h b/04_exercise/arena/arena_list.h index 387f881..304209c 100644 --- a/04_exercise/arena/arena_list.h +++ b/04_exercise/arena/arena_list.h @@ -6,6 +6,9 @@ #define BETRIEBSYSTEME_ARENA_LIST_H #include +#include +#include + typedef struct Node { void * value; struct Node * next; @@ -16,24 +19,39 @@ typedef struct List { Node * first; Node * last; } List; -// This structure manages an arena / memory slab to be used + + // Should we have a free list or just a bit set relative to the size of the slab? +/** This structure manages an arena / memory slab to be used + * This structure is thread-safe as it locks internally + * It will spin until a request is safe to access the items + * This also means this structure shouldn't be read externally + * unless the thread doing so managed to aquire the atomic_flag + */ typedef struct ArenaList { List freeList; List activeList; Node * arena; size_t size; -} ArenaList; + atomic_char lock; +} AtomicArenaList; +typedef bool (*SearchFunction)(void const *); //Initializes the node array -ArenaList alInit(Node * arena, size_t size); +AtomicArenaList alInit(Node * arena, size_t size); // Return -1 if the List is full -int alPush(ArenaList* al, void * value); +int alPush(AtomicArenaList * al, void * value); // Return -1 if the Node was already deleted -int alRemove(ArenaList* al, Node * node); +int alRemoveElem(AtomicArenaList * al, void * value); -// Return -1 if the Node was already deleted -int alRemoveElem(ArenaList* al, void * value); +/** + * Searches the activeList for an element + * for which the search function returns true + * The function will be passed the value pointer + * of a given Node + * @return the Element pointed or NULL if nothing matched + */ +void *alFindLastElem(AtomicArenaList *al, SearchFunction f); #endif // BETRIEBSYSTEME_ARENA_LIST_H diff --git a/04_exercise/arena/arena_test.c b/04_exercise/arena/arena_test.c index b0927f1..51a9b0c 100644 --- a/04_exercise/arena/arena_test.c +++ b/04_exercise/arena/arena_test.c @@ -3,9 +3,14 @@ // #include "arena_list.h" #include +#include +bool isEqualTo3(void const *data) { + int *value = (void *)data; + return *value == 3; +} int main() { Node arena[5]; - ArenaList al = alInit(arena, 5); + AtomicArenaList al = alInit(arena, 5); int data[5] = {1, 2, 3, 4, 5}; for (int i = 0; i < 5; ++i) { alPush(&al, &data[4 - i]); @@ -14,5 +19,12 @@ int main() { for (Node *cur = al.activeList.first; cur != NULL; cur = cur->next) { printf("Got digit %d \n", *(int *)cur->value); } + Node const * node = alFindLastElem(&al, &isEqualTo3); + int * value = (int *) node->value; + printf("The value was actually %d \n", *value); + assert(*value == 3); + for (int i = 0; i < 5; ++i) { + alRemoveElem(&al, &data[4 - i]); + } } diff --git a/04_exercise/main.c b/04_exercise/main.c index 18fb21e..e73dd98 100644 --- a/04_exercise/main.c +++ b/04_exercise/main.c @@ -5,22 +5,23 @@ static TASK(long, fib, long) -long fib(long n) { - if (n <= 1) - return n; - - fib_fut *a = fibAsync((fib_fut[]) { fibFuture(n - 1) }); - fib_fut *b = fibAsync((fib_fut[]) { fibFuture(n - 2) }); - - return fibAwait(a) + fibAwait(b); + long fib(long n) { + if (n <= 1) + return n; + + fib_fut *a = fibAsync((fib_fut[]){fibFuture(n - 1)}); + fib_fut *b = fibAsync((fib_fut[]){fibFuture(n - 2)}); + + return fibAwait(a) + fibAwait(b); } int main() { - if (tpInit(8) != 0) - perror("Thread Pool initialization failed"), exit(-1); - atexit(&tpRelease); - for (long i = 0; i <= 20; ++i) - printf("fib(%2li) = %li\n", i, fib(i)); - - return 0; + setvbuf(stdout, NULL, _IONBF, 0); + if (tpInit(1) != 0) + perror("Thread Pool initialization failed"), exit(-1); + atexit(&tpRelease); + for (long i = 0; i <= 20; ++i) + printf("fib(%2li) = %li\n", i, fib(i)); + + return 0; } diff --git a/04_exercise/threadpool.c b/04_exercise/threadpool.c index 93b8807..54d4448 100644 --- a/04_exercise/threadpool.c +++ b/04_exercise/threadpool.c @@ -3,13 +3,13 @@ #include "array.h" #include +#include #include #include #include #include -#include -#define MAX_FUTURES 2048 +#define MAX_FUTURES 20 typedef struct Thread { pthread_t pthread; size_t index; @@ -19,30 +19,72 @@ typedef struct ThreadPool { /* TODO: benötigte Attribute hinzufügen */ size_t size; Thread *threads; - Node arena [MAX_FUTURES]; - ArenaList al; - + Node arena[MAX_FUTURES]; + AtomicArenaList al; } ThreadPool; ThreadPool threadPool; +bool isFutureWaiting(void const *vFuture) { + Future *future = (Future *)vFuture; + FutureStatus status = atomic_load(&future->status); + return status == FUT_WAITING; +} + +void tryRunningFuture(Future *future) { + char expected = FUT_WAITING; + if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) { + return; + } + + future->fn(future); + expected = FUT_IN_PROGRESS; + if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_DONE)) { + perror("The future got marked as done by another thread"); + exit(-1); + } +} /* TODO: interne Hilfsfunktionen hinzufügen */ -void * poolWorkerFunc(void * v_index) { - size_t index = * (size_t *) v_index; + +bool tryDoingWork() { + Future *future = alFindLastElem(&threadPool.al, &isFutureWaiting); + if (future == NULL) { + return false; + } + // This will just return if some other thread was first as well + tryRunningFuture(future); + return true; +} +/** @brief worker function running in the threadpool + */ +_Noreturn void *poolWorkerFunc(void *v_index) { + size_t index = *(size_t *)v_index; printf("Thread %zu started \n", index); - return NULL; + unsigned int backoffTime = 1; + while (true) { + if (!tryDoingWork()) { + backoffTime = 2 * backoffTime; + fprintf(stderr, "Thread %zu: Found no work sleeping for %d seconds \n", + index, backoffTime); + sleep(backoffTime); + continue; + } else { + backoffTime = 1; + } + } } int tpInit(size_t size) { threadPool.size = size; arrayInit(threadPool.threads); - alInit(threadPool.arena, MAX_FUTURES); + threadPool.al = alInit(threadPool.arena, MAX_FUTURES); for (size_t i = 0; i < size; ++i) { arrayPush(threadPool.threads); int status; threadPool.threads[i].index = i; if ((status = pthread_create(&threadPool.threads[i].pthread, NULL, - poolWorkerFunc, &threadPool.threads[i].index)) < 0) { + poolWorkerFunc, &threadPool.threads[i].index)) < + 0) { return status; } } @@ -52,7 +94,8 @@ int tpInit(size_t size) { void tpRelease(void) { for (size_t i = 0; i < threadPool.size; ++i) { if (pthread_cancel(threadPool.threads[i].pthread) != 0) { - fprintf(stderr, "The thread %zu had already exited. \n", threadPool.threads[i].index); + fprintf(stderr, "The thread %zu had already exited. \n", + threadPool.threads[i].index); } } for (size_t i = 0; i < threadPool.size; ++i) { @@ -62,30 +105,27 @@ void tpRelease(void) { exit(status); } } + arrayRelease(threadPool.threads); } void tpAsync(Future *future) { - alPush(&threadPool.al, (void *) future); - char expected = FUT_WAITING; - if(!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS )) { - perror("The future has been scheduled before"); - exit(-1); - } - - future->fn(future); - expected = FUT_IN_PROGRESS; - if(!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) { - perror("The future got marked as done by another thread"); + if (alPush(&threadPool.al, (void *)future) < -1) { + perror("Push failed"); exit(-1); } - } void tpAwait(Future *future) { - if(atomic_load(&future->status) == FUT_DONE) { - alRemoveElem(&threadPool.al, (void *) future); + if (atomic_load(&future->status) == FUT_DONE) { + alRemoveElem(&threadPool.al, (void *)future); return; } -} - + if (atomic_load(&future->status) == FUT_WAITING) { + tryRunningFuture(future); + } + while (atomic_load(&future->status) != FUT_DONE) { + tryDoingWork(); + } + alRemoveElem(&threadPool.al, (void *)future); +} diff --git a/04_exercise/threadpool.h b/04_exercise/threadpool.h index 1b5d64d..f73e36c 100644 --- a/04_exercise/threadpool.h +++ b/04_exercise/threadpool.h @@ -4,8 +4,6 @@ #include #include - - /**@brief Funktionszeiger auf eine asynchron auszuführende Funktion. * * Der Parameter kann zur Übergabe von Argumenten und den Rückgabewerten @@ -108,7 +106,7 @@ extern void tpAwait(Future *future); } \ static inline NAME ## _fut NAME ## Future(ARG arg) { \ return (NAME ## _fut) { \ - .fut = { .fn = &NAME ## Thunk }, \ + .fut = { .fn = &NAME ## Thunk, .status=FUT_WAITING}, \ .arg = arg \ }; \ } \ -- cgit v1.2.3-54-g00ecf