Belle II Software  release-08-01-10
HLTZMQ2DsDirect.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/modules/HLTZMQ2DsDirect.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 
11 #include <framework/logging/Logger.h>
12 
13 using namespace std;
14 using namespace Belle2;
15 
16 REG_MODULE(HLTZMQ2DsDirect)
17 
19 {
20  setDescription(
21  ""
22  );
23  setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
24 
25  addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
26  addParam("bufferSize", m_param_bufferSize,
27  "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
28 }
29 
30 void HLTZMQ2DsDirectModule::readEvent()
31 {
32  try {
33  bool tryAgain = false;
34  const auto reactToInput = [this, &tryAgain]() {
35  auto eventMessage = m_input->handleIncomingData();
36 
37  if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
38  // We do not care about those messages, so just continue
39  tryAgain = true;
40  return;
41  } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
42  B2DEBUG(10, "Received termination request");
43  tryAgain = false;
44  return;
45  }
46 
47  B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
48  eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
49  B2DEBUG(10, "received event message... write it to data store");
50 
51  m_streamHelper.read(std::move(eventMessage));
52  tryAgain = false;
53  };
54 
55  do {
56  ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
57  } while (tryAgain);
58  } catch (zmq::error_t& error) {
59  if (error.num() == EINTR) {
60  // Well, that is probably ok. It will be handled by the framework, just go out here.
61  B2DEBUG(10, "Received an signal interrupt during the event call. Will return");
62  return;
63  }
64  // This is an unexpected error: better report it.
65  B2ERROR("ZMQ Error while calling the event: " << error.num());
66  }
67 }
68 
69 void HLTZMQ2DsDirectModule::initialize()
70 {
71  m_streamHelper.initialize();
72 
73  m_parent = std::make_unique<ZMQParent>();
74  m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
75 
76  readEvent();
77 }
78 
79 void HLTZMQ2DsDirectModule::event()
80 {
81  // We do not need to process this event again
82  if (m_firstEvent) {
83  m_firstEvent = false;
84  return;
85  }
86 
87  readEvent();
88 }
Special ZMQ2Ds module without the HLT-specific handling of initialization and begin/end run.
Base class for Modules.
Definition: Module.h:72
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.