8 #include <daq/hbasf2/modules/HLTDs2ZMQ.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
19 "On every event, serialize the data store and send the binary data out to "
20 "the connected ZMQ application (most likely a collector or final collector). "
21 "The sending is handled via a confirmed connection (output in this case), "
22 "so all the typical behaviour applies. Also sends out end run and termination "
23 "messages. Depending on the module setting, can send out events in "
24 "raw format (with send header and trailer and a serialized event message as buffer) "
25 "or only as normal ROOT serialized stream (evt message). "
26 "The former is the typical use case when talking with e.g. storage, the "
27 "latter can be used for local tests or when sending full events e.g. to the event display. "
28 "Please note that the environment setting of the stream objects heavily "
29 "influences the time spent in this module (because serialization needs time). "
30 "This module is only useful in the HLT context or for testing it and it optimized to be used "
31 "together with the HLTEventProcessor. Please note the special handling of the first event in the "
32 "HLTEventProcessor (therefore we do not stream the first event)"
34 setPropertyFlags(EModulePropFlags::c_Output | EModulePropFlags::c_ParallelProcessingCertified);
36 addParam(
"output", m_param_output,
"The ZMQ address of the connected application (to receive the messages).");
37 addParam(
"raw", m_param_raw,
"Send out raw data with send header and trailer around the evtmessage instead of just the evtmessage. "
38 "The former is the typical use case when talking with e.g. storage, "
39 "the latter can be used for local tests or when sending full events e.g. to the event display.");
40 addParam(
"outputConfirmation", m_param_outputConfirmation,
"Waiting for output confirmation message or not. "
41 "ExpressReco output is event displays and usually don't need the confirmation message.", m_param_outputConfirmation);
// Per-event hook of the module: hands the current event to the ZMQ output (m_output).
// NOTE(review): this listing omits several original lines (opening brace, the `try`, and the
// conditions around the calls below), so the exact branch structure is not visible here.
44 void HLTDs2ZMQModule::event()
// Prepare the stream helper. Presumably done only once, on the first event, which according
// to the module description is not streamed -- TODO confirm against the complete file.
48 m_streamHelper.initialize();
// Variant 1: message built by streamRaw(). Presumably selected by the "raw" module
// parameter -- the selecting condition is not visible in this listing.
57 auto zmqMessage = m_streamHelper.streamRaw();
// The message is moved into the output; the second argument forwards the
// "outputConfirmation" module parameter.
58 m_output->handleEvent(std::move(zmqMessage), m_param_outputConfirmation);
// Variant 2: message built by stream(false, false). The meaning of the two flags is defined
// by the stream helper and is not visible here.
60 auto zmqMessage = m_streamHelper.stream(
false,
false);
61 m_output->handleEvent(std::move(zmqMessage), m_param_outputConfirmation);
63 }
// ZMQ failures raised by the calls above are handled here instead of propagating.
catch (zmq::error_t& error) {
// EINTR (interrupted by a signal) is only logged at debug level.
// NOTE(review): the message says "Will return"; the return statement itself is not visible here.
64 if (error.num() == EINTR) {
66 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
// Any other ZMQ error is reported as an error together with its numeric code.
70 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
// End-of-run hook: sends a c_lastEventMessage through the ZMQ output (m_output).
// NOTE(review): this listing omits some original lines (opening brace, the `try`, closing braces).
74 void HLTDs2ZMQModule::endRun()
77 B2DEBUG(10,
"Sending out old run message");
// Build a payload-free message of type c_lastEventMessage.
78 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
// Unlike in event(), no confirmation flag is passed, so handleEvent's own default applies.
79 m_output->handleEvent(std::move(message));
80 }
// ZMQ failures raised by the calls above are handled here instead of propagating.
catch (zmq::error_t& error) {
// EINTR (interrupted by a signal) is only logged at debug level.
// NOTE(review): the log texts below say "the event call" although this is endRun().
81 if (error.num() == EINTR) {
83 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
// Any other ZMQ error is reported as an error together with its numeric code.
87 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
// Termination hook: sends a c_terminateMessage through the ZMQ output (m_output).
// NOTE(review): this listing omits some original lines (opening brace, the `try`, closing braces).
91 void HLTDs2ZMQModule::terminate()
94 B2DEBUG(10,
"Sending out terminate message");
// Build a payload-free message of type c_terminateMessage.
95 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
// Unlike in event(), no confirmation flag is passed, so handleEvent's own default applies.
96 m_output->handleEvent(std::move(message));
97 }
// ZMQ failures raised by the calls above are handled here instead of propagating.
catch (zmq::error_t& error) {
// EINTR (interrupted by a signal) is only logged at debug level.
// NOTE(review): the log texts below say "the event call" although this is terminate().
98 if (error.num() == EINTR) {
100 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
// Any other ZMQ error is reported as an error together with its numeric code.
104 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
On every event, serialize the data store and send the binary data out to the connected ZMQ application (most likely a collector or final collector).
Output part of a confirmed connection.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.