Telamon
WaitFreeSimulator.hh
1 #ifndef TELAMON_WAIT_FREE_SIMULATOR_HH_
2 #define TELAMON_WAIT_FREE_SIMULATOR_HH_
3 
4 #include <atomic>
5 #include <memory>
6 #include <concepts>
7 #include <variant>
8 #include <vector>
9 #include <thread>
10 #include <algorithm>
11 #include <mutex>
12 #include <numeric>
13 #include <utility>
14 #include <type_traits>
15 #include <optional>
16 
17 #include <extern/expected_lite/expected.hpp>
18 
19 #ifdef TEL_LOGGING
20 #define LOGURU_WITH_STREAMS 1
21 #include <extern/loguru/loguru.hpp>
22 //loguru::g_stderr_verbosity = loguru::Verbosity_INFO;
23 //loguru::add_file("wfsimulator.log", loguru::Append, loguru::Verbosity_MAX);
24 #endif
25 
26 #include "HelpQueue.hh"
27 #include "OperationHelping.hh"
28 
/// @brief Merges several callables into a single overload set.
///
/// Used by std::visit for the helping operation in the simulator: each callable handles one
/// alternative of the visited variant.
///
/// @tparam Callables The callable types (typically lambdas) this visitor inherits from.
template<class... Callables>
struct OverloadedVisitor : Callables ... {
    // Re-export every base's call operator so that all of them take part in overload resolution.
    using Callables::operator()...;
};
32 
33 namespace telamon_simulator {
34 
36 namespace telamon_private {
37 
39 template<NormalizedRepresentation LockFree, const int N = 16>
41  using Id = int;
42  using Input = typename LockFree::Input;
43  using Output = typename LockFree::Output;
44  using Commit = typename LockFree::Commit;
45 
48  using OpState = typename OperationRecord<LockFree>::OperationState;
49 
50  template<typename T, typename Err = std::monostate>
51  using OptionalResultOrError = nonstd::expected<std::optional<T>, Err>;
52 
53  public:
54  explicit WaitFreeSimulator (const LockFree &lf) : m_algorithm{lf}, m_helpqueue{} {}
55 
56  explicit WaitFreeSimulator (LockFree &&lf) : m_algorithm{std::move(lf)}, m_helpqueue{} {}
57 
58  public:
59 
66  auto run (const Id id, const Input &input, bool use_slow_path = false) -> Output {
67  auto contention_counter = ContentionFailureCounter{};
68 #ifdef TEL_LOGGING
69  LOG_S(INFO) << "Running the simulation with id = '" << id << "' and input = '" << input << "'";
70  LOG_IF_F(INFO, use_slow_path, "Setting a preference to use the slow path");
71 #endif
72  try_help_others(id);
73 
74  if (!use_slow_path) {
75  for (int i = 0; i < ContentionFailureCounter::FAST_PATH_RETRY_THRESHOLD; ++i) {
76 #ifdef TEL_LOGGING
77  LOG_S(INFO) << "Retry #" << i << "using the fast-path with input = " << input;
78 #endif
79  auto fp_result = fast_path(input, contention_counter);
80  if (fp_result.has_value()) {
81 #ifdef TEL_LOGGING
82  LOG_F(INFO, "Fast-path succeeded. Returning output");
83 #endif
84  return fp_result.value();
85  }
86  if (contention_counter.detect()) {
87 #ifdef TEL_LOGGING
88  LOG_F(INFO, "Contention detected. Using slow-path.");
89 #endif
90  break;
91  }
92  }
93  }
94 
95  return slow_path(id, input);
96  }
97 
99  auto try_help_others (const Id id) -> void {
100  auto front = m_helpqueue.peek_front();
101  if (front.has_value()) {
102 #ifdef TEL_LOGGING
103  LOG_F(INFO, "Operation requires help in the helpq. Tryting to help it.");
104 #endif
105  help(*front.value());
106  }
107  }
108 
109  private:
111  auto help_precas (OpBox &op_box, const OpRecord &op, const typename OpRecord::PreCas &state) -> OptionalResultOrError<OpRecord *> {
112  auto failures = ContentionFailureCounter{};
113 
114  // Generate CAS-list
115  auto desc = m_algorithm.generator(op.input(), failures);
116  if (!desc.has_value()) {
117  return nonstd::make_unexpected(std::monostate{});
118  }
119 
120  auto updated_state = OpState{typename OpRecord::ExecutingCas(desc.value())};
121  return new OpRecord{op, updated_state};
122  }
123 
125  auto help_postcas (OpBox &op_box, const OpRecord &op, const typename OpRecord::PostCas &state) -> OptionalResultOrError<OpRecord *> {
126  auto failures = ContentionFailureCounter{};
127 
128  auto result_opt = m_algorithm.wrap_up(state.executed, state.cas_list, failures);
129  if (!result_opt.has_value()) {
130  // Contention encountered. Try again.
131  return nonstd::make_unexpected(std::monostate{});
132  }
133 
134  if (auto result = result_opt.value(); result.has_value()) {
135  // Operation has been successfully executed.
136  auto updated_state = OpState{typename OpRecord::Completed(result.value())};
137  return new OpRecord{op, updated_state};
138  }
139 
140  // Operation failed and has to be restarted.
141  auto updated_state = OpState{typename OpRecord::PreCas{}};
142  return new OpRecord{op, updated_state};
143  }
144 
146  auto help_executingcas (OpBox &op_box, const OpRecord &op, typename OpRecord::ExecutingCas &state) -> OptionalResultOrError<OpRecord *, int> {
147  auto failures = ContentionFailureCounter{};
148 
149  auto result = commit(state.cas_list, failures);
150  if (!result.has_value()) {
151  if (auto err = result.error(); err.has_value()) {
152  // Contention encounter. Try again.
153  return std::optional<OpRecord *>{};
154  } else {
155  // Failed at a specific CAS
156  return nonstd::make_unexpected(err.value());
157  }
158  }
159 
160  auto updated_op = new OpRecord{op, typename OpRecord::PostCas(state.cas_list, result)};
161  return updated_op;
162  }
163 
168  auto help (OperationRecordBox<LockFree> &op_box) -> void {
169  using HelperVisitResult = std::pair<bool, OptionalResultOrError<OpRecord *>>;
170  while (true) {
171  auto op_ptr = op_box.ptr();
172  const auto &op = *op_ptr;
173 
174  auto[continue_, updated_op] = std::visit(OverloadedVisitor{
175  [&] (const typename OpRecord::PreCas &arg) -> HelperVisitResult {
176 #ifdef TEL_LOGGING
177  LOG_F(INFO, "Performing help of an operation in the PreCas state.");
178 #endif
179  auto result = help_precas(op_box, op, arg);
180  bool continue_ = !result.has_value(); //< If there is contention, try again (continue the outer loop)
181  return std::make_pair(continue_, result);
182  },
183  [&] (const typename OpRecord::ExecutingCas &arg) -> HelperVisitResult {
184 #ifdef TEL_LOGGING
185  LOG_F(INFO, "Performing help of an operation in the ExecutingCas state.");
186 #endif
187  auto &mut_arg = *const_cast<typename OpRecord::ExecutingCas *>(&arg);
188  auto result_ = help_executingcas(op_box, op, mut_arg);
189  // continue_ is set iff the execution failed and _none_ of the CAS-es was successfully performed
190  bool continue_ = result_.has_value() && !result_.value().has_value();
191  // help_executingcas has a different return type and has to be "reformatted"
192 
193  // If continue_ is set then the value of result wil never be read
194  if (continue_) { return std::make_pair(continue_, nullptr); }
195 
196  // value().value() is fine because we would have already returned if there wasn't a value in the optional<>
197  if (result_.has_value()) { return std::make_pair(continue_, result_.value().value()); }
198 
199  auto unit = nonstd::make_unexpected(std::monostate{});
200  return std::make_pair(continue_, unit);
201  },
202  [&] (const typename OpRecord::PostCas &arg) -> HelperVisitResult {
203 #ifdef TEL_LOGGING
204  LOG_F(INFO, "Performing help of an operation in the PostCas state.");
205 #endif
206  auto result = help_postcas(op_box, op, arg);
207  bool continue_ = !result.has_value(); //< If there is contention, try again (continue the outer loop)
208  return std::make_pair(continue_, result);
209  },
210  [&] (const typename OpRecord::Completed &arg) -> HelperVisitResult {
211 #ifdef TEL_LOGGING
212  LOG_F(INFO, "Performing help of an operation in the Completed state.");
213 #endif
214  auto _ = m_helpqueue.try_pop_front(&op_box);
215  auto updated_state = op_box.state();
216  return std::make_pair(false, std::make_optional(new OpRecord{op, updated_state}));
217  }
218  }, op.state());
219 
220  if (continue_) { continue; }
221  if (!updated_op) { break; }
222 
223  // Safety for calling value().value(): continue_ would be true and thus we wouldn't have reached this line
224  OpRecord *updated_op_ptr = updated_op.value().value();
225  if (!op_box.atomic_ptr().compare_exchange_strong(op_ptr, updated_op_ptr)) {
226  // Unsuccessful, therefore we can safely deallocate the OpRecord we created (It never got shared with other threads).
227 #ifdef TEL_LOGGING
228  LOG_F(WARNING, "CAS during help of an operation failed.");
229 #endif
230  delete updated_op_ptr;
231  }
232 
233  if (std::holds_alternative<typename OpRecord::Completed>(op_box.state())) {
234 #ifdef TEL_LOGGING
235  LOG_F(INFO, "Operation which required help now finished. Returning from help.");
236 #endif
237  break;
238  } //< Completed
239  }
240  }
241 
247  auto commit (Commit &cas_list, ContentionFailureCounter &failures) -> nonstd::expected<std::monostate, std::optional<int>> {
248  for (int i = 0; auto &cas : cas_list) {
249  switch (auto state = cas.state()) {
250  case CasStatus::Failure:
251 #ifdef TEL_LOGGING
252  LOG_F(WARNING, "During commit: CAS #%d failed.", i);
253 #endif
254  return nonstd::make_unexpected(i);
255  break;
256  case CasStatus::Success:
257 #ifdef TEL_LOGGING
258  LOG_F(INFO, "During commit: CAS was successfully executed. Clearing modified_bit and returning.");
259 #endif
260  cas.clear_bit();
261  break;
262  case CasStatus::Pending: {
263  if (auto result = cas.execute(failures); !result) {
264 #ifdef TEL_LOGGING
265  LOG_F(INFO, "During commit: CAS #%d failed. Returning...", i);
266 #endif
267  return nonstd::make_unexpected(std::nullopt);
268  }
269  if (cas.has_modified_bit()) {
270  (void) cas.swap_state(CasStatus::Pending, CasStatus::Success);
271  cas.clear_bit();
272 #ifdef TEL_LOGGING
273  LOG_F(INFO, "During commit: CAS #%d succeeded. Getting to the next one.", i);
274 #endif
275  }
276  if (cas.state() != CasStatus::Success) {
277  cas.set_state(CasStatus::Failure);
278 #ifdef TEL_LOGGING
279  LOG_F(WARNING, "During commit: CAS #%d failed. Returning...", i);
280 #endif
281  return nonstd::make_unexpected(i);
282  }
283  break;
284  }
285  }
286  ++i;
287  }
288 
289 #ifdef TEL_LOGGING
290  LOG_F(INFO, "During commit: All CAS-es succeeded. Returning...");
291 #endif
292  return std::monostate{};
293  }
294 
298  auto slow_path (const Id id, const Input &input) -> Output {
299  // Enqueue description of the operation
300  auto *op_box = new OperationRecordBox<LockFree>{id, typename OpRecord::PreCas{}, input};
301  m_helpqueue.push_back(id, op_box);
302 #ifdef TEL_LOGGING
303  LOG_S(INFO) << "During slowpath: Enqueueing a new operation record box in Precas state with input = " << input << " and id = " << id;
304 #endif
305 
306  // Help until operation is complete
307  using StateCompleted = typename OperationRecord<LockFree>::Completed;
308  while (true) {
309  auto updated_state = op_box->state();
310 #ifdef TEL_LOGGING
311  LOG_F(INFO, "During slow path: Checking the state the enqueued operation");
312 #endif
313  if (std::holds_alternative<StateCompleted>(updated_state)) {
314  auto sp_result = std::get<StateCompleted>(updated_state);
315 #ifdef TEL_LOGGING
316  LOG_S(INFO) << "Operation succeeded with output = " << sp_result.output;
317 #endif
318  return sp_result.output;
319  }
320 #ifdef TEL_LOGGING
321  LOG_F(INFO, "During slow path: Operation still not finished. Trying to help again.");
322 #endif
323  try_help_others(id);
324  }
325  }
326 
328  auto fast_path (const Input &input, ContentionFailureCounter &contention_counter) -> std::optional<Output> {
329 #ifdef TEL_LOGGING
330  LOG_F(INFO, "Invoking the fast path of the algorithm");
331 #endif
332  return m_algorithm.fast_path(input, contention_counter);
333  }
334 
335  private:
336  LockFree m_algorithm;
338 };
339 
340 }
341 
343 template<NormalizedRepresentation LockFree, const int N = 16>
345  public:
346  using Id = int;
347  using Input = typename LockFree::Input;
348  using Output = typename LockFree::Output;
349  using Commit = typename LockFree::Commit;
350 
353  using OpState = typename OperationRecord<LockFree>::OperationState;
354 
355  template<typename T, typename Err = std::monostate>
356  using OptionalResultOrError = nonstd::expected<std::optional<T>, Err>;
357 
359 
360  public:
362  struct MetaData {
363  std::vector<Id> m_free;
364  std::mutex m_free_lock;
365  };
366 
367  public: //< Construction API
368  explicit WaitFreeSimulatorHandle (LockFree algorithm)
369  : m_id{0}, m_simulator{std::make_shared<Simulator>(algorithm)}, m_meta{std::make_shared<MetaData>()} {
370  // Safe to access m_meta without atomic load because it has never been shared
371  m_meta->m_free.resize(N - 1);
372  std::iota(m_meta->m_free.begin(), m_meta->m_free.end(), 1);
373  static_assert(N > 0, "N has to be a positive integer.");
374  }
375 
376  auto fork () -> std::optional<WaitFreeSimulatorHandle<LockFree, N>> {
377  auto meta = std::atomic_load(&m_meta);
378  const auto lock = std::lock_guard<std::mutex>{meta->m_free_lock};
379  if (meta->m_free.empty()) {
380 #ifdef TEL_LOGGING
381  LOG_F(WARNING, "New simulator handle CANNOT be created");
382 #endif
383  return {};
384  }
385  auto next_id = meta->m_free.back();
386  meta->m_free.pop_back();
387 #ifdef TEL_LOGGING
388  LOG_F(INFO, "New simulator handle created with id = %d", next_id);
389 #endif
390  return WaitFreeSimulatorHandle{next_id, m_simulator, meta};
391  }
392 
393  template<typename Fun, typename RetVal>
394  auto fork (Fun &&fun) -> std::optional<RetVal> {
395  if (auto handle = fork(); handle.has_value()) {
396  auto result = fun(handle);
397  handle.retire();
398  return result;
399  }
400  return std::nullopt;
401  }
402 
403  auto retire () -> void {
404  auto meta = std::atomic_load(&m_meta);
405  const auto lock = std::lock_guard{meta->m_free_lock};
406  meta->m_free.push_back(m_id);
407 #ifdef TEL_LOGGING
408  LOG_F(INFO, "Simulator handle with id = %d retired", m_id);
409 #endif
410  }
411 
412  auto submit (const Input &input, bool use_slow_path = false) -> Output {
413  auto sim = std::atomic_load(&m_simulator);
414 #ifdef TEL_LOGGING
415  LOG_S(INFO) << "Simulator was submitted a new operation with input = " << input;
416  LOG_IF_F(INFO, use_slow_path, "Setting a preference to use the slow path");
417 #endif
418  return sim->run(m_id, input, use_slow_path);
419  }
420 
421  auto help () -> void {
422  auto sim = std::atomic_load(&m_simulator);
423 #ifdef TEL_LOGGING
424  LOG_F(INFO, "Simulator is trying to help other threads");
425 #endif
426  sim->try_help_others(m_id);
427  }
428 
429  private:
430  WaitFreeSimulatorHandle (Id id, std::shared_ptr<Simulator> t_simulator, std::shared_ptr<MetaData> t_meta)
431  : m_simulator{t_simulator}, m_id{id}, m_meta{t_meta} {}
432 
433  private:
434  std::shared_ptr<Simulator> m_simulator{};
435  std::shared_ptr<MetaData> m_meta{};
436  Id m_id;
437 
438  public:
439  [[maybe_unused]] static inline constexpr bool Use_slow_path = true;
440  [[maybe_unused]] static inline constexpr bool Use_fast_path = false;
441 };
442 
443 }
444 
445 #endif // TELAMON_WAIT_FREE_SIMULATOR_HH_
This is the main class representing the help queue.
Definition: HelpQueue.hh:29
Measures the contention which was encountered during simulation.
Definition: Versioning.hh:13
A class which represents a single operation stored in the help queue.
Definition: OperationHelping.hh:85
A class which represents a single operation contained in a OperationRecordBox.
Definition: OperationHelping.hh:14
A handle class which is used to obtain access to the wait-free simulator.
Definition: WaitFreeSimulator.hh:344
A class which represents the meta data of the handle class. Used only when forking a handle from another handle.
Definition: WaitFreeSimulator.hh:362
The main structure of the simulator. Contains the operations performed by the simulator.
Definition: WaitFreeSimulator.hh:40
auto run(const Id id, const Input &input, bool use_slow_path=false) -> Output
Runs the actual simulation.
Definition: WaitFreeSimulator.hh:66
auto try_help_others(const Id id) -> void
Checks whether other threads need help with a certain operation and tries to help them.
Definition: WaitFreeSimulator.hh:99
This module encapsulates the implementation of the simulator.
Definition: NormalizedRepresentation.hh:25
Used by std::visit for the helping operation in the simulator.
Definition: WaitFreeSimulator.hh:31