-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstatic_thread_pool.cpp
More file actions
150 lines (102 loc) · 3.88 KB
/
static_thread_pool.cpp
File metadata and controls
150 lines (102 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//
// Created by Vova on 17.10.2020.
//
#include "static_thread_pool.h"
#include "displaying/print_stl.h"
/// Guards std::cout so that lines printed through safe_print never interleave.
std::mutex print_mutex;

/**
 * Writes `to_print` followed by a newline to std::cout and flushes the stream.
 * The whole write is performed while holding `print_mutex`, so concurrent
 * callers produce whole, non-interleaved lines.
 *
 * @param to_print text to output (a trailing newline is appended).
 */
void safe_print(const std::string& to_print) {
    const std::lock_guard<std::mutex> output_guard{print_mutex};
    std::cout << to_print << '\n' << std::flush;
}
// std::atomic<size_t> threads_came_through_variable = 0;
////////////////////////////////////////////////////// Thread wrapper ////////////////////////////////////////////////
/**
 * Main loop executed by each worker thread of a static_thread_pool.
 *
 * Every pass of the loop:
 *   1. sleeps on `running_cv` until `thread_ending_state_ready` becomes true;
 *   2. reads `threads_should_be_run` under the lock and leaves the loop
 *      (ending the thread) if it is false;
 *   3. calls the user function with this thread's index;
 *   4. publishes completion (ready flag first, then the ready counter);
 *   5. spins until `max_all_threads_ready_run` catches up with this thread's
 *      own iteration counter.
 *
 * @param thread_index index passed to the user function and used to address
 *        this thread's slot in `threads_ready`.
 * @param function_to_launch_at_each_iteration work to perform once per iteration.
 * @param parent_thread_pool pool whose synchronisation state is shared with
 *        the thread that drives the iterations.
 */
void thread_wrapper(
        size_t thread_index,
        const std::function<void(size_t)>& function_to_launch_at_each_iteration,
        static_thread_pool& parent_thread_pool
) {
    /// Number of iterations this thread has fully completed so far.
    lint this_iteration = 0;

    while (true) {
        /// Passively wait to start (don't consume CPU cycles for checking):
        std::unique_lock<std::mutex> lck(parent_thread_pool.running_cv_protector);
        parent_thread_pool.running_cv.wait(lck, [&parent_thread_pool](){ return parent_thread_pool.thread_ending_state_ready; });
        /// Snapshot the decision while still holding the lock, then release it
        /// so that the other workers can get through the condition variable too.
        bool threads_should_be_run_or_not = parent_thread_pool.threads_should_be_run;
        lck.unlock();

        if (!threads_should_be_run_or_not) break;

        // std::this_thread::sleep_for(0.1s);
        // safe_print("thread " + std::to_string(thread_index) + " gone through the condition variable, " + "threads_should_be_run_or_not is " + std::to_string(threads_should_be_run_or_not));

        /// Do work:
        function_to_launch_at_each_iteration(thread_index);

        /// Say that I'm ready.
        /// The per-thread flag MUST be written before the counter is bumped:
        /// the waiting side resets `threads_ready` as soon as the counter reaches
        /// the thread number, so a flag written after the increment could land
        /// after (or concurrently with) that reset and stay stale.
        /// NOTE(review): if `threads_ready` is a std::vector<bool>, writes to
        /// distinct elements from different threads are still a data race —
        /// confirm the declared type in the header.
        parent_thread_pool.threads_ready[thread_index] = true;
        parent_thread_pool.this_it_ready_thread_number++;

        /// Actively wait for all the other threads: spin until the waiting side
        /// has bumped `max_all_threads_ready_run` up to this iteration.
        /// NOTE(review): this blocks on the very first iteration only if
        /// `max_all_threads_ready_run` starts below 0, and the loop is only
        /// well-defined if that member is atomic — confirm both in the header.
        while (parent_thread_pool.max_all_threads_ready_run < this_iteration) {
            /// Busy-wait
        }

        this_iteration++;
    }
}
// void thread_wrapper(size_t thread_index, const std::function<void(size_t)>& function_to_launch_at_each_iteration);
/**
* Public methods:
*/
/**
 * Initializes the pool and starts its worker threads.
 *
 * Records the thread count, clears every entry of `threads_ready` and then
 * starts `thread_number` threads, each running `thread_wrapper` with its own
 * index, the given function and a reference to this pool.
 *
 * @param thread_number number of worker threads to start.
 * @param function_to_launch_at_each_iteration function handed to every worker.
 * @throws std::runtime_error if the pool has already been initialized.
 */
void static_thread_pool::init (size_t thread_number,
        const std::function<void (size_t)> &function_to_launch_at_each_iteration)
{
    if (is_initialized) {
        throw std::runtime_error("Pool is already initialized!");
    }

    is_initialized = true;
    m_thread_number = thread_number;
    threads_ready.assign(thread_number, false);

    /// Start the workers one by one; worker number i receives index i.
    size_t worker_index = 0;
    while (worker_index < thread_number) {
        m_threads.emplace_back(
                thread_wrapper,
                worker_index,
                function_to_launch_at_each_iteration,
                std::ref(*this));
        ++worker_index;
    }
}
/**
 * Constructs the pool and immediately initializes it by delegating to init().
 *
 * @param thread_number number of worker threads to start.
 * @param function_to_launch_at_each_iteration function handed to every worker.
 */
static_thread_pool::static_thread_pool (size_t thread_number, const std::function<void(size_t)>& function_to_launch_at_each_iteration)
{
    this->init(thread_number, function_to_launch_at_each_iteration);
}
void static_thread_pool::compute ()
{
inform_thread_next_iteration();
wait_for_threads_ready();
}
/**
 * Tells the workers to finish and waits for every worker thread to exit.
 *
 * Safe to call more than once: threads that have already been joined are
 * skipped instead of being joined again (joining a non-joinable std::thread
 * throws std::system_error).
 *
 * NOTE(review): inform_thread_ending() is defined outside this file section;
 * presumably it forwards to inform_thread_ending_state(false) — confirm.
 */
void static_thread_pool::join ()
{
    inform_thread_ending();

    for (auto& thread : m_threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}
////////////////////////////////////////////////////// Thread pool helpers ////////////////////////////////////////////////
void static_thread_pool::inform_thread_ending_state (bool continue_or_not)
{
std::lock_guard<std::mutex> locker(running_cv_protector);
threads_should_be_run = continue_or_not;
thread_ending_state_ready = true;
running_cv.notify_all();
}
/**
 * Blocks the caller until every worker has reported completion of the current
 * iteration, then resets the per-iteration state and releases the workers.
 *
 * The wait is a busy loop on `this_it_ready_thread_number`. Once it reaches
 * `m_thread_number`, the ready flags, the ready counter and
 * `thread_ending_state_ready` are reset, and only afterwards
 * `max_all_threads_ready_run` is incremented — the value the workers poll in
 * thread_wrapper before starting their next pass.
 *
 * NOTE(review): the members written here are modified without holding
 * `running_cv_protector`; this looks like it relies on all workers being in
 * their spin-wait at that point and on the counters being atomic — confirm
 * against the declarations in the header.
 */
void static_thread_pool::wait_for_threads_ready ()
{
/// ACTIVE waiting (spin loop, consumes CPU) for all threads to complete:
// safe_print("[main thread]: start waiting");
while(
// Alternative condition based on the per-thread flags (disabled):
// std::any_of(threads_ready.begin(), threads_ready.end(), [](bool val){ return !val; })
this_it_ready_thread_number < m_thread_number
)
{
// Busy-wait: nothing to do until every worker has incremented the counter.
}
// safe_print("[main thread]: ended waiting");
/// All threads ready => reset everything to the initial state:
std::fill(threads_ready.begin(), threads_ready.end(), false); // Clear the per-thread ready flags
this_it_ready_thread_number = 0;
// Must be cleared before the workers are released below: their condition-variable
// predicate is this flag, so a still-true value would let them start another pass at once.
thread_ending_state_ready = false;
// threads_should_be_run = false;
// safe_print("[main thread]: informed");
/// Release the other threads: they spin until this counter reaches their own iteration number.
max_all_threads_ready_run++;
// safe_print("[main thread]: released");
}