Belle II Software development
HLTZMQ2Ds.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/modules/HLTZMQ2Ds.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10
11#include <framework/logging/Logger.h>
12
13using namespace std;
14using namespace Belle2;
15
16REG_MODULE(HLTZMQ2Ds);
17
19{
21 "Input module in the ZMQ reconstruction path receiving events via ZMQ "
22 "and deserializing the to the data store. The connection to the previous ZMQ application "
23 "(most likely a distributor or collector) is handled via a load balanced connection "
24 "(input in this case). The buffer size for the load balanced connection can be "
25 "controlled via a module parameter. "
26 "This module only works in the context of the HLT when using the HLTEventProcessor, "
27 "due to the special form the first event as well as beginRun and endRun are handled. "
28 "Please read the overall description in the HLTEventProcessor for an overview. "
29 "Before the first real event is received (which is the first time the event function "
30 "is called by the HLTEventProcessor, but before the forking), the "
31 "event meta data is initialized with a predefined experiment and run number (set via "
32 "module parameters) so make module initialization in all other modules possible. "
33 "However, no event function should be called for other modules in this event "
34 "(as the data store is invalid). In the first real event after the forking, "
35 "the connection and streamer is initialized. Then, normal event messages "
36 "are deserialized and written to data store. End run or terminate messages are "
37 "handled by setting a special flag of the EventMetaData. Also in this case "
38 "the remaining modules should not process this event via an event function "
39 "(assured by the HLTEventProcessor)."
40 );
42
43 addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
44 addParam("addExpressRecoObjects", m_param_addExpressRecoObjects,
45 "Additional to the raw data, also register the data store objects needed for express reco. TODO: this might change",
47 addParam("bufferSize", m_param_bufferSize,
48 "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
49
50 addParam("defaultExperiment", m_lastExperiment,
51 "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
52 addParam("defaultRun", m_lastRun,
53 "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
54}
55
60
62{
64
65 // The very first event is actually not the first event for processing.
66 // It is just used to initialize the geometry, so we write out
67 // a default event and return immediately. This will cause
68 // all subsequent modules to be initialized.
69 if (m_inInitialize) {
70 m_inInitialize = false;
71
72 m_eventMetaData.create();
73 m_eventMetaData->setExperiment(m_lastExperiment);
75
77 return;
78 }
79
80 try {
81 // If we are not in this initialization step, we can do the normal event processing
82 // This becomes now the first "real" event
83 if (m_firstEvent) {
84 m_streamHelper.initialize();
85
86 m_parent = std::make_unique<ZMQParent>();
87 m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
88
89 m_firstEvent = false;
90 }
91
92 const auto reactToInput = [this]() {
93 auto eventMessage = m_input->handleIncomingData();
94
95 if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
96 B2DEBUG(10, "Received run change request");
97
98 m_eventMetaData.create();
102 }
103 return;
104 } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
105 B2DEBUG(10, "Received termination request");
106
107 m_eventMetaData.create();
108 m_eventMetaData->setEndOfData();
109 return;
110 }
111
112 B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
113 eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
114 B2DEBUG(10, "received event message... write it to data store");
115
116 m_streamHelper.read(std::move(eventMessage));
117
118 B2ASSERT("There is still no event meta data present!", m_eventMetaData);
119 m_lastRun = m_eventMetaData->getRun();
120 m_lastExperiment = m_eventMetaData->getExperiment();
122 };
123
124 bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
125 if (!result) {
126 // didn't get any events, probably interrupted by a signal.
127 // We're the input module so let's better have some event meta data
128 // even if it's not useful
129 // If the m_lastRun is 0, it is probably the lastEventMessage. Do not issue the endOfData.
130 if (m_lastRun != 0) {
131 m_eventMetaData.create();
132 m_eventMetaData->setEndOfData();
133 } else {
135 }
136 }
137 } catch (zmq::error_t& error) {
138 // This is an unexpected error: better report it.
139 B2ERROR("ZMQ Error while calling the event: " << error.num());
140 }
141}
bool m_firstEvent
Are we still in the first real event?
Definition HLTZMQ2Ds.h:76
std::unique_ptr< ZMQLoadBalancedInput > m_input
Load balanced connection to the previous ZMQ application.
Definition HLTZMQ2Ds.h:70
bool m_lastEventIsSpecialMessage
The last event has a special message type?
Definition HLTZMQ2Ds.h:78
bool m_inInitialize
Are we still before the first real event = before the modules are initialized = before the forking?
Definition HLTZMQ2Ds.h:80
void initialize() override
Register the needed store arrays. In case of the HLT, this are only the raw data objects,...
Definition HLTZMQ2Ds.cc:56
void event() override
Handle the cases (a) before first event, (b) first event and (c) normal event as described in the cla...
Definition HLTZMQ2Ds.cc:61
std::shared_ptr< ZMQParent > m_parent
ZMQ Parent needed for the connections.
Definition HLTZMQ2Ds.h:68
unsigned int m_lastExperiment
Default experiment number to be set during initialization/run end to have something to load the geome...
Definition HLTZMQ2Ds.h:90
unsigned int m_param_bufferSize
Module parameter: how many events should be kept in flight. Has an impact on the stopping time as wel...
Definition HLTZMQ2Ds.h:85
StoreObjPtr< EventMetaData > m_eventMetaData
Reference to the event meta data to set numbers and flags according to the state and received message...
Definition HLTZMQ2Ds.h:95
HLTZMQ2DsModule()
Register the module parameters.
Definition HLTZMQ2Ds.cc:18
unsigned int m_lastRun
Default run number to be set during initialization/run end to have something to load the geometry....
Definition HLTZMQ2Ds.h:92
HLTStreamHelper m_streamHelper
Utility class for deserialization.
Definition HLTZMQ2Ds.h:73
std::string m_param_input
Module parameter: ZMQ address of the input ZMQ application.
Definition HLTZMQ2Ds.h:83
bool m_param_addExpressRecoObjects
Module parameter: additional to the raw data, also register the data store objects needed for express...
Definition HLTZMQ2Ds.h:87
void setDescription(const std::string &description)
Sets the description of the module.
Definition Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition Module.cc:208
Module()
Constructor.
Definition Module.cc:30
void setReturnValue(int value)
Sets the return value for this module as integer.
Definition Module.cc:220
@ c_Input
This module is an input module (reads data).
Definition Module.h:78
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition Module.h:80
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition Module.h:559
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition Module.h:649
Abstract base class for different kinds of events.
STL namespace.