concurrency

Contents

concurrency#

namespace heph
namespace concurrency#
namespace io_ring#
class IoRing#

Public Functions

explicit IoRing(const IoRingConfig &config)#
bool stopRequested()#
void requestStop()#
stdexec::inplace_stop_token getStopToken()#
void submit(IoRingOperationBase *operation)#
void runOnce(bool block = true)#
void run(const std::function<void()> &on_start = [] {}, const std::function<bool()> &on_progress = [] { return false;})#
bool isRunning()#
bool isCurrentRing()#
struct IoRingConfig#

Public Members

std::uint32_t nentries = {DEFAULT_ENTRY_COUNT}#
std::uint32_t flags = {0}#

Public Static Attributes

static std::uint32_t DEFAULT_ENTRY_COUNT = 8#
namespace heph
namespace concurrency
struct RepeatUntilT#

Public Functions

template<stdexec::sender SenderT>
inline auto operator()(SenderT &&sender) const#
template<internal::SenderFactory SenderFactoryT>
inline auto operator()(SenderFactoryT &&sender_factory) const#
template<>
struct SenderExpressionImpl<RepeatUntilT> : public heph::concurrency::DefaultSenderExpressionImpl#

Public Static Attributes

static auto GET_COMPLETION_SIGNATURES = []<typename Sender>(Sender&&, heph::concurrency::Ignore = {}) noexcept {return stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr),stdexec::set_stopped_t()>{};}#
static auto GET_STATE = []<typename Sender, typename Receiver>(Sender&& sender,Receiver&& receiver) noexcept {auto [_, sender_factory] = std::forward<Sender>(sender);using SenderFactoryT = std::decay_t<stdexec::__data_of<Sender>>;using ReceivertT = std::decay_t<Receiver>;return internal::RepeatUntilStateT<SenderFactoryT, ReceivertT>{ std::move(sender_factory),std::forward<Receiver>(receiver) };}#
static auto START = []<typename Receiver>(auto& self, Receiver&& ) { self.start(); }#

Variables

RepeatUntilT repeatUntil = {}#
namespace heph
namespace concurrency
namespace io_ring
class IoRingOperationBase#

Public Functions

virtual ~IoRingOperationBase() = default#
inline virtual void prepare(::io_uring_sqe *sqe)#
virtual void handleCompletion(::io_uring_cqe *cqe) = 0#
namespace heph
namespace concurrency
class SpinnersManager#
#include “hephaestus/concurrency/spinners_manager.h”

SpinnersManager allows to orchestrate the execution of multiple spinners. The main feature is waitAny which allows to unblock as soon as one of the spinners is done. This allows to catch errors in spinner execution as soon as possible and stop the others. NOTE: right now we do not have any concept of error for the spinners: we cannot know if a spinner terminated with an error or not. If an exception is thrown inside the runner, it will be re-thrown when we call runner.stop().get(). We leave it to the user to handle it outside of the runner manager. NOTE: this logic is quite generic and can be extended to any type of object that has wait() and stop() methods. To be faitful to the principle of implement only what you need now, we limit the scope to spinners and consider to expand the scope when an use case arises.

Public Functions

explicit SpinnersManager(std::vector<Spinner*> spinners)#
void startAll()#
void waitAll()#

blocks until all spinners are finished. If a single spinner throws an exception, it will not be propagated as the other spinners are still blocking.

void waitAny()#

wait until any spinner terminates or throws an exception, which allows for immediate exception handling. Note that the exceptions will be re-thrown when calling stopAll.

void stopAll()#
namespace heph
namespace concurrency
template<typename T>
class MessageQueueConsumer#
#include “hephaestus/concurrency/message_queue_consumer.h”

MessageQueueConsumer creates a thread that wait for messages to be pushed in the queue and call the user provided callback on them.

Public Types

using Callback = std::function<void(T&&)>#

Public Functions

MessageQueueConsumer(Callback &&callback, std::optional<std::size_t> max_queue_size)#
~MessageQueueConsumer() = default#
MessageQueueConsumer(const MessageQueueConsumer&) = delete#
MessageQueueConsumer(MessageQueueConsumer&&) = delete#
MessageQueueConsumer &=delete operator= (const MessageQueueConsumer &)
MessageQueueConsumer &=delete operator= (MessageQueueConsumer &&)
void start()#
std::future<void> stop()#

Stop the message consumer, emptying the queue and stopping the processing.

Returns:

future that waits on the queue to be emptied

containers::BlockingQueue<T> &queue()#

Return the queue to allow to push messages to process. This is simpler than having to mask all the different type of push methods of the queue. The downside is that it is possible to consume messages from the outside without calling the callback.

namespace heph
namespace concurrency
namespace spinner_state_machine#

Enums

enum class State : std::uint8_t#

Values:

enumerator NOT_INITIALIZED#
enumerator FAILED#
enumerator READY_TO_SPIN#
enumerator SPIN_SUCCESSFUL#
enumerator EXIT#
enum class Result : std::uint8_t#

Values:

enumerator PROCEED#
enumerator FAILURE#
enumerator REPEAT#

Functions

StateMachineCallbackT createStateMachineCallback(Callbacks &&callbacks)#

Create a callback for the spinner which internally handles the state machine. The current state is stored and a copy is returned to the caller.

struct Callbacks#

Public Members

OperationCallbackT init_cb = []() -> Result { return Result::PROCEED; }#

Handles initialization.

OperationCallbackT spin_once_cb = []() -> Result { return Result::PROCEED; }#

Handles execution.

CheckCallbackT shall_restart_cb = []() -> bool {return false;}#

This callback is called after failure. It decides if operation shall restart, or the spinner shall conclude. Default: do not restart.

Typedefs

using OperationCallbackT = std::function<Result()>#
using CheckCallbackT = std::function<bool()>#
using StateMachineCallbackT = std::function<State()>#
namespace heph
namespace concurrency

Functions

template<typename TagT, typename Data = stdexec::__, typename ...Children>
auto makeSenderExpression(Data &&data = {}, Children&&... children)#
struct DefaultSenderExpressionImpl#

Public Static Attributes

static auto GET_ATTRS = stdexec::__sexpr_defaults::get_attrs#
static auto GET_ENV = stdexec::__sexpr_defaults::get_env#
static auto GET_STATE = stdexec::__sexpr_defaults::get_state#
static auto CONNECT = stdexec::__sexpr_defaults::connect#
static auto START = stdexec::__sexpr_defaults::start#
static auto COMPLETE = stdexec::__sexpr_defaults::complete#
struct Ignore#

Public Functions

Ignore() noexcept = default#
template<typename ...T>
inline Ignore(T&&...) noexcept#
template<typename T>
struct Tag#

Typedefs

template<typename Function, typename ...Ts>
using CallResultT = stdexec::__call_result_t<Function, Ts...>#
template<typename SenderExpressionT>
using DataOfT = stdexec::__data_of<SenderExpressionT>#
namespace stdexec#
template<typename TagT>
struct __sexpr_impl<::heph::concurrency::Tag<TagT>>#

Public Types

using Impl = ::heph::concurrency::SenderExpressionImpl<TagT>#

Public Static Attributes

static auto get_completion_signatures = Impl::GET_COMPLETION_SIGNATURES#
static auto get_attrs = Impl::GET_ATTRS#
static auto get_env = Impl::GET_ENV#
static auto get_state = Impl::GET_STATE#
static auto connect = Impl::CONNECT#
static auto start = Impl::START#
static auto complete = Impl::COMPLETE#
namespace heph
namespace concurrency
class Spinner#
#include “hephaestus/concurrency/spinner.h”

A spinner is a class that spins in a loop calling a user-defined function. If the function is blocking, the spinner will block the thread. If the input rate_hz is set to a non-infinite value, the spinner will call the user-defined function at the given fixed rate. The spinner behavior can be configured using callbacks.

Public Types

enum class SpinResult : bool#

Values:

enumerator CONTINUE#
enumerator STOP#
using StoppableCallback = std::function<SpinResult()>#
using Callback = std::function<void()>#

Public Functions

explicit Spinner(StoppableCallback &&stoppable_callback, std::optional<std::chrono::duration<double>> spin_period = std::nullopt, std::optional<std::string> component_name = std::nullopt)#

Create a spinner with a stoppable callback. A stoppable callback is a function that returns SpinResult::STOP to indicate that the spinner should stop. Other types of callbacks are supported via mappings to StoppableCallback. Example: a callback that stops after 10 iterations.

Parameters:
  • stoppable_callback – The callback to be called in the spinner loop.

  • spin_period – The duration between spins. If not provided, the spinner will spin as fast as possible.

  • component_name – A unique name for this spinner for telemetry logging.

~Spinner()#
Spinner(const Spinner&) = delete#
Spinner &=delete operator= (const Spinner &)
Spinner(Spinner&&) = delete#
Spinner &=delete operator= (Spinner &&)
void start()#
std::future<void> stop()#
void wait()#
void setTerminationCallback(Callback &&termination_callback)#

Set a callback that will be called when the spinner is stopped. This callback could be extendend to pass the reason why the spinner was stopped, e.g. exceptions, …

Public Static Functions

static StoppableCallback createNeverStoppingCallback(Callback &&callback)#

Wrap the user provided callback in a callback that never stops.

static StoppableCallback createCallbackWithStateMachine(spinner_state_machine::StateMachineCallbackT &&state_machine_callback)#

Create a callback for the spinner which internally handles the state machine.

namespace heph
namespace concurrency
namespace io_ring
template<typename IoRingOperationT>
struct StoppableIoRingOperation : public heph::concurrency::io_ring::IoRingOperationBase#
struct StopCallback#

Public Members

StoppableIoRingOperation *self#

Public Functions

inline void operator()() const#
struct StopOperation : public heph::concurrency::io_ring::IoRingOperationBase#

Public Members

StoppableIoRingOperation *self#

Public Functions

inline void prepare(::io_uring_sqe *sqe) final#
inline void handleCompletion(::io_uring_cqe *cqe) final#

Public Members

IoRingOperationT operation#
IoRing *ring = {nullptr}#
stdexec::inplace_stop_callback<StopCallback> stop_callback#
int in_flight = {1}#
std::optional<StopOperation> stop_operation#

Public Functions

inline StoppableIoRingOperation(IoRingOperationT op, IoRing &ring, stdexec::inplace_stop_token token)#
inline void prepare(::io_uring_sqe *sqe) final#
inline void handleCompletion(::io_uring_cqe *cqe) final#
inline void requestStop()#
namespace heph
namespace concurrency
namespace io_ring

Enums

enum class ClockMode : std::uint8_t#

Values:

enumerator WALLCLOCK#
enumerator SIMULATED#
class Timer#
struct Operation#

Public Members

Timer *timer = {nullptr}#

Public Functions

void prepare(::io_uring_sqe *sqe) const#
void handleCompletion(::io_uring_cqe *cqe) const#
void handleStopped() const#
struct UpdateOperation#

Public Members

Timer *timer = {nullptr}#
__kernel_timespec next_timeout = {}#

Public Functions

void prepare(::io_uring_sqe *sqe) const#
void handleCompletion(::io_uring_cqe *cqe) const#
void handleStopped()#

Public Functions

explicit Timer(IoRing &ring, TimerOptions options)#
~Timer() noexcept#
inline bool empty() const#
void requestStop()#
void tick()#
void startAt(TaskBase *task, TimerClock::time_point start_time)#
void dequeue(TaskBase *task)#
inline TimerClock::time_point now()#
inline TimerClock::duration elapsed()#
bool tickSimulated(bool advance)#
template<typename Rep, typename Period>
inline void advanceSimulation(std::chrono::duration<Rep, Period> duration)#
inline auto clockMode()#
struct TimerClock#

Public Types

using base_clock = std::chrono::system_clock#
using duration = std::chrono::microseconds#
using rep = duration::rep#
using period = duration::period#
using time_point = std::chrono::time_point<base_clock, duration>#

Public Static Attributes

static bool is_steady = base_clock::is_steady#
static Timer *timer#

Public Static Functions

static time_point now()#
struct TimerEntry#

Public Members

TaskBase *task = {nullptr}#
TimerClock::time_point start_time#
struct TimerOptions#

Public Members

ClockMode clock_mode = {ClockMode::WALLCLOCK}#
namespace heph
namespace concurrency
class Context#
struct StopCallback#

Public Members

Context *self#

Public Functions

inline void operator()() const noexcept#

Public Types

using Scheduler = ContextScheduler#

Public Functions

inline explicit Context(const ContextConfig &config)#
inline Scheduler scheduler()#
void run(const std::function<void()> &on_start = [] {})#
inline void requestStop()#
inline bool isCurrent()#
inline bool stopRequested()#
inline stdexec::inplace_stop_token getStopToken()#
inline auto elapsed()#
inline io_ring::IoRing *ring()#
struct ContextConfig#

Public Members

io_ring::IoRingConfig io_ring_config#
TimerOptionsT timer_options#

Typedefs

using TimerOptionsT = io_ring::TimerOptions#
using ClockT = io_ring::TimerClock#
namespace heph
namespace concurrency
struct ContextEnv#

Public Members

Context *self#

Public Functions

inline ContextScheduler query(stdexec::get_completion_scheduler_t<stdexec::set_value_t>) const noexcept#
stdexec::inplace_stop_token query(stdexec::get_stop_token_t) const noexcept#
Context &query(GetContextT) const noexcept#

Public Static Functions

static inline auto query(stdexec::__is_scheduler_affine_t) noexcept#
struct ContextScheduleAtT#
struct ContextScheduler#

Public Members

Context *self#

Public Functions

inline Context &context() const#
template<typename TagT = ContextScheduleT>
inline auto schedule()#
template<typename Rep, typename Period, typename TagT = ContextScheduleAtT>
inline auto scheduleAfter(std::chrono::duration<Rep, Period> duration)#
template<typename Clock, typename Duration, typename TagT = ContextScheduleAtT>
inline auto scheduleAt(std::chrono::time_point<Clock, Duration> time_point)#

Public Static Functions

static inline auto query(stdexec::get_forward_progress_guarantee_t) noexcept#
struct ContextScheduleT#
struct GetContextT : public stdexec::__query<GetContextT>#

Public Functions

template<typename Env>
inline stdexec::tag_invoke_result_t<GetContextT, const Env&> operator()(const Env &env) const noexcept#
template<typename Tag = GetContextT>
auto operator()() const noexcept#

Public Static Functions

static inline bool query(stdexec::forwarding_query_t) noexcept#
template<>
struct SenderExpressionImpl<ContextScheduleAtT> : public heph::concurrency::DefaultSenderExpressionImpl#

Public Static Attributes

static auto GET_COMPLETION_SIGNATURES = [](Ignore, Ignore = {}) noexcept {return stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr),stdexec::set_stopped_t()>{};}#
static auto GET_ATTRS = [](auto& data) noexcept -> ContextEnv { return { std::get<0>(data) }; }#
static auto GET_STATE = []<typename Sender, typename Receiver>(Sender&& sender,Receiver& receiver) {auto [_, data] = std::forward<Sender>(sender);auto* context = std::get<0>(data);return TimedTask<std::decay_t<Receiver>, Context>{ context, std::get<1>(data), std::move(receiver) };}#
static auto START = []<typename Receiver>(TimedTask<std::decay_t<Receiver>, Context>& task,Receiver& ) noexcept { task.start(); }#
template<>
struct SenderExpressionImpl<ContextScheduleT> : public heph::concurrency::DefaultSenderExpressionImpl#

Public Static Attributes

static auto GET_COMPLETION_SIGNATURES = [](Ignore, Ignore = {}) noexcept {return stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr),stdexec::set_stopped_t()>{};}#
static auto GET_ATTRS = [](Context* context) noexcept -> ContextEnv { return { context }; }#
static auto GET_STATE = []<typename Sender, typename Receiver>(Sender&& sender,Receiver& receiver) {auto [_, context] = std::forward<Sender>(sender);return Task<std::decay_t<Receiver>, Context>{ context, std::move(receiver) };}#
static auto START = []<typename Receiver>(Task<std::decay_t<Receiver>, Context>& task,Receiver& ) noexcept { task.start(); }#
template<typename Receiver, typename Context>
struct Task : public heph::concurrency::TaskBase#

Public Members

Context *context = {nullptr}#
Receiver receiver#

Public Functions

inline Task(Context *context_input, Receiver &&receiver_input)#
inline void start() noexcept final#
inline void setValue() noexcept final#
inline void setStopped() noexcept final#
struct TaskBase#

Public Members

TaskDispatchOperation dispatch_operation = {this}#
TaskBase *next = {nullptr}#
TaskBase *prev = {nullptr}#

Public Functions

virtual ~TaskBase() = default#
virtual void start() noexcept = 0#
virtual void setValue() noexcept = 0#
virtual void setStopped() noexcept = 0#
struct TaskDispatchOperation : public heph::concurrency::io_ring::IoRingOperationBase#

Public Members

TaskBase *self#

Public Functions

inline explicit TaskDispatchOperation(TaskBase *task) noexcept#
void handleCompletion(::io_uring_cqe *cqe) final#
template<typename ReceiverT, typename ContextT>
struct TimedTask : public heph::concurrency::TaskBase#
struct StopCallback#

Public Members

TimedTask *self#

Public Functions

inline void operator()() const noexcept#

Public Types

using ReceiverEnvT = stdexec::env_of_t<ReceiverT>#
using StopTokenT = stdexec::stop_token_of_t<ReceiverEnvT>#
using StopCallbackT = stdexec::stop_callback_for_t<StopTokenT, StopCallback>#

Public Members

ContextT *context = {nullptr}#
io_ring::TimerClock::time_point start_time#
ReceiverT receiver#
std::optional<StopCallbackT> stop_callback#
bool timeout_started = {false}#

Public Functions

template<typename Clock, typename Duration>
inline TimedTask(Context *context_input, std::chrono::time_point<Clock, Duration> start_time_input, ReceiverT &&receiver_input)#
inline void start() noexcept final#
inline void setValue() noexcept final#
inline void setStopped() noexcept final#

Variables

static GetContextT getContext = {}#