concurrency

Contents

concurrency#

namespace heph
namespace concurrency#

Functions

template<internal::SenderFactory SenderFactoryT>
auto repeatUntil(SenderFactoryT &&factory)#

Repeatedly starts the sender produced by sender_factory until the completion of that sender returns true.

This can be used to implement loops using senders and receivers. By passing a factory instead of the sender directly, we are able to use move-only senders.

Parameters:
  • sender_factory – nullary function object returning a sender whose set_value completion carries a bool.

template<stdexec::sender SenderT>
auto repeatUntil(SenderT &&sender)#

Convenience wrapper for repeatUntil for cases where the sender is copyable.

Parameters:

sender – the copyable sender to be started repeatedly.

namespace heph
namespace concurrency
namespace io_ring#
class IoRingOperationBase#

Public Functions

virtual ~IoRingOperationBase() = default#
inline virtual void prepare(::io_uring_sqe *sqe)#
virtual void handleCompletion(::io_uring_cqe *cqe) = 0#
namespace heph
namespace concurrency
template<typename T>
class AnySender#
#include "hephaestus/concurrency/any_sender.h"

Implementation for a type erased sender.

Template Parameters:

T – The value this sender completes with

Public Types

using sender_concept = stdexec::sender_t#
using completion_signatures = stdexec::completion_signatures<SelectValueCompletionT<T>, stdexec::set_error_t(std::exception_ptr), stdexec::set_stopped_t()>#

Public Functions

template<AnySenderRequirements<T> Sender, typename SenderT = std::decay_t<Sender>>
inline AnySender(Sender &&sender)#
~AnySender() = default#
AnySender(AnySender &&other) = default#
AnySender &operator=(AnySender &&other) = default#
AnySender(const AnySender &other) = delete#
AnySender &operator=(const AnySender &other) = delete#
template<stdexec::receiver_of<completion_signatures> Receiver>
inline auto connect(Receiver &&receiver) &&#
template<typename T>
struct SelectValueCompletion#

Public Types

using TypeT = stdexec::set_value_t(T)#
template<>
struct SelectValueCompletion<void>#

Public Types

using TypeT = stdexec::set_value_t()#

Typedefs

template<typename T>
using SelectValueCompletionT = typename SelectValueCompletion<T>::TypeT#
namespace heph
namespace concurrency
namespace spinner_state_machine#

Enums

enum class State : std::uint8_t#

Values:

enumerator NOT_INITIALIZED#
enumerator FAILED#
enumerator READY_TO_SPIN#
enumerator SPIN_SUCCESSFUL#
enumerator EXIT#
enum class Result : std::uint8_t#

Values:

enumerator PROCEED#
enumerator FAILURE#
enumerator REPEAT#

Functions

StateMachineCallbackT createStateMachineCallback(Callbacks &&callbacks)#

Create a callback for the spinner which internally handles the state machine. The current state is stored and a copy is returned to the caller.

struct Callbacks#

Public Members

OperationCallbackT init_cb = []() -> Result { return Result::PROCEED; }#

Handles initialization.

OperationCallbackT spin_once_cb = []() -> Result { return Result::PROCEED; }#

Handles execution.

CheckCallbackT shall_restart_cb = []() -> bool {return false;}#

This callback is called after failure. It decides if operation shall restart, or the spinner shall conclude. Default: do not restart.

Typedefs

using OperationCallbackT = std::function<Result()>#
using CheckCallbackT = std::function<bool()>#
using StateMachineCallbackT = std::function<State()>#
namespace heph
namespace concurrency

Functions

template<typename TagT, typename Data = stdexec::__, typename ...Children>
auto makeSenderExpression(Data &&data = {}, Children&&... children)#
struct DefaultSenderExpressionImpl#

Public Static Attributes

static auto GET_ATTRS = stdexec::__sexpr_defaults::get_attrs#
static auto GET_ENV = stdexec::__sexpr_defaults::get_env#
static auto GET_STATE = stdexec::__sexpr_defaults::get_state#
static auto CONNECT = stdexec::__sexpr_defaults::connect#
static auto START = stdexec::__sexpr_defaults::start#
static auto COMPLETE = stdexec::__sexpr_defaults::complete#
struct Ignore#

Public Functions

Ignore() noexcept = default#
template<typename ...T>
inline Ignore(T&&...) noexcept#
template<typename T>
struct Tag#

Typedefs

template<typename Function, typename ...Ts>
using CallResultT = stdexec::__call_result_t<Function, Ts...>#
template<typename SenderExpressionT>
using DataOfT = stdexec::__data_of<SenderExpressionT>#
namespace stdexec#
template<typename TagT>
struct __sexpr_impl<::heph::concurrency::Tag<TagT>>#

Public Types

using Impl = ::heph::concurrency::SenderExpressionImpl<TagT>#

Public Static Attributes

static auto get_completion_signatures = Impl::GET_COMPLETION_SIGNATURES#
static auto get_attrs = Impl::GET_ATTRS#
static auto get_env = Impl::GET_ENV#
static auto get_state = Impl::GET_STATE#
static auto connect = Impl::CONNECT#
static auto start = Impl::START#
static auto complete = Impl::COMPLETE#
namespace heph
namespace concurrency
class Context#
struct StopCallback#

Public Members

Context *self#

Public Functions

inline void operator()() const noexcept#

Public Types

using Scheduler = ContextScheduler#

Public Functions

inline explicit Context(const ContextConfig &config)#
inline Scheduler scheduler()#
void run(const std::function<void()> &on_start = [] {})#
inline void requestStop()#
inline bool isCurrent()#
inline bool stopRequested()#
inline stdexec::inplace_stop_token getStopToken()#
inline auto elapsed()#
inline io_ring::IoRing *ring()#
struct ContextConfig#

Public Members

io_ring::IoRingConfig io_ring_config#
TimerOptionsT timer_options#

Typedefs

using TimerOptionsT = io_ring::TimerOptions#
using ClockT = io_ring::TimerClock#
namespace heph
namespace concurrency
struct ContextEnv#

Public Members

Context *self#

Public Functions

inline ContextScheduler query(stdexec::get_completion_scheduler_t<stdexec::set_value_t>) const noexcept#
stdexec::inplace_stop_token query(stdexec::get_stop_token_t) const noexcept#
Context &query(GetContextT) const noexcept#

Public Static Functions

static inline auto query(stdexec::__is_scheduler_affine_t) noexcept#
struct ContextScheduleAtT#
struct ContextScheduler#

Public Members

Context *self#

Public Functions

inline Context &context() const#
template<typename TagT = ContextScheduleT>
inline auto schedule()#
template<typename Rep, typename Period, typename TagT = ContextScheduleAtT>
inline auto scheduleAfter(std::chrono::duration<Rep, Period> duration)#
template<typename Clock, typename Duration, typename TagT = ContextScheduleAtT>
inline auto scheduleAt(std::chrono::time_point<Clock, Duration> time_point)#

Public Static Functions

static inline auto query(stdexec::get_forward_progress_guarantee_t) noexcept#
struct ContextScheduleT#
struct GetContextT : public stdexec::__query<GetContextT>#

Public Functions

template<typename Env>
inline stdexec::tag_invoke_result_t<GetContextT, const Env&> operator()(const Env &env) const noexcept#
template<typename Tag = GetContextT>
auto operator()() const noexcept#

Public Static Functions

static inline bool query(stdexec::forwarding_query_t) noexcept#
template<>
struct SenderExpressionImpl<ContextScheduleAtT> : public heph::concurrency::DefaultSenderExpressionImpl#

Public Static Attributes

static auto GET_COMPLETION_SIGNATURES = [](Ignore, Ignore = {}) noexcept {return stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr),stdexec::set_stopped_t()>{};}#
static auto GET_ATTRS = [](auto& data) noexcept -> ContextEnv { return { std::get<0>(data) }; }#
static auto GET_STATE = []<typename Sender, typename Receiver>(Sender&& sender,Receiver& receiver) {auto [_, data] = std::forward<Sender>(sender);auto* context = std::get<0>(data);return TimedTask<std::decay_t<Receiver>, Context>{ context, std::get<1>(data), std::move(receiver) };}#
static auto START = []<typename Receiver>(TimedTask<std::decay_t<Receiver>, Context>& task,Receiver& ) noexcept { task.start(); }#
template<>
struct SenderExpressionImpl<ContextScheduleT> : public heph::concurrency::DefaultSenderExpressionImpl#

Public Static Attributes

static auto GET_COMPLETION_SIGNATURES = [](Ignore, Ignore = {}) noexcept {return stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_error_t(std::exception_ptr),stdexec::set_stopped_t()>{};}#
static auto GET_ATTRS = [](Context* context) noexcept -> ContextEnv { return { context }; }#
static auto GET_STATE = []<typename Sender, typename Receiver>(Sender&& sender,Receiver& receiver) {auto [_, context] = std::forward<Sender>(sender);return Task<std::decay_t<Receiver>, Context>{ context, std::move(receiver) };}#
static auto START = []<typename Receiver>(Task<std::decay_t<Receiver>, Context>& task,Receiver& ) noexcept { task.start(); }#
template<typename Receiver, typename Context>
struct Task : public heph::concurrency::TaskBase#

Public Members

Context *context = {nullptr}#
Receiver receiver#

Public Functions

inline Task(Context *context_input, Receiver &&receiver_input)#
inline void start() noexcept final#
inline void setValue() noexcept final#
inline void setStopped() noexcept final#
struct TaskBase#

Public Members

TaskDispatchOperation dispatch_operation = {this}#
TaskBase *next = {nullptr}#
TaskBase *prev = {nullptr}#

Public Functions

virtual ~TaskBase() = default#
virtual void start() noexcept = 0#
virtual void setValue() noexcept = 0#
virtual void setStopped() noexcept = 0#
struct TaskDispatchOperation : public heph::concurrency::io_ring::IoRingOperationBase#

Public Members

TaskBase *self#

Public Functions

inline explicit TaskDispatchOperation(TaskBase *task) noexcept#
void handleCompletion(::io_uring_cqe *cqe) final#
template<typename ReceiverT, typename ContextT>
struct TimedTask : public heph::concurrency::TaskBase#
struct StopCallback#

Public Members

TimedTask *self#

Public Functions

inline void operator()() const noexcept#

Public Types

using ReceiverEnvT = stdexec::env_of_t<ReceiverT>#
using StopTokenT = stdexec::stop_token_of_t<ReceiverEnvT>#
using StopCallbackT = stdexec::stop_callback_for_t<StopTokenT, StopCallback>#

Public Members

ContextT *context = {nullptr}#
io_ring::TimerClock::time_point start_time#
ReceiverT receiver#
std::optional<StopCallbackT> stop_callback#
bool timeout_started = {false}#

Public Functions

template<typename Clock, typename Duration>
inline TimedTask(Context *context_input, std::chrono::time_point<Clock, Duration> start_time_input, ReceiverT &&receiver_input)#
inline void start() noexcept final#
inline void setValue() noexcept final#
inline void setStopped() noexcept final#

Variables

static GetContextT getContext = {}#
namespace heph
namespace concurrency
namespace io_ring
class IoRing#

Public Functions

explicit IoRing(const IoRingConfig &config)#
bool stopRequested()#
void requestStop()#
stdexec::inplace_stop_token getStopToken()#
void submit(IoRingOperationBase *operation)#
void runOnce(bool block = true)#
void run(const std::function<void()> &on_start = [] {}, const std::function<bool()> &on_progress = [] { return false;})#
bool isRunning()#
bool isCurrentRing()#
struct IoRingConfig#

Public Members

std::uint32_t nentries = {DEFAULT_ENTRY_COUNT}#
std::uint32_t flags = {0}#

Public Static Attributes

static std::uint32_t DEFAULT_ENTRY_COUNT = 8#
namespace heph
namespace concurrency
class SpinnersManager#
#include "hephaestus/concurrency/spinners_manager.h"

SpinnersManager allows orchestrating the execution of multiple spinners. The main feature is waitAny, which unblocks as soon as one of the spinners is done. This allows catching errors in spinner execution as soon as possible and stopping the others. NOTE: right now we do not have any concept of error for the spinners: we cannot know if a spinner terminated with an error or not. If an exception is thrown inside the runner, it will be re-thrown when we call runner.stop().get(). We leave it to the user to handle it outside of the runner manager. NOTE: this logic is quite generic and can be extended to any type of object that has wait() and stop() methods. To be faithful to the principle of implementing only what you need now, we limit the scope to spinners and will consider expanding the scope when a use case arises.

Public Functions

explicit SpinnersManager(std::vector<Spinner*> spinners)#
void startAll()#
void waitAll()#

blocks until all spinners are finished. If a single spinner throws an exception, it will not be propagated as the other spinners are still blocking.

void waitAny()#

wait until any spinner terminates or throws an exception, which allows for immediate exception handling. Note that the exceptions will be re-thrown when calling stopAll.

void stopAll()#
namespace heph
namespace concurrency

Functions

template<std::size_t N, SenderRange Range, typename RangeT = std::decay_t<Range>>
auto whenAllRange(Range &&range)#

Wait on a range of senders with a fixed size.

This function operates on a fixed size range. The number of elements in the range is given by the template parameter N.

Note

Currently only senders completing with void are supported

Template Parameters:

N – Number of elements in the range

Parameters:

range – the range of senders to wait on. Takes ownership of the range

namespace heph
namespace concurrency
template<typename T>
class MessageQueueConsumer#
#include "hephaestus/concurrency/message_queue_consumer.h"

MessageQueueConsumer creates a thread that waits for messages to be pushed into the queue and calls the user-provided callback on them.

Public Types

using Callback = std::function<void(T&&)>#

Public Functions

MessageQueueConsumer(Callback &&callback, std::size_t max_queue_size)#
~MessageQueueConsumer() = default#
MessageQueueConsumer(const MessageQueueConsumer&) = delete#
MessageQueueConsumer(MessageQueueConsumer&&) = delete#
MessageQueueConsumer &operator=(const MessageQueueConsumer&) = delete#
MessageQueueConsumer &operator=(MessageQueueConsumer&&) = delete#
void start()#
std::future<void> stop()#

Stop the message consumer, emptying the queue and stopping the processing.

Returns:

future that waits on the queue to be emptied

containers::BlockingQueue<T> &queue()#

Return the queue so that callers can push messages to process. This is simpler than having to wrap all the different types of push methods of the queue. The downside is that it is possible to consume messages from the outside without calling the callback.

namespace heph
namespace concurrency
template<ChannelValueType T, std::size_t Capacity>
class Channel#
#include "hephaestus/concurrency/channel.h"

Asynchronous communication channel

The member functions getValue and setValue return senders. For waiting on them, use stdexec::sync_wait.

Exception Safety: cannot throw, ensured by type constraints.

Example:

Channel<int, 10> c;

// Using Sender/Receiver
// Producer:
c.setValue(42) | stdexec::then([](){ fmt::println("Value has been successfully set!"); });

// Consumer:
c.getValue() | stdexec::then([](int res){ fmt::println("Received {}!", res); });

// Using `stdexec::sync_wait`
// Producer:
stdexec::sync_wait(c.setValue(42));

// Consumer:
auto [res] = *stdexec::sync_wait(c.getValue());
assert(res == 42);

// Using coroutines:
// Producer:
co_await c.setValue(42);

// Consumer:
auto res = co_await c.getValue();
assert(res == 42);

Note

There is a potential starvation issue when having many producers/consumers. The recommended use is in a Single Producer Single Consumer (SPSC) scenario.

Template Parameters:
  • T – The type of the values to store

  • Capacity – Maximum number of elements the Channel can store

Public Functions

template<ChannelValueType<T> U>
SetValueSender setValue(U &&value) noexcept#

Push a value into the channel. The returned sender completes once there is space to store the element; if the channel is full, it waits until at least one item has been consumed. The returned sender completes without a value once the element has been stored.

Parameters:

value – The value to send via the channel

template<ChannelValueType<T> U>
inline void setValueOverwrite(U &&value) noexcept#

Push a value into the channel

Similar to setValue but removes the oldest element if not enough space is available.

GetValueSender getValue() noexcept#

Retrieve a value stored in the channel. The returned sender will complete as soon as there is at least one item stored in the channel, and it completes with the value retrieved from the channel.

inline std::optional<T> tryGetValue() noexcept#

Retrieve a value stored in the channel.

Similar to getValue but returns an optional which contains a value if there was an item at the time when this function was called.

namespace heph
namespace concurrency
class Spinner#
#include "hephaestus/concurrency/spinner.h"

A spinner is a class that spins in a loop calling a user-defined function. If the function is blocking, the spinner will block the thread. If spin_period is provided, the spinner will call the user-defined function at the given fixed period; otherwise it spins as fast as possible. The spinner behavior can be configured using callbacks.

Public Types

enum class SpinResult : bool#

Values:

enumerator CONTINUE#
enumerator STOP#
using StoppableCallback = std::function<SpinResult()>#
using Callback = std::function<void()>#

Public Functions

explicit Spinner(StoppableCallback &&stoppable_callback, std::optional<std::chrono::duration<double>> spin_period = std::nullopt, std::optional<std::string> component_name = std::nullopt)#

Create a spinner with a stoppable callback. A stoppable callback is a function that returns SpinResult::STOP to indicate that the spinner should stop. Other types of callbacks are supported via mappings to StoppableCallback. Example: a callback that stops after 10 iterations.

Parameters:
  • stoppable_callback – The callback to be called in the spinner loop.

  • spin_period – The duration between spins. If not provided, the spinner will spin as fast as possible.

  • component_name – A unique name for this spinner for telemetry logging.

~Spinner()#
Spinner(const Spinner&) = delete#
Spinner &operator=(const Spinner&) = delete#
Spinner(Spinner&&) = delete#
Spinner &operator=(Spinner&&) = delete#
void start()#
std::future<void> stop()#
void wait()#
void setTerminationCallback(Callback &&termination_callback)#

Set a callback that will be called when the spinner is stopped. This callback could be extended to pass the reason why the spinner was stopped, e.g. exceptions, …

Public Static Functions

static StoppableCallback createNeverStoppingCallback(Callback &&callback)#

Wrap the user provided callback in a callback that never stops.

static StoppableCallback createCallbackWithStateMachine(spinner_state_machine::StateMachineCallbackT &&state_machine_callback)#

Create a callback for the spinner which internally handles the state machine.

namespace heph
namespace concurrency
namespace io_ring
template<typename IoRingOperationT>
struct StoppableIoRingOperation : public heph::concurrency::io_ring::IoRingOperationBase#
struct StopCallback#

Public Members

StoppableIoRingOperation *self#

Public Functions

inline void operator()() const#
struct StopOperation : public heph::concurrency::io_ring::IoRingOperationBase#

Public Members

StoppableIoRingOperation *self#

Public Functions

inline void prepare(::io_uring_sqe *sqe) final#
inline void handleCompletion(::io_uring_cqe *cqe) final#

Public Members

IoRingOperationT operation#
IoRing *ring = {nullptr}#
stdexec::inplace_stop_callback<StopCallback> stop_callback#
int in_flight = {1}#
std::optional<StopOperation> stop_operation#

Public Functions

inline StoppableIoRingOperation(IoRingOperationT op, IoRing &ring, stdexec::inplace_stop_token token)#
inline void prepare(::io_uring_sqe *sqe) final#
inline void handleCompletion(::io_uring_cqe *cqe) final#
inline void requestStop()#
namespace heph
namespace concurrency
namespace io_ring

Enums

enum class ClockMode : std::uint8_t#

Values:

enumerator WALLCLOCK#
enumerator SIMULATED#
class Timer#
struct Operation#

Public Members

Timer *timer = {nullptr}#

Public Functions

void prepare(::io_uring_sqe *sqe) const#
void handleCompletion(::io_uring_cqe *cqe) const#
void handleStopped() const#
struct UpdateOperation#

Public Members

Timer *timer = {nullptr}#
__kernel_timespec next_timeout = {}#

Public Functions

void prepare(::io_uring_sqe *sqe) const#
void handleCompletion(::io_uring_cqe *cqe) const#
void handleStopped()#

Public Functions

explicit Timer(IoRing &ring, TimerOptions options)#
~Timer() noexcept#
inline bool empty() const#
void requestStop()#
void tick()#
void startAt(TaskBase *task, TimerClock::time_point start_time)#
void dequeue(TaskBase *task)#
inline TimerClock::time_point now()#
inline TimerClock::duration elapsed()#
bool tickSimulated(bool advance)#
template<typename Rep, typename Period>
inline void advanceSimulation(std::chrono::duration<Rep, Period> duration)#
inline auto clockMode()#
struct TimerClock#

Public Types

using base_clock = std::chrono::system_clock#
using duration = std::chrono::microseconds#
using rep = duration::rep#
using period = duration::period#
using time_point = std::chrono::time_point<base_clock, duration>#

Public Static Attributes

static bool is_steady = base_clock::is_steady#
static Timer *timer#

Public Static Functions

static time_point now()#
struct TimerEntry#

Public Members

TaskBase *task = {nullptr}#
TimerClock::time_point start_time#
struct TimerOptions#

Public Members

ClockMode clock_mode = {ClockMode::WALLCLOCK}#