summaryrefslogtreecommitdiffstats
path: root/04_exercise/threadpool.c
blob: f046235e436b077edb7e58c3adf37b7fed8bc95c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include "threadpool.h"

#include "array.h"
#include "slotmap.h"

#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Number of smEntry slots in the pool's slab; also handed to smInit() as the
 * slot map size. */
#define MAX_FUTURES 1024
/* Bookkeeping for one worker thread of the pool. */
typedef struct Thread {
    /* Handle filled in by pthread_create(); used to cancel and join the worker. */
    pthread_t pthread;
    /* Position of this worker in threadPool.threads; a pointer to this field is
     * passed to the worker as its start argument. */
    size_t index;
} Thread;

/* State of the thread pool: the worker threads plus the slot map into which
 * futures are inserted by tpAsync(). */
typedef struct ThreadPool {
    /* TODO: add required attributes */
    /* Number of worker threads; set in tpInit(), iterated over in tpRelease(). */
    size_t size;
    /* Heap-allocated array of workers; allocated in tpInit(), freed in tpRelease(). */
    Thread *threads;
    /* Backing storage handed to smInit() for the slot map. */
    smEntry slab [MAX_FUTURES];
    /* Slot map header returned by smInit(); used by smInsert/smFindEntry/smDeleteValue. */
    smHeader header;
} ThreadPool;

/* The single pool instance every function in this file operates on. */
ThreadPool threadPool;

bool isFutureWaiting(void const *vFuture) {
    Future *future = (Future *)vFuture;
    FutureStatus status = atomic_load(&future->status);
    //This is racy but we'll CAS it later to make sure it's safe
    return status == FUT_WAITING;
}
/**
 * @brief Claims a future and executes it on the calling thread.
 *
 * The status is atomically switched from FUT_WAITING to FUT_IN_PROGRESS; only
 * the thread that wins this compare-and-swap continues. The winner removes the
 * future from the pool's slot map, calls future->fn(future) and finally sets
 * the status to FUT_DONE.
 *
 * If the status is found to be something other than FUT_IN_PROGRESS when it is
 * set to FUT_DONE, a message is written to stderr and the process exits.
 *
 * @param future future to execute
 * @return true if this call executed the future, false if its status was not
 *         FUT_WAITING (for example because another thread claimed it first)
 */
bool tryRunningFuture(Future *const future) {
    char expected = FUT_WAITING;
    // Losing the CAS is normal: several threads may have found the same future.
    if (!atomic_compare_exchange_strong(&future->status, &expected, FUT_IN_PROGRESS)) {
        return false;
    }

    // The future is ours now; take it out of the map so nobody finds it again.
    smDeleteValue(&threadPool.header, future);
    future->fn(future);

    if (atomic_exchange(&future->status, FUT_DONE) != FUT_IN_PROGRESS) {
        // No library call failed here, so errno carries no information;
        // perror() would append an unrelated error description.
        fprintf(stderr, "The future got marked as done by another thread\n");
        exit(-1);
    }
    return true;
}
/* TODO: add internal helper functions */

/**
 * @brief Looks for one waiting future in the pool's slot map and tries to run it.
 *
 * @return true if a future was executed by this call; false if no waiting
 *         entry was found, the entry's value was already NULL, or
 *         tryRunningFuture() could not claim the future
 */
bool tryDoingWork() {
    smEntry *candidate = smFindEntry(&threadPool.header, &isFutureWaiting);
    if (candidate != NULL) {
        Future *pending = (Future *)atomic_load(&candidate->value);
        // A NULL value means the slot was emptied between the search and the load.
        if (pending != NULL) {
            return tryRunningFuture(pending);
        }
    }
    return false;
}
/** @brief Worker loop executed by every thread of the pool.
 *
 * Repeatedly calls tryDoingWork(). Whenever no future could be run, the thread
 * reports that on stderr and sleeps for backoffTime seconds before retrying.
 * The loop never terminates by itself; tpRelease() ends workers with
 * pthread_cancel().
 *
 * @param v_index pointer to a size_t holding this worker's index (used for
 *                log output only)
 */
_Noreturn void *poolWorkerFunc(void *v_index) {
    const size_t index = *(const size_t *)v_index;
    printf("Thread %zu started \n", index);
    // NOTE: the sleep interval is currently fixed at one second; no growing
    // backoff is applied.
    unsigned int backoffTime = 1;
    while (true) {
        if (tryDoingWork()) {
            backoffTime = 1;
            continue;
        }
        // %u matches the unsigned int argument (the original used %d).
        fprintf(stderr, "Thread %zu: Found no work sleeping for %u seconds \n",
                index, backoffTime);
        sleep(backoffTime);
    }
}

int tpInit(size_t size) {
    threadPool.size = size;
    threadPool.threads = malloc(sizeof(Thread)*size);
    threadPool.header = smInit((smEntry *)&threadPool.slab, MAX_FUTURES);
    for (size_t i = 0; i < size; ++i) {
        int status;
        threadPool.threads[i].index = i;
        if ((status = pthread_create(&threadPool.threads[i].pthread, NULL,
                                     poolWorkerFunc, &threadPool.threads[i].index)) <
            0) {
            return status;
        }
    }
    return 0;
}

/**
 * @brief Stops all worker threads and frees the pool's thread array.
 *
 * Every worker first receives a cancellation request; afterwards all workers
 * are joined. If joining fails, the error is reported on stderr and the
 * process exits with the pthread_join() error code.
 *
 * On return the pool holds no threads (size 0, thread array NULL), so a
 * repeated call does nothing harmful.
 */
void tpRelease(void) {
    for (size_t i = 0; i < threadPool.size; ++i) {
        if (pthread_cancel(threadPool.threads[i].pthread) != 0) {
            fprintf(stderr, "The thread %zu had already exited. \n",
                    threadPool.threads[i].index);
        }
    }
    for (size_t i = 0; i < threadPool.size; ++i) {
        const int status = pthread_join(threadPool.threads[i].pthread, NULL);
        if (status != 0) {
            // pthread functions return the error number instead of setting
            // errno; hand it to perror() explicitly so the text is meaningful.
            errno = status;
            perror("An error occured while joining the threads");
            exit(status);
        }
    }
    free(threadPool.threads);
    // Do not leave a dangling pointer or stale count behind.
    threadPool.threads = NULL;
    threadPool.size = 0;
}

/**
 * @brief Submits a future to the pool.
 *
 * The future is inserted into the pool's slot map. As long as smInsert()
 * reports failure (a negative result), the caller runs pending work itself
 * via tryDoingWork() and then retries. Once the insert succeeded the calling
 * thread yields the processor.
 *
 * @param future future to enqueue
 */
void tpAsync(Future *future) {
    for (;;) {
        if (smInsert(&threadPool.header, (void *)future) >= 0) {
            break;
        }
        // Insert failed: help with pending work before trying again.
        tryDoingWork();
    }
    sched_yield();
}

/**
 * @brief Blocks until the given future has completed.
 *
 * If the future is still waiting, the caller first tries to execute it itself.
 * Afterwards it keeps running other pending work via tryDoingWork() until the
 * future's status reads FUT_DONE.
 *
 * @param future future to wait for
 */
void tpAwait(Future *future) {
    const bool stillQueued = atomic_load(&future->status) == FUT_WAITING;
    if (stillQueued) {
        // The result is irrelevant: the loop below handles both outcomes.
        (void)tryRunningFuture(future);
    }

    for (;;) {
        if (atomic_load(&future->status) == FUT_DONE) {
            return;
        }
        // Spend the waiting time on other futures instead of idling.
        tryDoingWork();
    }
}