6 #ifndef TELAMON_HELP_QUEUE_HH
7 #define TELAMON_HELP_QUEUE_HH
19 #include <extern/loguru/loguru.hpp>
21 thread_local std::thread::id current_thread_id = std::this_thread::get_id();
28 template<
typename T, const
int N = 16>
33 enum class Operation : int { enqueue };
38 loguru::add_file(
"helpqueue.log", loguru::Append, loguru::Verbosity_MAX);
41 m_head.store(Node::SENTITEL_NODE.get());
42 m_tail.store(Node::SENTITEL_NODE.get());
44 std::ranges::for_each(m_states, [] (
auto &state) {
45 state.store(OperationDescription::EMPTY.get());
57 <<
"Thread '" << current_thread_id <<
"': push_back with value " << element <<
" type T = [" <<
typeid(T).name()
60 int phase = max_phase().value_or(-1) + 1;
62 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Calculated phase = " << phase <<
'\n';
65 auto *node =
new Node{element, enqueuer};
67 m_states.at(enqueuer).store(description);
70 LOG_S(INFO) <<
"Thread '" << current_thread_id
71 <<
"': State updated with initial OperationDescription for push_back operation.\n";
75 help_finish_enqueue();
83 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Peeking the front of the help queue." <<
'\n';
85 auto next = m_head.load()->next().load();
89 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': The help queue is empty." <<
'\n';
94 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': The help queue's head points to data = " << next->data() <<
'\n';
96 return {next->data()};
106 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Trying to pop the front of the help queue (expecting data = " << expected_head <<
")\n";
108 auto head_ptr = m_head.load();
109 auto next_ptr = head_ptr->next().load();
110 if (!next_ptr || next_ptr->data() != expected_head) {
113 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': The help queue is empty." <<
'\n';
118 if (m_head.compare_exchange_strong(head_ptr, next_ptr)) {
119 help_finish_enqueue();
120 head_ptr->set_next(
nullptr);
122 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': CAS during try_pop_front was successful" <<
'\n';
127 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': CAS during try_pop_front FAILED" <<
'\n';
134 bool is_pending (
int state_id,
int phase_limit) {
135 auto state_ptr = m_states.at(state_id).load();
136 return state_ptr->pending() && state_ptr->phase() <= phase_limit;
144 void help_finish_enqueue () {
146 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"' in helping to finish push_back operation.\n";
148 auto tail_ptr = m_tail.load();
149 auto next_ptr = tail_ptr->next().load();
152 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Tail pointer correctly put.\n";
158 auto id = next_ptr->enqueuer_id();
159 auto old_state_ptr = m_states.at(
id).load();
161 if (tail_ptr != m_tail.load()) {
163 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Tail pointer updated.\n";
168 if (old_state_ptr->node() != next_ptr) {
170 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': The thread which started this operation has already changed the node it is working "
171 "on, thus this operation has already finished.\n";
177 auto updated_state_ptr =
new OperationDescription{
178 old_state_ptr->phase(),
181 old_state_ptr->node()
186 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Performing CAS-es on the state and on the tail.\n";
188 (void) m_states.at(
id).compare_exchange_weak(old_state_ptr, updated_state_ptr);
189 (void) m_tail.compare_exchange_strong(tail_ptr, next_ptr);
199 void help_enqueue (
int state_idx,
int helper_phase) {
201 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Starting to help Thread '" << state_idx <<
"' with by having phase = " << helper_phase
204 while (is_pending(state_idx, helper_phase)) {
206 auto *tail_ptr = m_tail.load();
207 auto &tail = *tail_ptr;
208 auto *next_ptr = tail_ptr->next().load();
210 if (tail_ptr != m_tail.load()) {
212 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Tail pointer modified. Retrying ...\n";
217 if (next_ptr !=
nullptr) {
219 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Tail pointer outdated. Retrying ...\n";
221 help_finish_enqueue();
225 if (!is_pending(state_idx, helper_phase)) {
227 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Operation already executed. Done.\n";
232 auto *state_ptr = m_states.at(state_idx).load();
233 auto state = *state_ptr;
234 if (!state.pending()) {
236 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"': Operation already executed. Done.\n";
241 auto *new_next_ptr = state_ptr->node();
242 if (tail.next().compare_exchange_strong(next_ptr, new_next_ptr)) {
244 LOG_S(INFO) <<
"Thread '" << current_thread_id <<
"' in helping Thread '" << state_idx
245 <<
"': CAS on tail, next and node.\n";
247 return help_finish_enqueue();
252 void help_others (
int helper_phase) {
255 <<
"Thread '" << current_thread_id <<
"': Helping others with helper phase = " << helper_phase <<
'\n';
258 for (
auto &atomic_state : m_states) {
259 auto state = atomic_state.load();
260 if (state->pending() && state->phase() <= helper_phase) {
261 if (state->operation() == Operation::enqueue) {
264 <<
"Thread '" << current_thread_id <<
"': Found operation which needs help - Thread '" << i
265 <<
"' which is performing push_back with phase = " << helper_phase <<
'\n';
267 help_enqueue(i, helper_phase);
274 [[nodiscard]] std::optional<int> max_phase ()
const {
275 auto it = std::max_element(m_states.begin(), m_states.end(), [] (
const auto &state1,
const auto &state2) {
276 return state1.load()->phase() < state2.load()->phase();
278 if (it != m_states.end()) {
279 return it->load()->phase();
285 std::atomic<Node *> m_head;
286 std::atomic<Node *> m_tail;
287 std::array<std::atomic<OperationDescription *>, N> m_states;
293 template<
typename T, const
int N>
297 Node () : m_is_sentitel{true} {}
300 template<
typename ...Args>
301 explicit Node (
int enqueuer, Args &&... args) : m_enqueuer_id{enqueuer}, m_data{std::forward<Args>(args)...} {}
305 : m_data{data}, m_enqueuer_id{enqueuer}, m_next(nullptr) {}
307 bool operator== (
const Node &rhs)
const {
308 return std::tie(m_is_sentitel, m_data, m_next, m_enqueuer_id) ==
309 std::tie(rhs.m_is_sentitel, rhs.m_data, rhs.m_next,
312 bool operator!= (
const Node &rhs)
const {
return !(rhs == *
this); }
315 [[nodiscard]]
bool is_sentitel ()
const {
return m_is_sentitel; }
317 [[nodiscard]]
bool has_data ()
const {
return m_data.has_value(); }
319 [[nodiscard]]
const T &data ()
const {
return m_data.value(); }
321 [[nodiscard]] std::atomic<Node *> &next () {
return m_next; }
323 void set_next (Node *ptr) { m_next.store(ptr); }
325 [[nodiscard]]
int enqueuer_id ()
const {
return m_enqueuer_id; }
327 const inline static auto SENTITEL_NODE = std::make_unique<Node>();
330 const bool m_is_sentitel =
false;
331 std::optional<T> m_data{};
332 std::atomic<Node *> m_next;
333 const int m_enqueuer_id{-1};
337 template<
typename T, const
int N>
349 m_operation{operation},
353 [[nodiscard]]
bool is_empty ()
const {
return m_is_empty; }
354 [[nodiscard]]
bool pending ()
const {
return m_pending; }
355 [[nodiscard]] Operation operation ()
const {
return m_operation; }
356 [[nodiscard]] Node *node () {
return m_node; }
357 [[nodiscard]]
int phase ()
const {
return m_phase; }
359 const inline static auto EMPTY = std::make_unique<OperationDescription>();
362 bool m_is_empty =
false;
364 Operation m_operation{};
This is the main class representing the help queue.
Definition: HelpQueue.hh:29
bool try_pop_front(T expected_head)
Dequeues iff the given value is the same as the value at the head of the queue.
Definition: HelpQueue.hh:104
std::optional< T > peek_front() const
Peek the head of the queue.
Definition: HelpQueue.hh:81
void push_back(const int enqueuer, T element)
Enqueue an element to the tail of the queue.
Definition: HelpQueue.hh:54
This module contains the implementation of a wait-free queue used as an underlying structure in the s...
Definition: HelpQueue.hh:25
The class which represents a node element of the queue.
Definition: HelpQueue.hh:294
Node(int enqueuer, Args &&... args)
Construction of a value in node.
Definition: HelpQueue.hh:301
Node()
Default construction of sentitel node.
Definition: HelpQueue.hh:297
Node(T data, int enqueuer)
Construction of node with copyable data.
Definition: HelpQueue.hh:304
Operation description for the queue used when the queue itself needs "helping".
Definition: HelpQueue.hh:338
constexpr OperationDescription(int phase, bool pending, Operation operation, Node *node)
Default construction.
Definition: HelpQueue.hh:346
constexpr OperationDescription()
Empty construction.
Definition: HelpQueue.hh:342