Belle II Software  release-05-01-25
HLTZMQ2DsDirect.cc
1 #include <daq/hbasf2/modules/HLTZMQ2DsDirect.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
3 
4 #include <framework/logging/Logger.h>
5 
6 using namespace std;
7 using namespace Belle2;
8 
9 REG_MODULE(HLTZMQ2DsDirect)
10 
12 {
13  setDescription(
14  ""
15  );
16  setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
17 
18  addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
19  addParam("bufferSize", m_param_bufferSize,
20  "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
21 }
22 
23 void HLTZMQ2DsDirectModule::readEvent()
24 {
25  try {
26  bool tryAgain = false;
27  const auto reactToInput = [this, &tryAgain]() {
28  auto eventMessage = m_input->handleIncomingData();
29 
30  if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
31  // We do not care about those messages, so just continue
32  tryAgain = true;
33  return;
34  } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
35  B2DEBUG(10, "Received termination request");
36  tryAgain = false;
37  return;
38  }
39 
40  B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
41  eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
42  B2DEBUG(10, "received event message... write it to data store");
43 
44  m_streamHelper.read(std::move(eventMessage));
45  tryAgain = false;
46  };
47 
48  do {
49  ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
50  } while (tryAgain);
51  } catch (zmq::error_t& error) {
52  if (error.num() == EINTR) {
53  // Well, that is probably ok. It will be handled by the framework, just go out here.
54  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
55  return;
56  }
57  // This is an unexpected error: better report it.
58  B2ERROR("ZMQ Error while calling the event: " << error.num());
59  }
60 }
61 
62 void HLTZMQ2DsDirectModule::initialize()
63 {
64  m_streamHelper.initialize();
65 
66  m_parent = std::make_unique<ZMQParent>();
67  m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
68 
69  readEvent();
70 }
71 
72 void HLTZMQ2DsDirectModule::event()
73 {
74  // We do not need to process this event again
75  if (m_firstEvent) {
76  m_firstEvent = false;
77  return;
78  }
79 
80  readEvent();
81 }
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::HLTZMQ2DsDirectModule
Special ZMQ2Ds module without the HLT-specific handling of initialization and begin/end run.
Definition: HLTZMQ2DsDirect.h:40