Belle II Software development
HLTDs2ZMQ.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/modules/HLTDs2ZMQ.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10
11using namespace std;
12using namespace Belle2;
13
14REG_MODULE(HLTDs2ZMQ);
15
17{
19 "On every event, serialize the data store and send the binary data out to "
20 "the connected ZMQ application (most likely a collector or final collector). "
21 "The sending is handled via a confirmed connection (output in this case), "
22 "so all the typical behaviour applies. Also sends out end run and termination "
23 "messages. Depending on the module setting, can send out events in "
24 "raw format (with send header and trailer and a serialized event message as buffer) "
25 "or only as normal ROOT serialized stream (evt message). "
26 "The former is the typical use case when talking with e.g. storage, the "
27 "latter can be used for local tests or when sending full events e.g. to the event display. "
28 "Please note that the environment setting of the stream objects heavily "
29 "influences the time spent in this module (because serialization needs time). "
30 "This module is only useful in the HLT context or for testing it and it optimized to be used "
31 "together with the HLTEventProcessor. Please note the special handling of the first event in the "
32 "HLTEventProcessor (therefore we do not stream the first event)"
33 );
35
36 addParam("output", m_param_output, "The ZMQ address of the connected application (to receive the messages).");
37 addParam("raw", m_param_raw, "Send out raw data with send header and trailer around the evtmessage instead of just the evtmessage. "
38 "The former is the typical use case when talking with e.g. storage, "
39 "the latter can be used for local tests or when sending full events e.g. to the event display.");
40 addParam("outputConfirmation", m_param_outputConfirmation, "Waiting for output confirmation message or not. "
41 "ExpressReco output is event displays and usually don't need the confirmation message.", m_param_outputConfirmation);
42}
43
45{
46 try {
47 if (m_firstEvent) {
49 m_parent.reset(new ZMQParent);
51
52 m_firstEvent = false;
53 return;
54 }
55
56 if (m_param_raw) {
57 auto zmqMessage = m_streamHelper.streamRaw();
58 m_output->handleEvent(std::move(zmqMessage), m_param_outputConfirmation);
59 } else {
60 auto zmqMessage = m_streamHelper.stream(false, false);
61 m_output->handleEvent(std::move(zmqMessage), m_param_outputConfirmation);
62 }
63 } catch (zmq::error_t& error) {
64 if (error.num() == EINTR) {
65 // Well, that is probably ok. It will be handled by the framework, just go out here.
66 B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
67 return;
68 }
69 // This is an unexpected error: better report it.
70 B2ERROR("ZMQ Error while calling the event: " << error.num());
71 }
72}
73
75{
76 try {
77 B2DEBUG(10, "Sending out old run message");
78 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
79 m_output->handleEvent(std::move(message));
80 } catch (zmq::error_t& error) {
81 if (error.num() == EINTR) {
82 // Well, that is probably ok. It will be handled by the framework, just go out here.
83 B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
84 return;
85 }
86 // This is an unexpected error: better report it.
87 B2ERROR("ZMQ Error while calling the event: " << error.num());
88 }
89}
90
92{
93 try {
94 B2DEBUG(10, "Sending out terminate message");
95 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
96 m_output->handleEvent(std::move(message));
97 } catch (zmq::error_t& error) {
98 if (error.num() == EINTR) {
99 // Well, that is probably ok. It will be handled by the framework, just go out here.
100 B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
101 return;
102 }
103 // This is an unexpected error: better report it.
104 B2ERROR("ZMQ Error while calling the event: " << error.num());
105 }
106}
bool m_firstEvent
Are we still in the first event? Please note the special handling of the first event in the HLTEventP...
Definition: HLTDs2ZMQ.h:73
std::string m_param_output
Module parameter: the ZMQ address of the connected application (to receive the messages)
Definition: HLTDs2ZMQ.h:59
void event() override
On the first event, initialize the streamer and the connection. Serializes and sends out the data sto...
Definition: HLTDs2ZMQ.cc:44
void endRun() override
Send out a run stop message.
Definition: HLTDs2ZMQ.cc:74
std::shared_ptr< ZMQParent > m_parent
ZMQ Parent needed for the connections.
Definition: HLTDs2ZMQ.h:66
void terminate() override
Send out a terminate message.
Definition: HLTDs2ZMQ.cc:91
bool m_param_outputConfirmation
Module parameter: waiting for output confirmation message or not.
Definition: HLTDs2ZMQ.h:63
HLTDs2ZMQModule()
Register the module parameters.
Definition: HLTDs2ZMQ.cc:16
HLTStreamHelper m_streamHelper
Utility class used for serialization.
Definition: HLTDs2ZMQ.h:71
std::unique_ptr< ZMQConfirmedOutput > m_output
Confirmed connection to the ZMQ application.
Definition: HLTDs2ZMQ.h:68
bool m_param_raw
Module paremeter: send out raw data with send header and trailer around the evtmessage instead of jus...
Definition: HLTDs2ZMQ.h:61
void initialize()
Initialize this class. Call this e.g. in the first event.
std::unique_ptr< ZMQNoIdMessage > streamRaw()
Stream the data store into an event message and add SendHeader and SendTrailer around the message....
std::unique_ptr< ZMQNoIdMessage > stream(bool addPersistentDurability, bool streamTransientObjects)
Stream the data store into an event message. Add ROI as additional message (if valid).
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_Output
This module is an output module (writes data).
Definition: Module.h:79
Output part of a confirmed connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:39
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.