1 #ifndef TELAMON_WAIT_FREE_SIMULATOR_HH_
2 #define TELAMON_WAIT_FREE_SIMULATOR_HH_
14 #include <type_traits>
17 #include <extern/expected_lite/expected.hpp>
20 #define LOGURU_WITH_STREAMS 1
21 #include <extern/loguru/loguru.hpp>
26 #include "HelpQueue.hh"
27 #include "OperationHelping.hh"
36 namespace telamon_private {
// WaitFreeSimulator: the simulator core wrapping a normalized lock-free
// algorithm `LockFree`. N is a compile-time parameter (default 16); its use is
// not visible in this fragment of the class.
39 template<NormalizedRepresentation LockFree, const
int N = 16>
// Convenience aliases for the algorithm's input, output and commit types
// (Commit is the CAS list consumed by commit()).
42 using Input =
typename LockFree::Input;
43 using Output =
typename LockFree::Output;
44 using Commit =
typename LockFree::Commit;
// Operation-state type of a record. In this file it is built from the
// PreCas, ExecutingCas, PostCas and Completed alternatives.
48 using OpState =
typename OperationRecord<LockFree>::OperationState;
// Return type of the help_* steps: an optional value on success, or an error
// of type Err (std::monostate by default).
50 template<
typename T,
typename Err = std::monostate>
51 using OptionalResultOrError = nonstd::expected<std::optional<T>, Err>;
// Builds the simulator from a copy of the lock-free algorithm; the help queue
// starts value-initialized.
54 explicit WaitFreeSimulator (
const LockFree &lf) : m_algorithm{lf}, m_helpqueue{} {}
// Builds the simulator by moving the lock-free algorithm in.
56 explicit WaitFreeSimulator (LockFree &&lf) : m_algorithm{std::move(lf)}, m_helpqueue{} {}
// Runs one operation on behalf of thread `id` with `input` and returns its output.
// Tries the algorithm's fast path up to
// ContentionFailureCounter::FAST_PATH_RETRY_THRESHOLD times, returning as soon
// as it yields a value. Otherwise it falls back to slow_path(id, input), which
// the visible lines reach after contention_counter.detect() reports contention.
// NOTE(review): in the visible lines `use_slow_path` is only logged. Check the
// omitted lines for how it influences path selection.
66 auto run (
const Id
id,
const Input &input,
bool use_slow_path =
false) -> Output {
69 LOG_S(INFO) <<
"Running the simulation with id = '" <<
id <<
"' and input = '" << input <<
"'";
70 LOG_IF_F(INFO, use_slow_path,
"Setting a preference to use the slow path");
// Bounded fast-path retry loop. `contention_counter` is declared in lines not shown.
75 for (
int i = 0; i < ContentionFailureCounter::FAST_PATH_RETRY_THRESHOLD; ++i) {
// NOTE(review): the message below has no space between the retry index and "using".
77 LOG_S(INFO) <<
"Retry #" << i <<
"using the fast-path with input = " << input;
79 auto fp_result = fast_path(input, contention_counter);
80 if (fp_result.has_value()) {
82 LOG_F(INFO,
"Fast-path succeeded. Returning output");
84 return fp_result.value();
// Contention detected: switch to the help-queue based slow path.
86 if (contention_counter.detect()) {
88 LOG_F(INFO,
"Contention detected. Using slow-path.");
95 return slow_path(
id, input);
// Body fragment of try_help_others(id). If the help queue has an operation at
// its front, this thread helps it via help() before continuing.
100 auto front = m_helpqueue.peek_front();
101 if (front.has_value()) {
// NOTE(review): "Tryting" in the log message below is a typo for "Trying".
103 LOG_F(INFO,
"Operation requires help in the helpq. Tryting to help it.");
105 help(*front.value());
// Helps an operation in the PreCas state by asking the algorithm's generator
// for a descriptor for op.input().
// Returns:
//   - unexpected(std::monostate) when the generator produced no value;
//   - otherwise a new heap-allocated OpRecord, built from `op`, in the
//     ExecutingCas state.
// `op_box` and `state` are not used in the visible lines.
// NOTE(review): `failures` is declared in lines not shown here.
111 auto help_precas (OpBox &op_box,
const OpRecord &op,
const typename OpRecord::PreCas &state) -> OptionalResultOrError<OpRecord *> {
115 auto desc = m_algorithm.generator(op.input(), failures);
116 if (!desc.has_value()) {
117 return nonstd::make_unexpected(std::monostate{});
// The returned record is owned by the caller; help() deletes it if the CAS
// that installs it fails.
120 auto updated_state = OpState{
typename OpRecord::ExecutingCas(desc.value())};
121 return new OpRecord{op, updated_state};
// Helps an operation in the PostCas state by calling the algorithm's wrap_up
// with the executed-CAS result and the CAS list stored in `state`.
// Returns:
//   - unexpected(std::monostate) if wrap_up itself produced no value;
//   - a new OpRecord in the Completed state if wrap_up yielded an output;
//   - otherwise a new OpRecord reset to PreCas, so the operation is generated
//     again on the next help step.
125 auto help_postcas (OpBox &op_box,
const OpRecord &op,
const typename OpRecord::PostCas &state) -> OptionalResultOrError<OpRecord *> {
126 auto failures = ContentionFailureCounter{};
128 auto result_opt = m_algorithm.wrap_up(state.executed, state.cas_list, failures);
129 if (!result_opt.has_value()) {
131 return nonstd::make_unexpected(std::monostate{});
// wrap_up succeeded; an inner value means the operation produced its output.
134 if (
auto result = result_opt.value(); result.has_value()) {
136 auto updated_state = OpState{
typename OpRecord::Completed(result.value())};
137 return new OpRecord{op, updated_state};
// No output: restart the operation from the PreCas state.
141 auto updated_state = OpState{
typename OpRecord::PreCas{}};
142 return new OpRecord{op, updated_state};
// Helps an operation in the ExecutingCas state by running commit() on its CAS list.
// If commit fails and its error carries a CAS index, an empty optional is
// returned; help() treats that as "retry".
// On success, a new PostCas record is built from the CAS list and the commit
// result. Returning that record happens in lines not shown.
// NOTE(review): `err` is declared in the if-initializer at line 151. If line
// 156 lies outside that if/else, `err` is out of scope there. If it sits in
// the else branch (err empty, i.e. commit reported std::nullopt), then
// err.value() throws std::bad_optional_access. Confirm against the omitted lines.
146 auto help_executingcas (OpBox &op_box,
const OpRecord &op,
typename OpRecord::ExecutingCas &state) -> OptionalResultOrError<OpRecord *, int> {
147 auto failures = ContentionFailureCounter{};
149 auto result = commit(state.cas_list, failures);
150 if (!result.has_value()) {
151 if (
auto err = result.error(); err.has_value()) {
153 return std::optional<OpRecord *>{};
156 return nonstd::make_unexpected(err.value());
// presumably the commit result becomes the PostCas `executed` field read by help_postcas — confirm
160 auto updated_op =
new OpRecord{op,
typename OpRecord::PostCas(state.cas_list, result)};
// Helps the operation stored in `op_box` make progress.
// Each step dispatches on the current record's state (the std::visit call and
// the loop header are in lines not shown) to help_precas / help_executingcas /
// help_postcas. For the Completed state, it pops the box from the help queue.
// The new record is installed with compare_exchange_strong on
// op_box.atomic_ptr(); if that CAS fails, the new record is deleted.
// The visible lines log and leave help() once the box's state is Completed.
168 auto help (OperationRecordBox<LockFree> &op_box) ->
void {
// first: "retry this step"; second: the new record, or an error that stops helping.
169 using HelperVisitResult = std::pair<bool, OptionalResultOrError<OpRecord *>>;
171 auto op_ptr = op_box.ptr();
172 const auto &op = *op_ptr;
// PreCas: a generator failure (error result) is retried, not treated as fatal.
175 [&] (
const typename OpRecord::PreCas &arg) -> HelperVisitResult {
177 LOG_F(INFO,
"Performing help of an operation in the PreCas state.");
179 auto result = help_precas(op_box, op, arg);
180 bool continue_ = !result.has_value();
181 return std::make_pair(continue_, result);
183 [&] (
const typename OpRecord::ExecutingCas &arg) -> HelperVisitResult {
185 LOG_F(INFO,
"Performing help of an operation in the ExecutingCas state.");
// const is cast away because help_executingcas takes the state by non-const reference.
// NOTE(review): writing through this reference is undefined behavior if the
// underlying ExecutingCas object was itself declared const.
187 auto &mut_arg = *
const_cast<typename OpRecord::ExecutingCas *
>(&arg);
188 auto result_ = help_executingcas(op_box, op, mut_arg);
// An empty optional (a CAS reported an index) means "retry".
190 bool continue_ = result_.has_value() && !result_.value().has_value();
// The second element is ignored by the caller when continue_ is true.
194 if (continue_) {
return std::make_pair(continue_,
nullptr); }
197 if (result_.has_value()) {
return std::make_pair(continue_, result_.value().value()); }
// Error: the int error is replaced by std::monostate, and help() stops (break below).
199 auto unit = nonstd::make_unexpected(std::monostate{});
200 return std::make_pair(continue_, unit);
// PostCas: a wrap_up failure (error result) is retried.
202 [&] (
const typename OpRecord::PostCas &arg) -> HelperVisitResult {
204 LOG_F(INFO,
"Performing help of an operation in the PostCas state.");
206 auto result = help_postcas(op_box, op, arg);
207 bool continue_ = !result.has_value();
208 return std::make_pair(continue_, result);
// Completed: remove the box from the help queue (the result is deliberately ignored).
// NOTE(review): this allocates a fresh record that carries the box's current state.
210 [&] (
const typename OpRecord::Completed &arg) -> HelperVisitResult {
212 LOG_F(INFO,
"Performing help of an operation in the Completed state.");
214 auto _ = m_helpqueue.try_pop_front(&op_box);
215 auto updated_state = op_box.state();
216 return std::make_pair(
false, std::make_optional(
new OpRecord{op, updated_state}));
// continue_ / updated_op are bound from the visit result in lines not shown.
220 if (continue_) {
continue; }
221 if (!updated_op) {
break; }
// Every non-retry, non-error result above carries a non-empty optional, so the
// double value() call here does not throw.
224 OpRecord *updated_op_ptr = updated_op.value().value();
// On failure, compare_exchange_strong reloads op_ptr with the current pointer.
// NOTE(review): after a successful CAS the replaced record is not freed in the
// visible lines; confirm it is reclaimed elsewhere.
225 if (!op_box.atomic_ptr().compare_exchange_strong(op_ptr, updated_op_ptr)) {
228 LOG_F(WARNING,
"CAS during help of an operation failed.");
230 delete updated_op_ptr;
233 if (std::holds_alternative<typename OpRecord::Completed>(op_box.state())) {
235 LOG_F(INFO,
"Operation which required help now finished. Returning from help.");
// Executes the CASes of `cas_list` in order. Returns std::monostate when all succeed.
// Error values:
//   - the index `i` of a CAS whose state is Failure, or is not Success after it
//     was executed;
//   - std::nullopt when cas.execute(failures) evaluates to false.
// NOTE(review): `i` is never incremented in the visible lines. Confirm the
// omitted lines advance it; otherwise every reported index is 0.
// NOTE(review): a range-based for with an init-statement requires C++20.
247 auto commit (Commit &cas_list, ContentionFailureCounter &failures) -> nonstd::expected<std::monostate, std::optional<int>> {
248 for (
int i = 0;
auto &cas : cas_list) {
249 switch (
auto state = cas.state()) {
// This CAS already failed: report its index.
250 case CasStatus::Failure:
252 LOG_F(WARNING,
"During commit: CAS #%d failed.", i);
254 return nonstd::make_unexpected(i);
256 case CasStatus::Success:
258 LOG_F(INFO,
"During commit: CAS was successfully executed. Clearing modified_bit and returning.");
// Pending: attempt the CAS now.
262 case CasStatus::Pending: {
263 if (
auto result = cas.execute(failures); !result) {
265 LOG_F(INFO,
"During commit: CAS #%d failed. Returning...", i);
267 return nonstd::make_unexpected(std::nullopt);
// Mark the CAS as Success; the result of swap_state is intentionally ignored.
269 if (cas.has_modified_bit()) {
270 (void) cas.swap_state(CasStatus::Pending, CasStatus::Success);
273 LOG_F(INFO,
"During commit: CAS #%d succeeded. Getting to the next one.", i);
// Any state other than Success at this point is recorded as Failure and reported.
276 if (cas.state() != CasStatus::Success) {
277 cas.set_state(CasStatus::Failure);
279 LOG_F(WARNING,
"During commit: CAS #%d failed. Returning...", i);
281 return nonstd::make_unexpected(i);
290 LOG_F(INFO,
"During commit: All CAS-es succeeded. Returning...");
292 return std::monostate{};
// Slow path. Allocates an OperationRecordBox for (id, input) in the PreCas
// state and enqueues it on the help queue. It then checks the box's state:
// once the state is Completed it returns the recorded output, otherwise it
// logs and tries to help again. The loop and the helping call are in lines
// not shown.
// NOTE(review): the box is allocated with `new`, and its release is not
// visible here; confirm ownership and reclamation.
298 auto slow_path (
const Id
id,
const Input &input) -> Output {
300 auto *op_box =
new OperationRecordBox<LockFree>{id,
typename OpRecord::PreCas{}, input};
301 m_helpqueue.push_back(
id, op_box);
303 LOG_S(INFO) <<
"During slowpath: Enqueueing a new operation record box in Precas state with input = " << input <<
" and id = " << id;
307 using StateCompleted =
typename OperationRecord<LockFree>::Completed;
309 auto updated_state = op_box->state();
// NOTE(review): the message below is missing "of" ("the state of the enqueued operation").
311 LOG_F(INFO,
"During slow path: Checking the state the enqueued operation");
313 if (std::holds_alternative<StateCompleted>(updated_state)) {
314 auto sp_result = std::get<StateCompleted>(updated_state);
316 LOG_S(INFO) <<
"Operation succeeded with output = " << sp_result.output;
318 return sp_result.output;
321 LOG_F(INFO,
"During slow path: Operation still not finished. Trying to help again.");
// Thin wrapper: logs, then forwards to the algorithm's own fast_path.
// run() treats an empty optional as "the fast path did not finish".
328 auto fast_path (
const Input &input, ContentionFailureCounter &contention_counter) -> std::optional<Output> {
330 LOG_F(INFO,
"Invoking the fast path of the algorithm");
332 return m_algorithm.fast_path(input, contention_counter);
// The wrapped lock-free algorithm, copied or moved in by the constructors.
336 LockFree m_algorithm;
// WaitFreeSimulatorHandle: a handle used to access a shared wait-free
// simulator. At most N handles exist: id 0 plus ids 1..N-1 from a shared free list.
343 template<NormalizedRepresentation LockFree, const
int N = 16>
// Aliases mirroring those of the simulator.
347 using Input =
typename LockFree::Input;
348 using Output =
typename LockFree::Output;
349 using Commit =
typename LockFree::Commit;
353 using OpState =
typename OperationRecord<LockFree>::OperationState;
355 template<
typename T,
typename Err = std::monostate>
356 using OptionalResultOrError = nonstd::expected<std::optional<T>, Err>;
// MetaData members: the pool of unused handle ids, guarded by m_free_lock.
363 std::vector<Id> m_free;
364 std::mutex m_free_lock;
// Constructor initializer list (signature in lines not shown). This handle
// takes id 0; ids 1..N-1 are placed in the shared free list for fork().
369 : m_id{0}, m_simulator{std::make_shared<
Simulator>(algorithm)}, m_meta{std::make_shared<
MetaData>()} {
371 m_meta->m_free.resize(N - 1);
372 std::iota(m_meta->m_free.begin(), m_meta->m_free.end(), 1);
373 static_assert(N > 0,
"N has to be a positive integer.");
// Creates a new handle that shares this handle's simulator and metadata.
// The new handle takes the last id of the shared free list, under m_free_lock.
// When the free list is empty it logs a warning; the empty return is in lines
// not shown.
// NOTE(review): m_simulator is read directly here, while submit()/help() read
// it with std::atomic_load; confirm whether that is intended.
376 auto fork () -> std::optional<WaitFreeSimulatorHandle<LockFree, N>> {
377 auto meta = std::atomic_load(&m_meta);
378 const auto lock = std::lock_guard<std::mutex>{meta->m_free_lock};
379 if (meta->m_free.empty()) {
381 LOG_F(WARNING,
"New simulator handle CANNOT be created");
385 auto next_id = meta->m_free.back();
386 meta->m_free.pop_back();
388 LOG_F(INFO,
"New simulator handle created with id = %d", next_id);
390 return WaitFreeSimulatorHandle{next_id, m_simulator, meta};
// Forks a handle and, if that succeeds, invokes `fun` with it.
// NOTE(review): `RetVal` appears only in the return type, so it cannot be
// deduced. Callers must spell out both template arguments (fork<F, R>(f)).
// NOTE(review): `fun` receives `handle`, which is the std::optional returned
// by fork(), not the unwrapped handle.
393 template<
typename Fun,
typename RetVal>
394 auto fork (Fun &&fun) -> std::optional<RetVal> {
395 if (
auto handle = fork(); handle.has_value()) {
396 auto result = fun(handle);
// Returns this handle's id to the shared free list, under m_free_lock, so a
// later fork() can reuse it.
// NOTE(review): nothing visible prevents the same id from being retired twice.
403 auto retire () ->
void {
404 auto meta = std::atomic_load(&m_meta);
405 const auto lock = std::lock_guard{meta->m_free_lock};
406 meta->m_free.push_back(m_id);
408 LOG_F(INFO,
"Simulator handle with id = %d retired", m_id);
// Submits an operation to the shared simulator on behalf of this handle's id
// and returns its output. `use_slow_path` is forwarded to WaitFreeSimulator::run.
// NOTE(review): std::atomic_load on std::shared_ptr is deprecated in C++20 in
// favour of std::atomic<std::shared_ptr<T>>.
412 auto submit (
const Input &input,
bool use_slow_path =
false) -> Output {
413 auto sim = std::atomic_load(&m_simulator);
415 LOG_S(INFO) <<
"Simulator was submitted a new operation with input = " << input;
416 LOG_IF_F(INFO, use_slow_path,
"Setting a preference to use the slow path");
418 return sim->run(m_id, input, use_slow_path);
// Asks the shared simulator to help operations of other threads by calling
// try_help_others with this handle's id.
421 auto help () ->
void {
422 auto sim = std::atomic_load(&m_simulator);
424 LOG_F(INFO,
"Simulator is trying to help other threads");
426 sim->try_help_others(m_id);
// Constructor used by fork(): builds a handle with the given id that shares
// the simulator and metadata.
// NOTE(review): members are initialized in declaration order, not in the
// order of this list; m_id's declaration is not visible here. The shared_ptr
// parameters are copied, not moved, into the members.
430 WaitFreeSimulatorHandle (Id
id, std::shared_ptr<Simulator> t_simulator, std::shared_ptr<MetaData> t_meta)
431 : m_simulator{t_simulator}, m_id{id}, m_meta{t_meta} {}
// Simulator shared by all handles forked from the same origin.
434 std::shared_ptr<Simulator> m_simulator{};
// Shared metadata (the free-id pool and its lock).
435 std::shared_ptr<MetaData> m_meta{};
// Named boolean flags, presumably intended for the `use_slow_path` argument of
// submit()/run(), so that call sites read better than a bare true/false.
439 [[maybe_unused]]
static inline constexpr
bool Use_slow_path =
true;
440 [[maybe_unused]]
static inline constexpr
bool Use_fast_path =
false;
This is the main class representing the help queue.
Definition: HelpQueue.hh:29
Measures the contention which was encountered during simulation.
Definition: Versioning.hh:13
A class which represents a single operation stored in the help queue.
Definition: OperationHelping.hh:85
A class which represents a single operation contained in an OperationRecordBox.
Definition: OperationHelping.hh:14
A handle class which is used to obtain access to the wait-free simulator.
Definition: WaitFreeSimulator.hh:344
The main structure of the simulator. Contains the operations performed by the simulator.
Definition: WaitFreeSimulator.hh:40
auto run(const Id id, const Input &input, bool use_slow_path=false) -> Output
Runs the actual simulation.
Definition: WaitFreeSimulator.hh:66
auto try_help_others(const Id id) -> void
Checks whether other threads need help with a certain operation and tries to help them.
Definition: WaitFreeSimulator.hh:99
This module encapsulates the implementation of the simulator.
Definition: NormalizedRepresentation.hh:25
Used with std::visit to dispatch the helping operation in the simulator on the operation's current state.
Definition: WaitFreeSimulator.hh:31