Belle II Software development
HLTDs2ZMQRaw.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#include <daq/hbasf2/modules/HLTDs2ZMQRaw.h>

using namespace std;
using namespace Belle2;

REG_MODULE(HLTDs2ZMQRaw)

HLTDs2ZMQRawModule::HLTDs2ZMQRawModule() : Module()
{
  setDescription(
    "On every event, serialize the data store and send the binary data out to "
    "the connected ZMQ application (most likely a collector or final collector). "
    "The sending is handled via a raw connection (output in this case), "
    "so all the typical behaviour applies. Also sends out end run and termination "
    "messages. Depending on the module setting, can send out events in "
    "raw format (with send header and trailer and a serialized event message as buffer) "
    "or only as normal ROOT serialized stream (evt message). "
    "The former is the typical use case when talking with e.g. storage, the "
    "latter can be used for local tests or when sending full events e.g. to the event display. "
    "Please note that the environment setting of the stream objects heavily "
    "influences the time spent in this module (because serialization needs time). "
    "This module is only useful in the HLT context or for testing it and it is optimized to be used "
    "together with the HLTEventProcessor. Please note the special handling of the first event in the "
    "HLTEventProcessor (therefore we do not stream the first event)."
  );
  setPropertyFlags(EModulePropFlags::c_Output | EModulePropFlags::c_ParallelProcessingCertified);

  addParam("output", m_param_output, "The ZMQ address of the connected application (to receive the messages).");
  addParam("addEventSize", m_param_addEventSize, "Add the event size (converted with htonl) at the beginning", true);
  addParam("addPersistentDurability", m_param_addPersistentDurability, "For streamHelper", false);
  addParam("streamTransientObjects", m_param_streamTransientObjects, "For streamHelper", false);
}
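
void HLTDs2ZMQRawModule::initialize()
{
  // Only set up here if we are not waiting for the first event.
  if (!m_firstEvent) {
    // ... (line missing from this listing; presumably the m_streamHelper initialization)
    m_parent.reset(new ZMQParent);
    // ... (line missing from this listing; presumably the m_output connection setup)
    m_output->handleIncomingData();
  }
}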

void HLTDs2ZMQRawModule::event()
{
  try {
    if (m_firstEvent) {
      // First event: set up the streamer and the connection, but do not stream this event.
      // ... (line missing from this listing; presumably the m_streamHelper initialization)
      m_parent.reset(new ZMQParent);
      // ... (line missing from this listing; presumably the m_output connection setup)
      m_output->handleIncomingData();

      m_firstEvent = false;
      return;
    }

    // Serialize the data store and hand the message to the raw output connection.
    const auto evtMessage = m_streamHelper.stream(m_param_addPersistentDurability, m_param_streamTransientObjects);
    auto zmqMessage = m_messageHelper.createZMQMessage(evtMessage);
    m_output->handleEvent(std::move(zmqMessage));

  } catch (const zmq::error_t& error) {
    if (error.num() == EINTR) {
      // Well, that is probably ok. It will be handled by the framework, just go out here.
      B2DEBUG(10, "Received a signal interrupt during the event call. Will return");
      return;
    }
    // This is an unexpected error: better report it.
    B2ERROR("ZMQ Error while calling the event: " << error.num());
  }
}

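The first-event handling in event() can be looked at in isolation. The following is a minimal sketch of that control flow only, not the basf2 implementation: `RecordingOutput` and `LazySender` are hypothetical stand-ins invented for illustration, events are plain strings, and the default of `firstEvent = true` is an assumption. The first call to event() sets up the output and returns without sending; every later call sends.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an output connection: it only records messages.
struct RecordingOutput {
  std::vector<std::string> sent;
  void handleEvent(std::string message) { sent.push_back(std::move(message)); }
};

// Hypothetical sketch of the lazy-setup pattern, not the basf2 module.
class LazySender {
public:
  // Assumption: by default the setup is deferred to the first event.
  explicit LazySender(bool firstEvent = true) : m_firstEvent(firstEvent) {}

  // Set up right away only if we are not waiting for the first event.
  void initialize()
  {
    if (!m_firstEvent) {
      m_output = std::make_unique<RecordingOutput>();
    }
  }

  void event(const std::string& serializedEvent)
  {
    if (m_firstEvent) {
      // First call: set up the connection and return without sending.
      m_output = std::make_unique<RecordingOutput>();
      m_firstEvent = false;
      return;
    }
    assert(m_output != nullptr);
    m_output->handleEvent(serializedEvent);
  }

  // Number of events that were actually sent.
  std::size_t sentCount() const { return m_output ? m_output->sent.size() : 0; }

private:
  bool m_firstEvent;
  std::unique_ptr<RecordingOutput> m_output;
};
```

With the deferred setup, calling event() three times sends two messages, because the first call is consumed by the setup. Constructing the sketch with `firstEvent = false` and calling initialize() sends every event instead.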
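The `addEventSize` parameter places the event size at the beginning of the outgoing data, presumably converted with POSIX `htonl` (network byte order). As a rough illustration of such a size prefix, the sketch below frames a payload with a 32-bit big-endian length. The helpers `frameWithSize` and `readSize` are hypothetical and not part of basf2, and it is an assumption that the prefix counts only the payload bytes; the actual raw format written by the output connection (including its header and trailer) is not shown in this file.

```cpp
#include <arpa/inet.h>  // htonl, ntohl (POSIX)
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical helper, not part of basf2: prepend the payload size as a
// 32-bit integer in network byte order (big-endian).
// Assumption: the prefix counts only the payload bytes, not itself.
std::vector<char> frameWithSize(const std::string& payload)
{
  assert(payload.size() <= UINT32_MAX);
  const uint32_t networkSize = htonl(static_cast<uint32_t>(payload.size()));

  std::vector<char> frame(sizeof(networkSize) + payload.size());
  std::memcpy(frame.data(), &networkSize, sizeof(networkSize));
  if (!payload.empty()) {
    std::memcpy(frame.data() + sizeof(networkSize), payload.data(), payload.size());
  }
  return frame;
}

// Read the size prefix back from a frame produced by frameWithSize().
uint32_t readSize(const std::vector<char>& frame)
{
  assert(frame.size() >= sizeof(uint32_t));
  uint32_t networkSize = 0;
  std::memcpy(&networkSize, frame.data(), sizeof(networkSize));
  return ntohl(networkSize);
}
```

A receiver that is not a ZMQ socket can read the first four bytes, convert them with `ntohl`, and then knows how many further bytes belong to the event, independent of the byte order of either host.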