Belle II Software  release-05-01-25
HLTDs2ZMQ.cc
1 #include <daq/hbasf2/modules/HLTDs2ZMQ.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
3 
4 using namespace std;
5 using namespace Belle2;
6 
7 REG_MODULE(HLTDs2ZMQ)
8 
10 {
11  setDescription(
12  "On every event, serialize the data store and send the binary data out to "
13  "the connected ZMQ application (most likely a collector or final collector). "
14  "The sending is handled via a confirmed connection (output in this case), "
15  "so all the typical behaviour applies. Also sends out end run and termination "
16  "messages. Depending on the module setting, can send out events in "
17  "raw format (with send header and trailer and a serialized event message as buffer) "
18  "or only as normal ROOT serialized stream (evt message). "
19  "The former is the typical use case when talking with e.g. storage, the "
20  "latter can be used for local tests or when sending full events e.g. to the event display. "
21  "Please note that the environment setting of the stream objects heavily "
22  "influences the time spent in this module (because serialization needs time). "
23  "This module is only useful in the HLT context or for testing it and it optimized to be used "
24  "together with the HLTEventProcessor. Please note the special handling of the first event in the "
25  "HLTEventProcessor (therefore we do not stream the first event)"
26  );
27  setPropertyFlags(EModulePropFlags::c_Output | EModulePropFlags::c_ParallelProcessingCertified);
28 
29  addParam("output", m_param_output, "The ZMQ address of the connected application (to receive the messages).");
30  addParam("raw", m_param_raw, "Send out raw data with send header and trailer around the evtmessage instead of just the evtmessage. "
31  "The former is the typical use case when talking with e.g. storage, "
32  "the latter can be used for local tests or when sending full events e.g. to the event display.");
33 }
34 
35 void HLTDs2ZMQModule::event()
36 {
37  try {
38  if (m_firstEvent) {
39  m_streamHelper.initialize();
40  m_parent.reset(new ZMQParent);
41  m_output.reset(new ZMQConfirmedOutput(m_param_output, m_parent));
42 
43  m_firstEvent = false;
44  return;
45  }
46 
47  if (m_param_raw) {
48  auto zmqMessage = m_streamHelper.streamRaw();
49  m_output->handleEvent(std::move(zmqMessage));
50  } else {
51  auto zmqMessage = m_streamHelper.stream(false, false);
52  m_output->handleEvent(std::move(zmqMessage));
53  }
54  } catch (zmq::error_t& error) {
55  if (error.num() == EINTR) {
56  // Well, that is probably ok. It will be handled by the framework, just go out here.
57  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
58  return;
59  }
60  // This is an unexpected error: better report it.
61  B2ERROR("ZMQ Error while calling the event: " << error.num());
62  }
63 }
64 
65 void HLTDs2ZMQModule::endRun()
66 {
67  try {
68  B2DEBUG(10, "Sending out old run message");
69  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
70  m_output->handleEvent(std::move(message));
71  } catch (zmq::error_t& error) {
72  if (error.num() == EINTR) {
73  // Well, that is probably ok. It will be handled by the framework, just go out here.
74  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
75  return;
76  }
77  // This is an unexpected error: better report it.
78  B2ERROR("ZMQ Error while calling the event: " << error.num());
79  }
80 }
81 
82 void HLTDs2ZMQModule::terminate()
83 {
84  try {
85  B2DEBUG(10, "Sending out terminate message");
86  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
87  m_output->handleEvent(std::move(message));
88  } catch (zmq::error_t& error) {
89  if (error.num() == EINTR) {
90  // Well, that is probably ok. It will be handled by the framework, just go out here.
91  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
92  return;
93  }
94  // This is an unexpected error: better report it.
95  B2ERROR("ZMQ Error while calling the event: " << error.num());
96  }
97 }
Belle2::ZMQConfirmedOutput
Output part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:116
Belle2::HLTDs2ZMQModule
On every event, serialize the data store and send the binary data out to the connected ZMQ applicatio...
Definition: HLTDs2ZMQ.h:55
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQParent
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:49