Telamon
HelpQueue.hh
1 /*
2  * \file HelpQueue.hh
3  * \brief Provides an implementation of a wait-free queue with support for
4  * limited operations
5  */
6 #ifndef TELAMON_HELP_QUEUE_HH
7 #define TELAMON_HELP_QUEUE_HH
8 
#include <algorithm>
#include <array>
#include <atomic>
#include <memory>
#include <numeric>
#include <optional>
#include <ranges>
#include <thread>
#include <tuple>
#include <typeinfo>
#include <utility>
17 
18 #ifdef TEL_LOGGING
19 #include <extern/loguru/loguru.hpp>
20 
/// Id of the calling thread, cached once per thread; it is only used to tag log lines.
/// Declared `inline` because this definition lives in a header: without it every
/// translation unit including the header would emit its own definition.
inline thread_local std::thread::id current_thread_id = std::this_thread::get_id();
22 #endif
23 
25 namespace helpqueue {
26 
/**
 * \brief Queue with helping: enqueue, peek and conditional dequeue.
 *
 * Each enqueuer publishes an OperationDescription in its own slot of a fixed
 * table; any thread that sees a pending description may complete the operation
 * on the owner's behalf.
 *
 * Memory reclamation is not implemented yet: nodes and operation descriptions
 * are allocated with `new` and are not freed once they have been published
 * (see the TODOs about hazard pointers below).
 *
 * \tparam T Type of the stored elements. It has to be copyable and comparable
 *           with `!=` (needed by try_pop_front).
 * \tparam N Number of enqueuer slots; valid enqueuer ids are `0 .. N-1`.
 */
template<typename T, const int N = 16>
class HelpQueue {
  static_assert(N > 0, "HelpQueue needs at least one enqueuer slot");

 public:
  struct Node;
  struct OperationDescription;
  enum class Operation : int { enqueue };

 public:
  /**
   * \brief Creates an empty queue.
   *
   * Every queue owns its own dummy head node. Sharing the static
   * `Node::SENTITEL_NODE` between queues would make elements linked behind the
   * dummy node of one queue visible through every other queue of the same
   * instantiation.
   */
  constexpr HelpQueue ()
      : m_sentinel{std::make_unique<Node>()},
        m_head{m_sentinel.get()},
        m_tail{m_sentinel.get()},
        m_states{} {
#ifdef TEL_LOGGING
    loguru::add_file("helpqueue.log", loguru::Append, loguru::Verbosity_MAX);
#endif

    // Every slot starts with the shared, immutable "no operation" description.
    for (auto &state : m_states) {
      state.store(OperationDescription::EMPTY.get());
    }
  }

 public:
  /**
   * \brief Enqueue an element to the tail of the queue.
   *
   * \param enqueuer Index of the state slot used by the caller, in `[0, N)`
   * \param element  The value to append
   * \throws std::out_of_range if `enqueuer` is not a valid slot index
   */
  void push_back (const int enqueuer, T element) {
#ifdef TEL_LOGGING
    LOG_S(INFO)
        << "Thread '" << current_thread_id << "': push_back with value " << element << " type T = [" << typeid(T).name()
        << "]\n";
#endif
    // Resolve the slot before allocating anything: `at` is the only call here that is
    // expected to throw, and validating first keeps that error path leak-free.
    auto &slot = m_states.at(enqueuer);

    const int phase = max_phase().value_or(-1) + 1;
#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id << "': Calculated phase = " << phase << '\n';
#endif
    // TODO: Change `new` when hazard pointers are used
    auto *node = new Node{std::move(element), enqueuer};
    auto *description = new OperationDescription{phase, true, Operation::enqueue, node};
    slot.store(description);

#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id
                << "': State updated with initial OperationDescription for push_back operation.\n";
#endif

    help_others(phase);
    help_finish_enqueue();
  }

  /**
   * \brief Peek the head of the queue.
   *
   * \return A copy of the first element, or `std::nullopt` if the head node has no successor
   */
  std::optional<T> peek_front () const {
#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id << "': Peeking the front of the help queue." << '\n';
#endif
    // The head is always a dummy node; the first real element is its successor.
    auto *first = m_head.load()->next().load();

    if (first == nullptr) {
#ifdef TEL_LOGGING
      LOG_S(INFO) << "Thread '" << current_thread_id << "': The help queue is empty." << '\n';
#endif
      return std::nullopt;
    }
#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id << "': The help queue's head points to data = " << first->data() << '\n';
#endif
    return {first->data()};
  }

  /**
   * \brief Dequeues iff the given value is the same as the value at the head of the queue.
   *
   * \param expected_head Value the caller expects to find at the front
   * \return `true` if the front element was removed by this call; `false` if the queue
   *         was empty, the front differs from `expected_head`, or the head was changed
   *         concurrently
   */
  bool try_pop_front (T expected_head) {
#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id << "': Trying to pop the front of the help queue (expecting data = " << expected_head << ")\n";
#endif
    auto *head_ptr = m_head.load();
    auto *next_ptr = head_ptr->next().load();
    if (next_ptr == nullptr || next_ptr->data() != expected_head) {
#ifdef TEL_LOGGING
      LOG_S(INFO) << "Thread '" << current_thread_id << "': The help queue is empty or its head does not hold the expected value." << '\n';
#endif
      return false;
    }

    // Swing the head to the first real node, which becomes the new dummy head.
    if (m_head.compare_exchange_strong(head_ptr, next_ptr)) {
      help_finish_enqueue();
      head_ptr->set_next(nullptr);
#ifdef TEL_LOGGING
      LOG_S(INFO) << "Thread '" << current_thread_id << "': CAS during try_pop_front was successful" << '\n';
#endif
      return true;
    }
#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id << "': CAS during try_pop_front FAILED" << '\n';
#endif
    return false;
  }

 private: //< Helper functions

  /// \brief Whether slot `state_id` holds a pending operation whose phase is at most `phase_limit`.
  bool is_pending (int state_id, int phase_limit) const {
    auto *state_ptr = m_states.at(state_id).load();
    return state_ptr->pending() && state_ptr->phase() <= phase_limit;
  }

  /**
   * \brief Completes an enqueue whose node is already linked behind the tail.
   *
   * If the tail has a successor, the owner's description is replaced by a
   * non-pending copy and the tail is then advanced to that successor.
   */
  void help_finish_enqueue () {
#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id << "' in helping to finish push_back operation.\n";
#endif
    auto *tail_ptr = m_tail.load();
    auto *next_ptr = tail_ptr->next().load();
    if (next_ptr == nullptr) {
#ifdef TEL_LOGGING
      LOG_S(INFO) << "Thread '" << current_thread_id << "': Tail pointer correctly put.\n";
#endif
      return;
    }

    // A node linked behind the tail was created by push_back, so it carries a valid enqueuer id.
    const auto id = next_ptr->enqueuer_id();
    auto *old_state_ptr = m_states.at(id).load();

    if (tail_ptr != m_tail.load()) {
#ifdef TEL_LOGGING
      LOG_S(INFO) << "Thread '" << current_thread_id << "': Tail pointer updated.\n";
#endif
      return;
    }

    if (old_state_ptr->node() != next_ptr) {
#ifdef TEL_LOGGING
      LOG_S(INFO) << "Thread '" << current_thread_id << "': The thread which started this operation has already changed the node it is working "
                                                        "on, thus this operation has already finished.\n";
#endif
      return;
    }

    // TODO: Change `new` when proper memory reclamation scheme is added (hazard pointers).
    auto *updated_state_ptr = new OperationDescription{
        old_state_ptr->phase(),
        false,
        Operation::enqueue,
        old_state_ptr->node()
    };

#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id << "': Performing CAS-es on the state and on the tail.\n";
#endif
    // The strong form is required here: a spurious failure of a single weak CAS would
    // leave the operation marked pending while the tail is advanced below, and a later
    // helper could then link the same node a second time.
    if (!m_states.at(id).compare_exchange_strong(old_state_ptr, updated_state_ptr)) {
      // Never published, hence not visible to any other thread.
      delete updated_state_ptr;
    }
    (void) m_tail.compare_exchange_strong(tail_ptr, next_ptr);
  }

  /**
   * \brief Helps the enqueue published in slot `state_idx` until it is no longer pending.
   *
   * \param state_idx    Slot of the operation being helped
   * \param helper_phase Phase of the helping thread; only operations with a phase
   *                     not greater than this one are helped
   */
  void help_enqueue (int state_idx, int helper_phase) {
#ifdef TEL_LOGGING
    LOG_S(INFO) << "Thread '" << current_thread_id << "': Starting to help Thread '" << state_idx << "' with by having phase = " << helper_phase
                << '\n';
#endif
    while (is_pending(state_idx, helper_phase)) {
      auto *tail_ptr = m_tail.load();
      auto *next_ptr = tail_ptr->next().load();

      if (tail_ptr != m_tail.load()) {
#ifdef TEL_LOGGING
        LOG_S(INFO) << "Thread '" << current_thread_id << "': Tail pointer modified. Retrying ...\n";
#endif
        continue;
      }

      if (next_ptr != nullptr) {
        // Another enqueue is half-way done; finish it before trying again.
#ifdef TEL_LOGGING
        LOG_S(INFO) << "Thread '" << current_thread_id << "': Tail pointer outdated. Retrying ...\n";
#endif
        help_finish_enqueue();
        continue;
      }

      if (!is_pending(state_idx, helper_phase)) {
#ifdef TEL_LOGGING
        LOG_S(INFO) << "Thread '" << current_thread_id << "': Operation already executed. Done.\n";
#endif
        return;
      }

      auto *state_ptr = m_states.at(state_idx).load();
      if (!state_ptr->pending()) {
#ifdef TEL_LOGGING
        LOG_S(INFO) << "Thread '" << current_thread_id << "': Operation already executed. Done.\n";
#endif
        return;
      }

      // Link the pending node behind the current tail (next_ptr is nullptr at this point).
      if (tail_ptr->next().compare_exchange_strong(next_ptr, state_ptr->node())) {
#ifdef TEL_LOGGING
        LOG_S(INFO) << "Thread '" << current_thread_id << "' in helping Thread '" << state_idx
                    << "': CAS on tail, next and node.\n";
#endif
        help_finish_enqueue();
        return;
      }
    }
  }

  /// \brief Helps every pending enqueue whose phase is not greater than `helper_phase`.
  void help_others (int helper_phase) {
#ifdef TEL_LOGGING
    LOG_S(INFO)
        << "Thread '" << current_thread_id << "': Helping others with helper phase = " << helper_phase << '\n';
#endif
    for (int i = 0; i < N; ++i) {
      auto *state = m_states[i].load();
      if (!state->pending() || state->phase() > helper_phase) {
        continue;
      }
      if (state->operation() == Operation::enqueue) {
#ifdef TEL_LOGGING
        LOG_S(INFO)
            << "Thread '" << current_thread_id << "': Found operation which needs help - Thread '" << i
            << "' which is performing push_back with phase = " << helper_phase << '\n';
#endif
        help_enqueue(i, helper_phase);
      }
    }
  }

  /// \brief Largest phase currently stored in the state table (empty optional only if there are no slots).
  [[nodiscard]] std::optional<int> max_phase () const {
    std::optional<int> highest;
    for (const auto &state : m_states) {
      const int phase = state.load()->phase();
      if (!highest.has_value() || *highest < phase) {
        highest = phase;
      }
    }
    return highest;
  }

 private:
  // Declared first: m_head and m_tail are initialised from it.
  std::unique_ptr<Node> m_sentinel;  //< Initial dummy head owned by this queue
  std::atomic<Node *> m_head;
  std::atomic<Node *> m_tail;
  std::array<std::atomic<OperationDescription *>, N> m_states;
};

/**
 * \brief The class which represents a node element of the queue.
 */
template<typename T, const int N>
struct HelpQueue<T, N>::Node {
 public:
  /// \brief Default construction of sentinel node (a node which carries no data).
  Node () : m_is_sentitel{true} {}

  /// \brief Construction of a value in node.
  template<typename ...Args>
  explicit Node (int enqueuer, Args &&... args)
      : m_data{std::forward<Args>(args)...}, m_enqueuer_id{enqueuer} {}

  /// \brief Construction of node with copyable data.
  Node (T data, int enqueuer)
      : m_data{std::move(data)}, m_enqueuer_id{enqueuer} {}

  /// Member-wise comparison; the successor pointers are compared by address.
  bool operator== (const Node &rhs) const {
    return m_is_sentitel == rhs.m_is_sentitel
        && m_data == rhs.m_data
        && m_next.load() == rhs.m_next.load()
        && m_enqueuer_id == rhs.m_enqueuer_id;
  }
  bool operator!= (const Node &rhs) const { return !(rhs == *this); }

 public:
  [[nodiscard]] bool is_sentitel () const { return m_is_sentitel; }

  [[nodiscard]] bool has_data () const { return m_data.has_value(); }

  /// \throws std::bad_optional_access if the node holds no data
  [[nodiscard]] const T &data () const { return m_data.value(); }

  [[nodiscard]] std::atomic<Node *> &next () { return m_next; }

  void set_next (Node *ptr) { m_next.store(ptr); }

  /// Id of the enqueuer which created the node; -1 unless a constructor set it.
  [[nodiscard]] int enqueuer_id () const { return m_enqueuer_id; }

  /// Shared sentinel kept for backward compatibility; HelpQueue allocates its own
  /// dummy head per queue and does not use this object.
  const inline static std::unique_ptr<Node> SENTITEL_NODE = std::make_unique<Node>();

 private:
  const bool m_is_sentitel = false;
  std::optional<T> m_data{};
  // Explicitly null: before C++20 a default-constructed std::atomic is left uninitialised.
  std::atomic<Node *> m_next{nullptr};
  const int m_enqueuer_id{-1};
};

/**
 * \brief Operation description for the queue used when the queue itself needs "helping".
 */
template<typename T, const int N>
struct HelpQueue<T, N>::OperationDescription {
 public:
  /// \brief Empty construction: not pending, no node, phase -1.
  constexpr OperationDescription () : m_is_empty{true} {}

  /// \brief Construction of a description for `operation` on `node` in the given phase.
  constexpr OperationDescription (int phase, bool pending, Operation operation, Node *node)
      : m_pending{pending},
        m_operation{operation},
        m_node{node},
        m_phase{phase} {}

 public:
  [[nodiscard]] bool is_empty () const { return m_is_empty; }
  [[nodiscard]] bool pending () const { return m_pending; }
  [[nodiscard]] Operation operation () const { return m_operation; }
  [[nodiscard]] Node *node () const { return m_node; }
  [[nodiscard]] int phase () const { return m_phase; }

  /// Shared placeholder stored in every state slot before its first operation.
  const inline static std::unique_ptr<OperationDescription> EMPTY = std::make_unique<OperationDescription>();

 private:
  bool m_is_empty = false;
  bool m_pending{};
  Operation m_operation{};
  // Both members get defaults so that the empty description can be read safely:
  // max_phase() reads phase() from every slot, including slots still holding EMPTY.
  Node *m_node{nullptr};
  int m_phase{-1};
};
368 
369 } // namespace helpqueue
370 
371 #endif // TELAMON_HELP_QUEUE_HH
This is the main class representing the help queue.
Definition: HelpQueue.hh:29
bool try_pop_front(T expected_head)
Dequeues iff the given value is the same as the value at the head of the queue.
Definition: HelpQueue.hh:104
std::optional< T > peek_front() const
Peek the head of the queue.
Definition: HelpQueue.hh:81
void push_back(const int enqueuer, T element)
Enqueue an element to the tail of the queue.
Definition: HelpQueue.hh:54
This module contains the implementation of a wait-free queue used as an underlying structure in the s...
Definition: HelpQueue.hh:25
The class which represents a node element of the queue.
Definition: HelpQueue.hh:294
Node(int enqueuer, Args &&... args)
Construction of a value in node.
Definition: HelpQueue.hh:301
Node()
Default construction of sentinel node.
Definition: HelpQueue.hh:297
Node(T data, int enqueuer)
Construction of node with copyable data.
Definition: HelpQueue.hh:304
Operation description for the queue used when the queue itself needs "helping".
Definition: HelpQueue.hh:338
constexpr OperationDescription(int phase, bool pending, Operation operation, Node *node)
Default construction.
Definition: HelpQueue.hh:346
constexpr OperationDescription()
Empty construction.
Definition: HelpQueue.hh:342