Belle II Software  release-08-01-10
HLTDs2ZMQ.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/modules/HLTDs2ZMQ.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 
11 using namespace std;
12 using namespace Belle2;
13 
14 REG_MODULE(HLTDs2ZMQ)
15 
17 {
18  setDescription(
19  "On every event, serialize the data store and send the binary data out to "
20  "the connected ZMQ application (most likely a collector or final collector). "
21  "The sending is handled via a confirmed connection (output in this case), "
22  "so all the typical behaviour applies. Also sends out end run and termination "
23  "messages. Depending on the module setting, can send out events in "
24  "raw format (with send header and trailer and a serialized event message as buffer) "
25  "or only as normal ROOT serialized stream (evt message). "
26  "The former is the typical use case when talking with e.g. storage, the "
27  "latter can be used for local tests or when sending full events e.g. to the event display. "
28  "Please note that the environment setting of the stream objects heavily "
29  "influences the time spent in this module (because serialization needs time). "
30  "This module is only useful in the HLT context or for testing it and it optimized to be used "
31  "together with the HLTEventProcessor. Please note the special handling of the first event in the "
32  "HLTEventProcessor (therefore we do not stream the first event)"
33  );
34  setPropertyFlags(EModulePropFlags::c_Output | EModulePropFlags::c_ParallelProcessingCertified);
35 
36  addParam("output", m_param_output, "The ZMQ address of the connected application (to receive the messages).");
37  addParam("raw", m_param_raw, "Send out raw data with send header and trailer around the evtmessage instead of just the evtmessage. "
38  "The former is the typical use case when talking with e.g. storage, "
39  "the latter can be used for local tests or when sending full events e.g. to the event display.");
40  addParam("outputConfirmation", m_param_outputConfirmation, "Waiting for output confirmation message or not. "
41  "ExpressReco output is event displays and usually don't need the confirmation message.", m_param_outputConfirmation);
42 }
43 
44 void HLTDs2ZMQModule::event()
45 {
46  try {
47  if (m_firstEvent) {
48  m_streamHelper.initialize();
49  m_parent.reset(new ZMQParent);
50  m_output.reset(new ZMQConfirmedOutput(m_param_output, m_parent));
51 
52  m_firstEvent = false;
53  return;
54  }
55 
56  if (m_param_raw) {
57  auto zmqMessage = m_streamHelper.streamRaw();
58  m_output->handleEvent(std::move(zmqMessage), m_param_outputConfirmation);
59  } else {
60  auto zmqMessage = m_streamHelper.stream(false, false);
61  m_output->handleEvent(std::move(zmqMessage), m_param_outputConfirmation);
62  }
63  } catch (zmq::error_t& error) {
64  if (error.num() == EINTR) {
65  // Well, that is probably ok. It will be handled by the framework, just go out here.
66  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
67  return;
68  }
69  // This is an unexpected error: better report it.
70  B2ERROR("ZMQ Error while calling the event: " << error.num());
71  }
72 }
73 
74 void HLTDs2ZMQModule::endRun()
75 {
76  try {
77  B2DEBUG(10, "Sending out old run message");
78  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
79  m_output->handleEvent(std::move(message));
80  } catch (zmq::error_t& error) {
81  if (error.num() == EINTR) {
82  // Well, that is probably ok. It will be handled by the framework, just go out here.
83  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
84  return;
85  }
86  // This is an unexpected error: better report it.
87  B2ERROR("ZMQ Error while calling the event: " << error.num());
88  }
89 }
90 
91 void HLTDs2ZMQModule::terminate()
92 {
93  try {
94  B2DEBUG(10, "Sending out terminate message");
95  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage);
96  m_output->handleEvent(std::move(message));
97  } catch (zmq::error_t& error) {
98  if (error.num() == EINTR) {
99  // Well, that is probably ok. It will be handled by the framework, just go out here.
100  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
101  return;
102  }
103  // This is an unexpected error: better report it.
104  B2ERROR("ZMQ Error while calling the event: " << error.num());
105  }
106 }
On every event, serialize the data store and send the binary data out to the connected ZMQ applicatio...
Definition: HLTDs2ZMQ.h:45
Base class for Modules.
Definition: Module.h:72
Output part of a confirmed connection.
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:39
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.