Belle II Software  release-08-01-10
StorageZMQ2Ds.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <daq/hbasf2/modules/StorageZMQ2Ds.h>
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 
12 #include <framework/logging/Logger.h>
13 
14 using namespace std;
15 using namespace Belle2;
16 
17 REG_MODULE(StorageZMQ2Ds)
18 
20 {
21  setDescription(
22  "Input module in the ZMQ reconstruction path receiving events via ZMQ "
23  "and deserializing the to the data store. The connection to the previous ZMQ application "
24  "(most likely a distributor or collector) is handled via a load balanced connection "
25  "(input in this case). The buffer size for the load balanced connection can be "
26  "controlled via a module parameter. "
27  "This module only works in the context of the HLT when using the HLTEventProcessor, "
28  "due to the special form the first event as well as beginRun and endRun are handled. "
29  "Please read the overall description in the HLTEventProcessor for an overview. "
30  "Before the first real event is received (which is the first time the event function "
31  "is called by the HLTEventProcessor, but before the forking), the "
32  "event meta data is initialized with a predefined experiment and run number (set via "
33  "module parameters) so make module initialization in all other modules possible. "
34  "However, no event function should be called for other modules in this event "
35  "(as the data store is invalid). In the first real event after the forking, "
36  "the connection and streamer is initialized. Then, normal event messages "
37  "are deserialized and written to data store. End run or terminate messages are "
38  "handled by setting a special flag of the EventMetaData. Also in this case "
39  "the remaining modules should not process this event via an event function "
40  "(assured by the HLTEventProcessor)."
41  );
42  setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
43 
44  addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
45  addParam("addExpressRecoObjects", m_param_addExpressRecoObjects,
46  "Additional to the raw data, also register the data store objects needed for express reco. TODO: this might change",
47  m_param_addExpressRecoObjects);
48  addParam("bufferSize", m_param_bufferSize,
49  "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
50 
51  addParam("defaultExperiment", m_lastExperiment,
52  "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
53  addParam("defaultRun", m_lastRun,
54  "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
55  addParam("inInitialize", m_inInitialize,
56  "psh added this for test", true);
57 }
58 
59 void StorageZMQ2DsModule::initialize()
60 {
61  m_streamHelper.registerStoreObjects(m_param_addExpressRecoObjects);
62 }
63 
64 void StorageZMQ2DsModule::event()
65 {
66  setReturnValue(0);
67 
68  // The very first event is actually not the first event for processing.
69  // It is just used to initialize the geometry, so we write out
70  // a default event and return immediately. This will cause
71  // all subsequent modules to be initialized.
72  if (m_inInitialize) {
73  m_inInitialize = false;
74 
75  m_eventMetaData.create();
76  m_eventMetaData->setExperiment(m_lastExperiment);
77  m_eventMetaData->setRun(m_lastRun);
78 
79  setReturnValue(1);
80  return;
81  }
82 
83  try {
84  // If we are not in this initialization step, we can do the normal event processing
85  // This becomes now the first "real" event
86  if (m_firstEvent) {
87  m_streamHelper.initialize();
88 
89  m_parent = std::make_unique<ZMQParent>();
90  m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
91 
92  m_firstEvent = false;
93  }
94 
95  const auto reactToInput = [this]() {
96  auto eventMessage = m_input->handleIncomingData();
97 
98  if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
99  B2DEBUG(10, "Received run change request");
100 
101  m_eventMetaData.create();
102  m_eventMetaData->setEndOfRun(m_lastExperiment, m_lastRun);
103  return;
104  } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
105  B2DEBUG(10, "Received termination request");
106 
107  m_eventMetaData.create();
108  m_eventMetaData->setEndOfData();
109  return;
110  }
111 
112  B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
113  eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
114  B2DEBUG(10, "received event message... write it to data store");
115 
116  m_streamHelper.read(std::move(eventMessage));
117 
118  B2ASSERT("There is still no event meta data present!", m_eventMetaData);
119  m_lastRun = m_eventMetaData->getRun();
120  m_lastExperiment = m_eventMetaData->getExperiment();
121  };
122 
123  bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
124  if (!result) {
125  // didn't get any events, probably interrupted by a signal.
126  // We're the input module so let's better have some event meta data
127  // even if it's not useful
128  m_eventMetaData.create();
129  m_eventMetaData->setEndOfData();
130  }
131  } catch (zmq::error_t& error) {
132  // This is an unexpected error: better report it.
133  B2ERROR("ZMQ Error while calling the event: " << error.num());
134  }
135 }
Base class for Modules.
Definition: Module.h:72
Input module in the ZMQ reconstruction path receiving events via ZMQ and deserializing them to the dat...
Definition: StorageZMQ2Ds.h:57
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.