Belle II Software  release-08-01-10
HLTZMQ2Ds.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/modules/HLTZMQ2Ds.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 
11 #include <framework/logging/Logger.h>
12 
13 using namespace std;
14 using namespace Belle2;
15 
16 REG_MODULE(HLTZMQ2Ds)
17 
19 {
20  setDescription(
21  "Input module in the ZMQ reconstruction path receiving events via ZMQ "
22  "and deserializing the to the data store. The connection to the previous ZMQ application "
23  "(most likely a distributor or collector) is handled via a load balanced connection "
24  "(input in this case). The buffer size for the load balanced connection can be "
25  "controlled via a module parameter. "
26  "This module only works in the context of the HLT when using the HLTEventProcessor, "
27  "due to the special form the first event as well as beginRun and endRun are handled. "
28  "Please read the overall description in the HLTEventProcessor for an overview. "
29  "Before the first real event is received (which is the first time the event function "
30  "is called by the HLTEventProcessor, but before the forking), the "
31  "event meta data is initialized with a predefined experiment and run number (set via "
32  "module parameters) so make module initialization in all other modules possible. "
33  "However, no event function should be called for other modules in this event "
34  "(as the data store is invalid). In the first real event after the forking, "
35  "the connection and streamer is initialized. Then, normal event messages "
36  "are deserialized and written to data store. End run or terminate messages are "
37  "handled by setting a special flag of the EventMetaData. Also in this case "
38  "the remaining modules should not process this event via an event function "
39  "(assured by the HLTEventProcessor)."
40  );
41  setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
42 
43  addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
44  addParam("addExpressRecoObjects", m_param_addExpressRecoObjects,
45  "Additional to the raw data, also register the data store objects needed for express reco. TODO: this might change",
46  m_param_addExpressRecoObjects);
47  addParam("bufferSize", m_param_bufferSize,
48  "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
49 
50  addParam("defaultExperiment", m_lastExperiment,
51  "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
52  addParam("defaultRun", m_lastRun,
53  "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
54 }
55 
56 void HLTZMQ2DsModule::initialize()
57 {
58  m_streamHelper.registerStoreObjects(m_param_addExpressRecoObjects);
59 }
60 
61 void HLTZMQ2DsModule::event()
62 {
63  setReturnValue(0);
64 
65  // The very first event is actually not the first event for processing.
66  // It is just used to initialize the geometry, so we write out
67  // a default event and return immediately. This will cause
68  // all subsequent modules to be initialized.
69  if (m_inInitialize) {
70  m_inInitialize = false;
71 
72  m_eventMetaData.create();
73  m_eventMetaData->setExperiment(m_lastExperiment);
74  m_eventMetaData->setRun(m_lastRun);
75 
76  setReturnValue(1);
77  return;
78  }
79 
80  try {
81  // If we are not in this initialization step, we can do the normal event processing
82  // This becomes now the first "real" event
83  if (m_firstEvent) {
84  m_streamHelper.initialize();
85 
86  m_parent = std::make_unique<ZMQParent>();
87  m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
88 
89  m_firstEvent = false;
90  }
91 
92  const auto reactToInput = [this]() {
93  auto eventMessage = m_input->handleIncomingData();
94 
95  if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
96  B2DEBUG(10, "Received run change request");
97 
98  m_eventMetaData.create();
99  if (m_firstEventIsSpecialMessage) {
100  m_eventMetaData->setExperiment(m_lastExperiment);
101  m_eventMetaData->setRun(m_lastRun);
102  } else {
103  m_eventMetaData->setEndOfRun(m_lastExperiment, m_lastRun);
104  }
105  return;
106  } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
107  B2DEBUG(10, "Received termination request");
108 
109  m_eventMetaData.create();
110  m_eventMetaData->setEndOfData();
111  return;
112  }
113 
114  B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
115  eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
116  B2DEBUG(10, "received event message... write it to data store");
117 
118  m_streamHelper.read(std::move(eventMessage));
119 
120  B2ASSERT("There is still no event meta data present!", m_eventMetaData);
121  m_lastRun = m_eventMetaData->getRun();
122  m_lastExperiment = m_eventMetaData->getExperiment();
123  m_firstEventIsSpecialMessage = false;
124  };
125 
126  bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
127  if (!result) {
128  // didn't get any events, probably interrupted by a signal.
129  // We're the input module so let's better have some event meta data
130  // even if it's not useful
131  m_eventMetaData.create();
132  m_eventMetaData->setEndOfData();
133  }
134  } catch (zmq::error_t& error) {
135  // This is an unexpected error: better report it.
136  B2ERROR("ZMQ Error while calling the event: " << error.num());
137  }
138 }
Input module in the ZMQ reconstruction path receiving events via ZMQ and deserializing the to the dat...
Definition: HLTZMQ2Ds.h:55
Base class for Modules.
Definition: Module.h:72
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.