concurrency#
-
namespace heph
-
namespace concurrency#
-
namespace io_ring#
-
class IoRing#
Public Functions
-
explicit IoRing(const IoRingConfig &config)#
-
bool stopRequested()#
-
void requestStop()#
-
void submit(IoRingOperationBase *operation)#
-
void runOnce(bool block = true)#
-
void run(const std::function<void()> &on_start =
[] {}
, const std::function<bool()> &on_progress =[] { return false;}
)#
-
bool isRunning()#
-
bool isCurrentRing()#
-
explicit IoRing(const IoRingConfig &config)#
-
struct IoRingConfig#
-
class IoRing#
-
namespace io_ring#
-
namespace concurrency#
-
namespace heph
-
namespace concurrency
-
struct RepeatUntilT#
Public Functions
-
template<internal::SenderFactory SenderFactoryT>
inline auto operator()(SenderFactoryT &&sender_factory) const#
-
template<internal::SenderFactory SenderFactoryT>
-
template<>
struct SenderExpressionImpl<RepeatUntilT> : public heph::concurrency::DefaultSenderExpressionImpl# Public Static Attributes
-
static auto GET_COMPLETION_SIGNATURES =
[]<typename Sender>(Sender&&, heph::concurrency::Ignore = {}) noexcept {return stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr),stdexec::set_stopped_t()>{};}
#
-
static auto GET_STATE =
[]<typename Sender, typename Receiver>(Sender&& sender,Receiver&& receiver) noexcept {auto [_, sender_factory] = std::forward<Sender>(sender);using SenderFactoryT = std::decay_t<stdexec::__data_of<Sender>>;using ReceivertT = std::decay_t<Receiver>;return internal::RepeatUntilStateT<SenderFactoryT, ReceivertT>{ std::move(sender_factory),std::forward<Receiver>(receiver) };}
#
-
static auto START =
[]<typename Receiver>(auto& self, Receiver&& ) { self.start(); }
#
-
static auto GET_COMPLETION_SIGNATURES =
Variables
-
RepeatUntilT repeatUntil = {}#
-
struct RepeatUntilT#
-
namespace concurrency
-
namespace heph
-
namespace concurrency
-
namespace io_ring
-
class IoRingOperationBase#
-
class IoRingOperationBase#
-
namespace io_ring
-
namespace concurrency
-
namespace heph
-
namespace concurrency
-
class SpinnersManager#
- #include “hephaestus/concurrency/spinners_manager.h”
SpinnersManager orchestrates the execution of multiple spinners. The main feature is
waitAny
which unblocks as soon as one of the spinners is done. This makes it possible to catch errors in spinner execution as soon as possible and stop the others. NOTE: right now we do not have any concept of error for the spinners: we cannot know if a spinner terminated with an error or not. If an exception is thrown inside the runner, it will be re-thrown when we call runner.stop().get(). We leave it to the user to handle it outside of the runner manager. NOTE: this logic is quite generic and can be extended to any type of object that has wait()
and stop()
methods. To be faithful to the principle of implementing only what you need now, we limit the scope to spinners and will consider expanding the scope when a use case arises.
Public Functions
-
void startAll()#
-
void waitAll()#
blocks until all spinners are finished. If a single spinner throws an exception, it will not be propagated as the other spinners are still blocking.
-
void waitAny()#
wait until any spinner terminates or throws an exception, which allows for immediate exception handling. Note that the exceptions will be re-thrown when calling
stopAll
.
-
void stopAll()#
-
void startAll()#
-
class SpinnersManager#
-
namespace concurrency
-
namespace heph
-
namespace concurrency
-
template<typename T>
class MessageQueueConsumer# - #include “hephaestus/concurrency/message_queue_consumer.h”
MessageQueueConsumer creates a thread that waits for messages to be pushed into the queue and calls the user-provided callback on them.
Public Functions
-
~MessageQueueConsumer() = default#
-
MessageQueueConsumer(const MessageQueueConsumer&) = delete#
-
MessageQueueConsumer(MessageQueueConsumer&&) = delete#
- MessageQueueConsumer &operator=(const MessageQueueConsumer&) = delete
- MessageQueueConsumer &operator=(MessageQueueConsumer&&) = delete
-
void start()#
-
std::future<void> stop()#
Stop the message consumer, emptying the queue and stopping the processing.
- Returns:
future that waits on the queue to be emptied
-
containers::BlockingQueue<T> &queue()#
Return the queue so that messages to process can be pushed to it. This is simpler than having to mask all the different types of push methods of the queue. The downside is that it is possible to consume messages from the outside without calling the callback.
-
~MessageQueueConsumer() = default#
-
template<typename T>
-
namespace concurrency
-
namespace heph
-
namespace concurrency
-
namespace spinner_state_machine#
Enums
Functions
-
StateMachineCallbackT createStateMachineCallback(Callbacks &&callbacks)#
Create a callback for the spinner which internally handles the state machine. The current state is stored and a copy is returned to the caller.
-
struct Callbacks#
Public Members
-
OperationCallbackT init_cb =
[]() -> Result { return Result::PROCEED; }
# Handles initialization.
-
OperationCallbackT spin_once_cb =
[]() -> Result { return Result::PROCEED; }
# Handles execution.
-
CheckCallbackT shall_restart_cb =
[]() -> bool {return false;}
# This callback is called after failure. It decides if operation shall restart, or the spinner shall conclude. Default: do not restart.
-
OperationCallbackT init_cb =
-
StateMachineCallbackT createStateMachineCallback(Callbacks &&callbacks)#
-
namespace spinner_state_machine#
-
namespace concurrency
-
namespace heph
-
namespace stdexec#
-
template<typename TagT>
struct __sexpr_impl<::heph::concurrency::Tag<TagT>># Public Types
-
using Impl = ::heph::concurrency::SenderExpressionImpl<TagT>#
Public Static Attributes
-
using Impl = ::heph::concurrency::SenderExpressionImpl<TagT>#
-
template<typename TagT>
-
namespace heph
-
namespace concurrency
-
class Spinner#
- #include “hephaestus/concurrency/spinner.h”
A spinner is a class that spins in a loop calling a user-defined function. If the function is blocking, the spinner will block the thread. If the input
rate_hz
is set to a non-infinite value, the spinner will call the user-defined function at the given fixed rate. The spinner behavior can be configured using callbacks.
Public Types
-
using StoppableCallback = std::function<SpinResult()>#
Public Functions
-
explicit Spinner(StoppableCallback &&stoppable_callback, std::optional<std::chrono::duration<double>> spin_period = std::nullopt, std::optional<std::string> component_name = std::nullopt)#
Create a spinner with a stoppable callback. A stoppable callback is a function that returns SpinResult::STOP to indicate that the spinner should stop. Other types of callbacks are supported via mappings to StoppableCallback. Example: a callback that stops after 10 iterations.
- Parameters:
stoppable_callback – The callback to be called in the spinner loop.
spin_period – The duration between spins. If not provided, the spinner will spin as fast as possible.
component_name – A unique name for this spinner for telemetry logging.
-
~Spinner()#
- Spinner &operator=(const Spinner&) = delete
- Spinner &operator=(Spinner&&) = delete
-
void start()#
-
void wait()#
Public Static Functions
-
static StoppableCallback createNeverStoppingCallback(Callback &&callback)#
Wrap the user provided callback in a callback that never stops.
-
static StoppableCallback createCallbackWithStateMachine(spinner_state_machine::StateMachineCallbackT &&state_machine_callback)#
Create a callback for the spinner which internally handles the state machine.
-
using StoppableCallback = std::function<SpinResult()>#
-
class Spinner#
-
namespace concurrency
-
namespace heph
-
namespace concurrency
-
namespace io_ring
-
template<typename IoRingOperationT>
struct StoppableIoRingOperation : public heph::concurrency::io_ring::IoRingOperationBase# -
-
struct StopOperation : public heph::concurrency::io_ring::IoRingOperationBase#
Public Members
Public Members
-
IoRingOperationT operation#
-
stdexec::inplace_stop_callback<StopCallback> stop_callback#
-
int in_flight = {1}#
-
std::optional<StopOperation> stop_operation#
-
struct StopOperation : public heph::concurrency::io_ring::IoRingOperationBase#
-
template<typename IoRingOperationT>
-
namespace io_ring
-
namespace concurrency
-
namespace heph
-
namespace concurrency
-
namespace io_ring
-
-
class Timer#
-
struct Operation#
-
struct UpdateOperation#
Public Functions
-
explicit Timer(IoRing &ring, TimerOptions options)#
-
~Timer() noexcept#
-
inline bool empty() const#
-
void requestStop()#
-
void tick()#
-
void startAt(TaskBase *task, TimerClock::time_point start_time)#
-
inline TimerClock::time_point now()#
-
inline TimerClock::duration elapsed()#
-
bool tickSimulated(bool advance)#
-
template<typename Rep, typename Period>
inline void advanceSimulation(std::chrono::duration<Rep, Period> duration)#
-
inline auto clockMode()#
-
struct Operation#
-
struct TimerClock#
Public Types
-
using time_point = std::chrono::time_point<base_clock, duration>#
Public Static Functions
-
static time_point now()#
-
using time_point = std::chrono::time_point<base_clock, duration>#
-
struct TimerEntry#
-
struct TimerOptions#
-
class Timer#
-
namespace io_ring
-
namespace concurrency
-
namespace heph
-
namespace concurrency
-
class Context#
-
Public Types
-
using Scheduler = ContextScheduler#
-
using Scheduler = ContextScheduler#
-
struct ContextConfig#
-
class Context#
-
namespace concurrency
-
namespace heph
-
namespace concurrency
-
struct ContextEnv#
-
Public Functions
-
inline ContextScheduler query(stdexec::get_completion_scheduler_t<stdexec::set_value_t>) const noexcept#
-
Context &query(GetContextT) const noexcept#
-
inline ContextScheduler query(stdexec::get_completion_scheduler_t<stdexec::set_value_t>) const noexcept#
-
struct ContextScheduleAtT#
-
struct ContextScheduler#
-
Public Functions
-
template<typename TagT = ContextScheduleT>
inline auto schedule()#
-
template<typename Rep, typename Period, typename TagT = ContextScheduleAtT>
inline auto scheduleAfter(std::chrono::duration<Rep, Period> duration)#
-
template<typename Clock, typename Duration, typename TagT = ContextScheduleAtT>
inline auto scheduleAt(std::chrono::time_point<Clock, Duration> time_point)#
-
template<typename TagT = ContextScheduleT>
-
struct ContextScheduleT#
-
struct GetContextT : public stdexec::__query<GetContextT>#
Public Functions
-
template<typename Env>
inline stdexec::tag_invoke_result_t<GetContextT, const Env&> operator()(const Env &env) const noexcept#
-
template<typename Tag = GetContextT>
auto operator()() const noexcept#
-
template<typename Env>
-
template<>
struct SenderExpressionImpl<ContextScheduleAtT> : public heph::concurrency::DefaultSenderExpressionImpl# Public Static Attributes
-
static auto GET_COMPLETION_SIGNATURES =
[](Ignore, Ignore = {}) noexcept {return stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr),stdexec::set_stopped_t()>{};}
#
-
static auto GET_ATTRS =
[](auto& data) noexcept -> ContextEnv { return { std::get<0>(data) }; }
#
-
static auto GET_STATE =
[]<typename Sender, typename Receiver>(Sender&& sender,Receiver& receiver) {auto [_, data] = std::forward<Sender>(sender);auto* context = std::get<0>(data);return TimedTask<std::decay_t<Receiver>, Context>{ context, std::get<1>(data), std::move(receiver) };}
#
-
static auto START =
[]<typename Receiver>(TimedTask<std::decay_t<Receiver>, Context>& task,Receiver& ) noexcept { task.start(); }
#
-
static auto GET_COMPLETION_SIGNATURES =
-
template<>
struct SenderExpressionImpl<ContextScheduleT> : public heph::concurrency::DefaultSenderExpressionImpl# Public Static Attributes
-
static auto GET_COMPLETION_SIGNATURES =
[](Ignore, Ignore = {}) noexcept {return stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr),stdexec::set_stopped_t()>{};}
#
-
static auto GET_ATTRS =
[](Context* context) noexcept -> ContextEnv { return { context }; }
#
-
static auto GET_STATE =
[]<typename Sender, typename Receiver>(Sender&& sender,Receiver& receiver) {auto [_, context] = std::forward<Sender>(sender);return Task<std::decay_t<Receiver>, Context>{ context, std::move(receiver) };}
#
-
static auto START =
[]<typename Receiver>(Task<std::decay_t<Receiver>, Context>& task,Receiver& ) noexcept { task.start(); }
#
-
static auto GET_COMPLETION_SIGNATURES =
-
template<typename Receiver, typename Context>
struct Task : public heph::concurrency::TaskBase#
-
struct TaskBase#
Public Members
-
TaskDispatchOperation dispatch_operation = {this}#
-
TaskDispatchOperation dispatch_operation = {this}#
-
struct TaskDispatchOperation : public heph::concurrency::io_ring::IoRingOperationBase#
-
template<typename ReceiverT, typename ContextT>
struct TimedTask : public heph::concurrency::TaskBase# -
Public Types
-
using StopTokenT = stdexec::stop_token_of_t<ReceiverEnvT>#
-
using StopCallbackT = stdexec::stop_callback_for_t<StopTokenT, StopCallback>#
Public Members
-
io_ring::TimerClock::time_point start_time#
-
std::optional<StopCallbackT> stop_callback#
-
bool timeout_started = {false}#
Public Functions
-
template<typename Clock, typename Duration>
inline TimedTask(Context *context_input, std::chrono::time_point<Clock, Duration> start_time_input, ReceiverT &&receiver_input)#
-
inline void start() noexcept final#
-
inline void setValue() noexcept final#
-
inline void setStopped() noexcept final#
-
using StopTokenT = stdexec::stop_token_of_t<ReceiverEnvT>#
Variables
-
static GetContextT getContext = {}#
-
struct ContextEnv#
-
namespace concurrency