8#include <daq/hbasf2/modules/HLTDs2ZMQ.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
19 "On every event, serialize the data store and send the binary data out to "
20 "the connected ZMQ application (most likely a collector or final collector). "
21 "The sending is handled via a confirmed connection (output in this case), "
22 "so all the typical behaviour applies. Also sends out end run and termination "
23 "messages. Depending on the module setting, can send out events in "
24 "raw format (with send header and trailer and a serialized event message as buffer) "
25 "or only as normal ROOT serialized stream (evt message). "
26 "The former is the typical use case when talking with e.g. storage, the "
27 "latter can be used for local tests or when sending full events e.g. to the event display. "
28 "Please note that the environment setting of the stream objects heavily "
29 "influences the time spent in this module (because serialization needs time). "
30 "This module is only useful in the HLT context or for testing it and it optimized to be used "
31 "together with the HLTEventProcessor. Please note the special handling of the first event in the "
32 "HLTEventProcessor (therefore we do not stream the first event)"
36 addParam(
"output",
m_param_output,
"The ZMQ address of the connected application (to receive the messages).");
37 addParam(
"raw",
m_param_raw,
"Send out raw data with send header and trailer around the evtmessage instead of just the evtmessage. "
38 "The former is the typical use case when talking with e.g. storage, "
39 "the latter can be used for local tests or when sending full events e.g. to the event display.");
63 }
catch (zmq::error_t& error) {
64 if (error.num() == EINTR) {
66 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
70 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
77 B2DEBUG(10,
"Sending out old run message");
79 m_output->handleEvent(std::move(message));
80 }
catch (zmq::error_t& error) {
81 if (error.num() == EINTR) {
83 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
87 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
94 B2DEBUG(10,
"Sending out terminate message");
96 m_output->handleEvent(std::move(message));
97 }
catch (zmq::error_t& error) {
98 if (error.num() == EINTR) {
100 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
104 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
bool m_firstEvent
Are we still in the first event? Please note the special handling of the first event in the HLTEventP...
std::string m_param_output
Module parameter: the ZMQ address of the connected application (to receive the messages)
void event() override
On the first event, initialize the streamer and the connection. Serializes and sends out the data sto...
void endRun() override
Send out a run stop message.
std::shared_ptr< ZMQParent > m_parent
ZMQ Parent needed for the connections.
void terminate() override
Send out a terminate message.
bool m_param_outputConfirmation
Module parameter: waiting for output confirmation message or not.
HLTDs2ZMQModule()
Register the module parameters.
HLTStreamHelper m_streamHelper
Utility class used for serialization.
std::unique_ptr< ZMQConfirmedOutput > m_output
Confirmed connection to the ZMQ application.
bool m_param_raw
Module parameter: send out raw data with send header and trailer around the evtmessage instead of jus...
void initialize()
Initialize this class. Call this e.g. in the first event.
std::unique_ptr< ZMQNoIdMessage > streamRaw()
Stream the data store into an event message and add SendHeader and SendTrailer around the message....
std::unique_ptr< ZMQNoIdMessage > stream(bool addPersistentDurability, bool streamTransientObjects)
Stream the data store into an event message. Add ROI as additional message (if valid).
void setDescription(const std::string &description)
Sets the description of the module.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
@ c_Output
This module is an output module (writes data).
Output part of a confirmed connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.