summaryrefslogtreecommitdiffstats
path: root/04_exercise/threadpool.c
blob: 54d44481519896e41946779aac8c18081a21a88b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include "threadpool.h"
#include "arena_list.h"
#include "array.h"

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define MAX_FUTURES 20
typedef struct Thread {
    pthread_t pthread;
    size_t index;
} Thread;

typedef struct ThreadPool {
    /* TODO: benötigte Attribute hinzufügen */
    size_t size;
    Thread *threads;
    Node arena[MAX_FUTURES];
    AtomicArenaList al;
} ThreadPool;

ThreadPool threadPool;

bool isFutureWaiting(void const *vFuture) {
    Future *future = (Future *)vFuture;
    FutureStatus status = atomic_load(&future->status);
    return status == FUT_WAITING;
}

void tryRunningFuture(Future *future) {
    char expected = FUT_WAITING;
    if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) {
        return;
    }

    future->fn(future);
    expected = FUT_IN_PROGRESS;
    if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_DONE)) {
        perror("The future got marked as done by another thread");
        exit(-1);
    }
}
/* TODO: interne Hilfsfunktionen hinzufügen */

bool tryDoingWork() {
    Future *future = alFindLastElem(&threadPool.al, &isFutureWaiting);
    if (future == NULL) {
        return false;
    }
    // This will just return if some other thread was first as well
    tryRunningFuture(future);
    return true;
}
/** @brief worker function running in the threadpool
 */
_Noreturn void *poolWorkerFunc(void *v_index) {
    size_t index = *(size_t *)v_index;
    printf("Thread %zu started \n", index);
    unsigned int backoffTime = 1;
    while (true) {
        if (!tryDoingWork()) {
            backoffTime = 2 * backoffTime;
            fprintf(stderr, "Thread %zu: Found no work sleeping for %d seconds \n",
                    index, backoffTime);
            sleep(backoffTime);
            continue;
        } else {
            backoffTime = 1;
        }
    }
}

int tpInit(size_t size) {
    threadPool.size = size;
    arrayInit(threadPool.threads);
    threadPool.al = alInit(threadPool.arena, MAX_FUTURES);
    for (size_t i = 0; i < size; ++i) {
        arrayPush(threadPool.threads);
        int status;
        threadPool.threads[i].index = i;
        if ((status = pthread_create(&threadPool.threads[i].pthread, NULL,
                                     poolWorkerFunc, &threadPool.threads[i].index)) <
            0) {
            return status;
        }
    }
    return 0;
}

void tpRelease(void) {
    for (size_t i = 0; i < threadPool.size; ++i) {
        if (pthread_cancel(threadPool.threads[i].pthread) != 0) {
            fprintf(stderr, "The thread %zu had already exited. \n",
                    threadPool.threads[i].index);
        }
    }
    for (size_t i = 0; i < threadPool.size; ++i) {
        int status;
        if ((status = pthread_join(threadPool.threads[i].pthread, NULL)) != 0) {
            perror("An error occured while joining the threads");
            exit(status);
        }
    }
    arrayRelease(threadPool.threads);
}

void tpAsync(Future *future) {
    if (alPush(&threadPool.al, (void *)future) < -1) {
        perror("Push failed");
        exit(-1);
    }
}

void tpAwait(Future *future) {
    if (atomic_load(&future->status) == FUT_DONE) {
        alRemoveElem(&threadPool.al, (void *)future);
        return;
    }
    if (atomic_load(&future->status) == FUT_WAITING) {
        tryRunningFuture(future);
    }

    while (atomic_load(&future->status) != FUT_DONE) {
        tryDoingWork();
    }
    alRemoveElem(&threadPool.al, (void *)future);
}