Belle II Software  release-05-01-25
HLTZMQ2Ds.cc
1 #include <daq/hbasf2/modules/HLTZMQ2Ds.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
3 
4 #include <framework/logging/Logger.h>
5 
6 using namespace std;
7 using namespace Belle2;
8 
9 REG_MODULE(HLTZMQ2Ds)
10 
12 {
13  setDescription(
14  "Input module in the ZMQ reconstruction path receiving events via ZMQ "
15  "and deserializing the to the data store. The connection to the previous ZMQ application "
16  "(most likely a distributor or collector) is handled via a load balanced connection "
17  "(input in this case). The buffer size for the load balanced connection can be "
18  "controlled via a module parameter. "
19  "This module only works in the context of the HLT when using the HLTEventProcessor, "
20  "due to the special form the first event as well as beginRun and endRun are handled. "
21  "Please read the overall description in the HLTEventProcessor for an overview. "
22  "Before the first real event is received (which is the first time the event function "
23  "is called by the HLTEventProcessor, but before the forking), the "
24  "event meta data is initialized with a predefined experiment and run number (set via "
25  "module parameters) so make module initialization in all other modules possible. "
26  "However, no event function should be called for other modules in this event "
27  "(as the data store is invalid). In the first real event after the forking, "
28  "the connection and streamer is initialized. Then, normal event messages "
29  "are deserialized and written to data store. End run or terminate messages are "
30  "handled by setting a special flag of the EventMetaData. Also in this case "
31  "the remaining modules should not process this event via an event function "
32  "(assured by the HLTEventProcessor)."
33  );
34  setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
35 
36  addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
37  addParam("addExpressRecoObjects", m_param_addExpressRecoObjects,
38  "Additional to the raw data, also register the data store objects needed for express reco. TODO: this might change",
39  m_param_addExpressRecoObjects);
40  addParam("bufferSize", m_param_bufferSize,
41  "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
42 
43  addParam("defaultExperiment", m_lastExperiment,
44  "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
45  addParam("defaultRun", m_lastRun,
46  "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
47 }
48 
49 void HLTZMQ2DsModule::initialize()
50 {
51  m_streamHelper.registerStoreObjects(m_param_addExpressRecoObjects);
52 }
53 
54 void HLTZMQ2DsModule::event()
55 {
56  setReturnValue(0);
57 
58  // The very first event is actually not the first event for processing.
59  // It is just used to initialize the geometry, so we write out
60  // a default event and return immediately. This will cause
61  // all subsequent modules to be initialized.
62  if (m_inInitialize) {
63  m_inInitialize = false;
64 
65  m_eventMetaData.create();
66  m_eventMetaData->setExperiment(m_lastExperiment);
67  m_eventMetaData->setRun(m_lastRun);
68 
69  setReturnValue(1);
70  return;
71  }
72 
73  try {
74  // If we are not in this initialization step, we can do the normal event processing
75  // This becomes now the first "real" event
76  if (m_firstEvent) {
77  m_streamHelper.initialize();
78 
79  m_parent = std::make_unique<ZMQParent>();
80  m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
81 
82  m_firstEvent = false;
83  }
84 
85  const auto reactToInput = [this]() {
86  auto eventMessage = m_input->handleIncomingData();
87 
88  if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
89  B2DEBUG(10, "Received run change request");
90 
91  m_eventMetaData.create();
92  m_eventMetaData->setEndOfRun(m_lastExperiment, m_lastRun);
93  return;
94  } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
95  B2DEBUG(10, "Received termination request");
96 
97  m_eventMetaData.create();
98  m_eventMetaData->setEndOfData();
99  return;
100  }
101 
102  B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
103  eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
104  B2DEBUG(10, "received event message... write it to data store");
105 
106  m_streamHelper.read(std::move(eventMessage));
107 
108  B2ASSERT("There is still no event meta data present!", m_eventMetaData);
109  m_lastRun = m_eventMetaData->getRun();
110  m_lastExperiment = m_eventMetaData->getExperiment();
111  };
112 
113  bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
114  if (!result) {
115  // didn't get any events, probably interrupted by a signal.
116  // We're the input module so let's better have some event meta data
117  // even if it's not useful
118  m_eventMetaData.create();
119  m_eventMetaData->setEndOfData();
120  }
121  } catch (zmq::error_t& error) {
122  // This is an unexpected error: better report it.
123  B2ERROR("ZMQ Error while calling the event: " << error.num());
124  }
125 }
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::HLTZMQ2DsModule
Input module in the ZMQ reconstruction path receiving events via ZMQ and deserializing them to the dat...
Definition: HLTZMQ2Ds.h:65