ipc#
-
namespace heph
-
namespace ipc#
-
namespace zenoh#
Functions
-
void appendProgramOption(cli::ProgramDescription &program_description, const std::string &default_topic = DEFAULT_TOPIC)#
-
std::tuple<Config, TopicConfig, TopicFilterParams> parseProgramOptions(const heph::cli::ProgramOptions &args)#
Variables
-
static auto DEFAULT_TOPIC = ""#
-
void appendProgramOption(cli::ProgramDescription &program_description, const std::string &default_topic = DEFAULT_TOPIC)#
-
namespace zenoh#
-
namespace ipc#
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
struct MessageMetadata#
-
class RawSubscriber#
Public Types
-
using DataCallback = std::function<void(const MessageMetadata&, std::span<const std::byte>)>#
Public Functions
-
RawSubscriber(SessionPtr session, TopicConfig topic_config, DataCallback &&callback, serdes::TypeInfo type_info, const SubscriberConfig &config = {})#
Note: setting dedicated_callback_thread to true will consume the messages in a dedicated thread. While this avoids blocking the Zenoh session thread from processing other messages, it also introduces an overhead due to the message data being copied.
-
~RawSubscriber()#
-
RawSubscriber(const RawSubscriber&) = delete#
-
RawSubscriber(RawSubscriber&&) = delete#
- RawSubscriber &operator=(const RawSubscriber&) = delete
- RawSubscriber &operator=(RawSubscriber&&) = delete
-
using DataCallback = std::function<void(const MessageMetadata&, std::span<const std::byte>)>#
-
struct SubscriberConfig#
-
struct MessageMetadata#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
Functions
-
template<typename T>
std::unique_ptr<Subscriber<T>> createSubscriber(zenoh::SessionPtr session, TopicConfig topic_config, typename Subscriber<T>::DataCallback &&callback, const SubscriberConfig &config = {})# Create a subscriber for a specific topic.
- Template Parameters:
T – The type of the message to subscribe to.
- Parameters:
session – The Zenoh session to use.
topic_config – The topic to subscribe to.
callback – The callback to call when a message is received.
dedicated_callback_thread – Whether to use a dedicated thread for the callback. If set to false the callback will be invoked on the Zenoh callback thread.
- Returns:
A new subscriber.
-
template<typename T>
class Subscriber : public heph::ipc::zenoh::SubscriberBase# Public Types
-
using DataCallback = std::function<void(const MessageMetadata&, const std::shared_ptr<T>&)>#
Public Functions
-
inline Subscriber(zenoh::SessionPtr session, TopicConfig topic_config, DataCallback &&callback, const SubscriberConfig &config = {})#
-
using DataCallback = std::function<void(const MessageMetadata&, const std::shared_ptr<T>&)>#
-
template<typename T>
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
namespace action_server#
-
template<typename RequestT, typename StatusT, typename ReplyT>
class PollableActionServer# - #include “hephaestus/ipc/zenoh/action_server/pollable_action_server.h”
A wrapper around ActionServer which facilitates the implementation of action servers using the polling paradigm.
The code implementing the action server is expected to call ‘pollRequest’ at a sufficiently high frequency to check for new requests. When a new request is returned, then it’s expected that the calling code takes care of executing the action and calls ‘complete’ when the action is completed.
The action server will not accept new requests if an action is still in progress.
Public Functions
-
PollableActionServer(SessionPtr session, TopicConfig topic_config)#
Constructs a new action server.
-
std::optional<RequestT> pollRequest()#
If a new request is pending, returns the pending request and switches the PollableActionServer to the IN_PROGRESS state. The code implementing the action server is expected to execute the action and call ‘complete’ when the action is completed.
In all other cases, the return value is std::nullopt.
Note: It is allowed to call this function when an action is in progress, though the return value will be std::nullopt, not the request which started the action.
-
void complete(ReplyT reply)#
Completes the currently running action with the given reply.
This function should only be called when an action is currently in progress (which includes the case when it’s being aborted).
-
void setStatus(StatusT status)#
Sets the action server status.
This function should only be called when an action is currently in progress (which includes the case when it’s being aborted).
-
bool shouldAbort()#
Returns true if the current action should be aborted.
It’s expected (though not mandatory) that the code implementing the action server aborts its current action as fast as possible and then calls ‘complete’.
-
void stop()#
Stops the underlying action server.
If an action is currently in progress, then it will wait for this action to complete.
This function should generally be called from a thread different from the one which executes the action server’s actions, because it will block if an action is currently in progress, so the thread which executes the action must keep making progress in the meanwhile.
-
PollableActionServer(SessionPtr session, TopicConfig topic_config)#
-
template<typename RequestT, typename StatusT, typename ReplyT>
-
namespace action_server#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
Enums
Functions
-
std::string generateLivelinessTokenKeyexpr(std::string_view topic, const ::zenoh::Id &session_id, EndpointType actor_type)#
-
std::optional<EndpointInfo> parseLivelinessToken(std::string_view keyexpr, ::zenoh::SampleKind kind)#
-
std::vector<EndpointInfo> getListOfEndpoints(const Session &session, const TopicFilter &topic_filter)#
-
void printEndpointInfo(const EndpointInfo &info)#
-
class EndpointDiscovery#
- #include “hephaestus/ipc/zenoh/liveliness.h”
Class to detect all the publishers present in the network. The publishers need to advertise their presence with the liveliness token.
Public Types
-
using Callback = std::function<void(const EndpointInfo &info)>#
Public Functions
-
explicit EndpointDiscovery(SessionPtr session, TopicFilter topic_filter, Callback &&callback)#
-
~EndpointDiscovery()#
-
EndpointDiscovery(const EndpointDiscovery&) = delete#
-
EndpointDiscovery(EndpointDiscovery&&) = delete#
- EndpointDiscovery &operator=(const EndpointDiscovery&) = delete
- EndpointDiscovery &operator=(EndpointDiscovery&&) = delete
-
using Callback = std::function<void(const EndpointInfo &info)>#
-
struct EndpointInfo#
-
Public Functions
- bool operator==(const EndpointInfo&) const = default
-
std::string generateLivelinessTokenKeyexpr(std::string_view topic, const ::zenoh::Id &session_id, EndpointType actor_type)#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
namespace action_server
Enums
-
template<typename RequestT>
struct Request#
-
struct RequestResponse#
Public Members
-
RequestStatus status#
-
RequestStatus status#
-
template<typename ReplyT>
struct Response#
-
template<typename RequestT>
-
namespace action_server
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
Functions
-
bool isValidIdChar(char c)#
Variables
-
static auto TEXT_PLAIN_ENCODING = "text/plain"#
-
static auto PUBLISHER_ATTACHMENT_MESSAGE_COUNTER_KEY = "0"#
We use single char key to reduce the overhead of the attachment.
-
static auto PUBLISHER_ATTACHMENT_MESSAGE_SESSION_ID_KEY = "1"#
-
static auto PUBLISHER_ATTACHMENT_MESSAGE_TYPE_INFO = "2"#
-
bool isValidIdChar(char c)#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
template<typename RequestT, typename ReplyT>
class ServiceClient# Public Functions
-
ServiceClient(SessionPtr session, TopicConfig topic_config, std::chrono::milliseconds timeout)#
-
std::vector<ServiceResponse<ReplyT>> call(const RequestT &request)#
-
ServiceClient(SessionPtr session, TopicConfig topic_config, std::chrono::milliseconds timeout)#
-
template<typename RequestT, typename ReplyT>
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
class TopicFilter#
Public Functions
-
TopicFilter onlyIncluding(const std::vector<std::string> &topic_names) &&#
If specified, this is the only filter allowed: all the filters specified before will be removed and no other filter can be specified after.
-
TopicFilter prefix(std::string prefix) &&#
-
TopicFilter excludePrefix(std::string prefix) &&#
-
TopicFilter anyExcluding(const std::vector<std::string> &topic_names) &&#
Public Static Functions
-
static TopicFilter create()#
-
static TopicFilter create(const TopicFilterParams &params)#
-
TopicFilter onlyIncluding(const std::vector<std::string> &topic_names) &&#
-
struct TopicFilterParams#
-
class TopicFilter#
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
namespace action_server
-
Functions
-
template<typename RequestT, typename StatusT, typename ReplyT>
std::future<Response<ReplyT>> callActionServer(SessionPtr session, const TopicConfig &topic_config, const RequestT &request, StatusUpdateCallback<StatusT> &&status_update_cb, std::chrono::milliseconds request_timeout)# Call the action server with the given request.
The status_update_cb will be called with the status updates from the server.
The updates are decided by the user implementation of the action server.
Returns a future which will eventually contain the response from the server.
-
bool requestActionServerToStopExecution(Session &session, const TopicConfig &topic_config)#
Request the action server to stop.
-
template<typename RequestT, typename StatusT, typename ReplyT>
class ActionServer# - #include “hephaestus/ipc/zenoh/action_server/action_server.h”
An action server is a server that executes a user function in response to a trigger from a client. Upon completion a result is sent back to the client. Unlike classic request/response servers, action servers are asynchronous and non-blocking. Action servers also provide functionalities for the user to send status updates to the client during execution of the action and are interruptible. To instantiate an ActionServer the user needs to provide two callbacks:
TriggerCallback
Takes the request as input and needs to decide if the request is valid and if it can be served.
No long running operations should be done in this callback.
ExecuteCallback
This is the function that does the actual work and eventually returns the final response to the client.
The execute callback is run in a dedicated thread.
The function begins execution as soon as the request is accepted.
The function has access to a Publisher to send status updates to the client.
The frequency of the updates is decided by the user; updates are not mandatory
The function can be interrupted by the client by setting the stop_requested flag to true.
The function should check this flag periodically and return if it is set.
The ability to stop the server relies on the user correctly reading the value of stop_requested. If the user ignores the variable, the server cannot be stopped.
NOTES:
If an action server is already serving a request it will not accept new ones.
#
- RequestT needs to be copyable and one copy will be made of it.#
Implementation details:
ActionServer contains a Service to receive the requests and a MessageQueueConsumer to execute them.
When a new request is accepted and started, a new Publisher is created to send status updates.
The status publisher is passed to execute_cb, which the user can use to publish status updates of the execution.
When calling the server, the client will create a temporary Subscriber to receive the updates.
When execute_cb finishes, the final response is sent to the client via a Service created by the caller.
NOTE: The reason why we can only process one request at a time is that with the current implementation the response Service and the update Publisher use a topic name which just depends on the input topic name. This means that if we have multiple requests we will have multiple Publisher and Subscriber instances with the same topic name, which will cause conflicts. This could be solved by adding a unique identifier to the topic name, but this is not implemented yet.
Public Types
-
using TriggerCallback = std::function<TriggerStatus(const RequestT&)>#
Public Functions
-
ActionServer(SessionPtr session, TopicConfig topic_config, TriggerCallback &&action_trigger_cb, ExecuteCallback &&execute_cb)#
-
~ActionServer()#
-
ActionServer(const ActionServer&) = delete#
-
ActionServer(ActionServer&&) = delete#
- ActionServer &operator=(const ActionServer&) = delete
- ActionServer &operator=(ActionServer&&) = delete
-
const TopicConfig &getTopicConfig() const#
-
template<typename RequestT, typename StatusT, typename ReplyT>
-
namespace action_server
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
namespace action_server
-
namespace action_server
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
template<class T>
class Publisher : public heph::ipc::zenoh::PublisherBase# Public Functions
-
inline Publisher(SessionPtr session, TopicConfig topic_config, RawPublisher::MatchCallback &&match_cb = nullptr, const PublisherConfig &config = {})#
-
inline Publisher(SessionPtr session, TopicConfig topic_config, RawPublisher::MatchCallback &&match_cb = nullptr, const PublisherConfig &config = {})#
-
template<class T>
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
Enums
Functions
-
void setSessionId(ZenohConfig &config, std::string_view id)#
-
void setSessionIdFromBinary(ZenohConfig &config)#
-
void setMode(ZenohConfig &config, Mode mode)#
-
void connectToEndpoints(ZenohConfig &config, const std::vector<std::string> &endpoints)#
-
void listenToEndpoints(ZenohConfig &config, const std::vector<std::string> &endpoints)#
-
void setQos(ZenohConfig &config, bool)#
-
void setRealTime(ZenohConfig &config, bool)#
-
void setMulticastScouting(ZenohConfig &config, bool)#
-
void setMulticastScoutingInterface(ZenohConfig &config, std::string_view interface)#
-
Config createLocalConfig()#
Create configuration for a session that doesn’t connect to any other session. This is useful for testing and for local communications
-
SessionPtr createSession(const Config &config)#
-
SessionPtr createSession(ZenohConfig config)#
-
struct Config#
Public Members
-
bool use_binary_name_as_session_id = false#
-
Mode mode = Mode::PEER#
NOTE: With shared-memory enabled, the publisher still uses the network transport layer to notify subscribers of the shared-memory segment to read. Therefore, for very small messages, shared-memory transport could be less efficient than using the default network transport to directly carry the payload.
-
bool qos = false#
If specified connect to the given router endpoint.
-
bool real_time = false#
-
bool multicast_scouting_enabled = true#
-
bool use_binary_name_as_session_id = false#
-
struct ZenohConfig#
- #include “hephaestus/ipc/zenoh/session.h”
There are a lot of options to configure a Zenoh session; see https://zenoh.io/docs/manual/configuration/#configuration-files for more information.
Public Members
-
::zenoh::Config zconfig = {::zenoh::Config::create_default()}#
Public Functions
-
ZenohConfig()#
-
~ZenohConfig() noexcept = default#
-
ZenohConfig(const ZenohConfig&) = delete#
- ZenohConfig &operator=(const ZenohConfig&) = delete
-
ZenohConfig(ZenohConfig&&) noexcept = default#
- ZenohConfig &operator=(ZenohConfig&&) noexcept = default
-
::zenoh::Config zconfig = {::zenoh::Config::create_default()}#
-
void setSessionId(ZenohConfig &config, std::string_view id)#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
class DynamicSubscriber#
- #include “hephaestus/ipc/zenoh/dynamic_subscriber.h”
This class actively listens for new publishers, and for each new topic that passes the filter it creates a new subscriber. The user can provide a callback that is called once when a new publisher is discovered and a callback to be passed to the topic subscriber.
-
struct DynamicSubscriberParams#
Public Members
-
SessionPtr session#
-
TopicFilterParams topics_filter_params#
-
TopicWithTypeInfoCallback init_subscriber_cb#
-
SubscriberWithTypeCallback subscriber_cb#
This callback is called before creating a new subscriber.
-
SessionPtr session#
-
class DynamicSubscriber#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
struct TopicConfig#
-
struct TopicConfig#
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
class IpcGraph#
Public Functions
-
IpcGraph(IpcGraphConfig config, IpcGraphCallbacks &&callbacks)#
-
void start()#
-
void stop()#
-
std::optional<serdes::ServiceTypeInfo> getServiceTypeInfo(const std::string &service_name) const#
-
TopicsToTypeMap getTopicsToTypeMap() const#
-
TopicsToServiceTypesMap getServicesToTypesMap() const#
-
TopicToSessionIdMap getServicesToServersMap() const#
-
TopicToSessionIdMap getServicesToClientsMap() const#
-
TopicToSessionIdMap getTopicToSubscribersMap() const#
-
TopicToSessionIdMap getTopicToPublishersMap() const#
-
void refreshConnectionGraph() const#
-
void endPointInfoUpdateCallback(const ipc::zenoh::EndpointInfo &info)#
-
IpcGraph(IpcGraphConfig config, IpcGraphCallbacks &&callbacks)#
-
struct IpcGraphCallbacks#
Public Types
-
using ServiceDiscoveryCallback = std::function<void(const std::string&, const serdes::ServiceTypeInfo&)>#
-
using GraphUpdateCallback = std::function<void(const ipc::zenoh::EndpointInfo&, const IpcGraphState&)>#
Public Members
-
TopicDiscoveryCallback topic_discovery_cb#
-
TopicRemovalCallback topic_removal_cb#
-
ServiceDiscoveryCallback service_discovery_cb#
-
ServiceRemovalCallback service_removal_cb#
-
GraphUpdateCallback graph_update_cb#
-
using ServiceDiscoveryCallback = std::function<void(const std::string&, const serdes::ServiceTypeInfo&)>#
-
struct IpcGraphConfig#
-
struct IpcGraphState#
Public Members
-
TopicsToTypeMap topics_to_types_map#
-
TopicToSessionIdMap topic_to_publishers_map#
-
TopicToSessionIdMap topic_to_subscribers_map#
-
TopicsToServiceTypesMap services_to_types_map#
-
TopicToSessionIdMap services_to_server_map#
-
TopicToSessionIdMap services_to_client_map#
-
TopicsToTypeMap topics_to_types_map#
-
class IpcGraph#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
Functions
-
template<typename RequestT, typename ReplyT>
std::vector<ServiceResponse<ReplyT>> callService(Session &session, const TopicConfig &topic_config, const RequestT &request, std::chrono::milliseconds timeout)#
-
std::vector<ServiceResponse<std::vector<std::byte>>> callServiceRaw(Session &session, const TopicConfig &topic_config, std::span<const std::byte> buffer, std::chrono::milliseconds timeout)#
-
bool isEndpointTypeInfoServiceTopic(const std::string &topic)#
Return true if the input topic corresponds to the service type info topic.
-
template<typename RequestT, typename ReplyT>
class Service : public heph::ipc::zenoh::ServiceBase# Public Types
Public Functions
-
Service(SessionPtr session, TopicConfig topic_config, Callback &&callback, FailureCallback &&failure_callback = []() {}, PostReplyCallback &&post_reply_callback = []() {}, const ServiceConfig &config = {})#
Create a new service that listens for requests on topic_config.
The callback will be called with the request and should return the reply.
The failure_callback will be called if the service fails to process the request.
The post_reply_callback will be called after the reply has been sent. This can be used to perform cleanup operations.
-
Service(SessionPtr session, TopicConfig topic_config, Callback &&callback, FailureCallback &&failure_callback = []() {}, PostReplyCallback &&post_reply_callback = []() {}, const ServiceConfig &config = {})#
-
struct ServiceConfig#
-
template<typename ReplyT>
struct ServiceResponse#
-
template<typename RequestT, typename ReplyT>
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
-
struct NodeInfo#
-
struct NodeInfo#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
Functions
-
namespace zenoh
-
class ITopicDatabase#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
-
struct PublisherConfig#
-
class RawPublisher#
- #include “hephaestus/ipc/zenoh/raw_publisher.h”
Create a Zenoh publisher on the topic specified in config.
Create a service that provides the schema used to serialize the data.
The service is published on the topic created via getTypeInfoServiceTopic(topic), e.g. for topic hephaestus/pose it creates a service on type_info/hephaestus/pose.
The service returns the JSON representation of the type info, which can be converted using serdes::TypeInfo::fromJson(str).
If match_cb is passed, it is called when the first subscriber matches and when the last one unmatches.
Public Types
-
using MatchCallback = std::function<void(MatchingStatus)>#
Public Functions
-
RawPublisher(SessionPtr session, TopicConfig topic_config, serdes::TypeInfo type_info, MatchCallback &&match_cb = nullptr, const PublisherConfig &config = {})#
-
~RawPublisher() = default#
-
RawPublisher(const RawPublisher&) = delete#
-
RawPublisher(RawPublisher&&) = delete#
- RawPublisher &operator=(const RawPublisher&) = delete
- RawPublisher &operator=(RawPublisher&&) = delete
-
struct PublisherConfig#
-
namespace zenoh
-
namespace ipc
-
namespace heph
-
namespace ipc
-
namespace zenoh
-
namespace action_server
Functions
-
template<typename RequestT>
void toProto(proto::Request &proto_request, const Request<RequestT> &request)#
-
template<typename RequestT>
void fromProto(const proto::Request &proto_request, Request<RequestT> &request)#
-
void toProto(proto::RequestResponse &proto_response, const RequestResponse &response)#
-
void fromProto(const proto::RequestResponse &proto_response, RequestResponse &response)#
-
template<typename RequestT>
-
namespace action_server
-
namespace zenoh
-
namespace serdes#
-
namespace protobuf#
-
template<typename RequestT>
struct ProtoAssociation<heph::ipc::zenoh::action_server::Request<RequestT>># Public Types
-
using Type = heph::ipc::zenoh::action_server::proto::Request#
-
using Type = heph::ipc::zenoh::action_server::proto::Request#
-
template<>
struct ProtoAssociation<heph::ipc::zenoh::action_server::RequestResponse># Public Types
-
using Type = heph::ipc::zenoh::action_server::proto::RequestResponse#
-
using Type = heph::ipc::zenoh::action_server::proto::RequestResponse#
-
template<typename RequestT>
-
namespace protobuf#
-
namespace ipc