Belle II Software development
HLTZMQ2Ds.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#include <daq/hbasf2/modules/HLTZMQ2Ds.h>
#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>

#include <framework/logging/Logger.h>

using namespace std;
using namespace Belle2;

REG_MODULE(HLTZMQ2Ds);

HLTZMQ2DsModule::HLTZMQ2DsModule() : Module()
{
  setDescription(
    "Input module in the ZMQ reconstruction path receiving events via ZMQ "
    "and deserializing them to the data store. The connection to the previous ZMQ application "
    "(most likely a distributor or collector) is handled via a load balanced connection "
    "(input in this case). The buffer size for the load balanced connection can be "
    "controlled via a module parameter. "
    "This module only works in the context of the HLT when using the HLTEventProcessor, "
    "due to the special way the first event as well as beginRun and endRun are handled. "
    "Please read the overall description in the HLTEventProcessor for an overview. "
    "Before the first real event is received (which is the first time the event function "
    "is called by the HLTEventProcessor, but before the forking), the "
    "event meta data is initialized with a predefined experiment and run number (set via "
    "module parameters) to make module initialization in all other modules possible. "
    "However, no event function should be called for other modules in this event "
    "(as the data store is invalid). In the first real event after the forking, "
    "the connection and streamer are initialized. Then, normal event messages "
    "are deserialized and written to the data store. End run or terminate messages are "
    "handled by setting a special flag of the EventMetaData. Also in this case "
    "the remaining modules should not process this event via an event function "
    "(assured by the HLTEventProcessor)."
  );
  setPropertyFlags(c_Input | c_ParallelProcessingCertified);

  addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
  addParam("addExpressRecoObjects", m_param_addExpressRecoObjects,
           "In addition to the raw data, also register the data store objects needed for express reco. TODO: this might change",
           m_param_addExpressRecoObjects);
  addParam("bufferSize", m_param_bufferSize,
           "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);

  addParam("defaultExperiment", m_lastExperiment,
           "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
  addParam("defaultRun", m_lastRun,
           "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
}

void HLTZMQ2DsModule::initialize()
{
  m_streamHelper.registerStoreObjects(m_param_addExpressRecoObjects);
}

void HLTZMQ2DsModule::event()
{
  // The very first event is actually not the first event for processing.
  // It is just used to initialize the geometry, so we write out
  // a default event and return immediately. This will cause
  // all subsequent modules to be initialized.
  if (m_inInitialize) {
    m_inInitialize = false;

    m_eventMetaData.create();
    m_eventMetaData->setExperiment(m_lastExperiment);
    m_eventMetaData->setRun(m_lastRun);

    return;
  }

  try {
    // If we are not in this initialization step, we can do the normal event processing.
    // This now becomes the first "real" event.
    if (m_firstEvent) {
      m_streamHelper.initialize();

      m_parent = std::make_unique<ZMQParent>();
      m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);

      m_firstEvent = false;
    }

    const auto reactToInput = [this]() {
      auto eventMessage = m_input->handleIncomingData();

      if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
        B2DEBUG(10, "Received run change request");

        m_eventMetaData.create();
        if (m_firstEventIsSpecialMessage) {
          m_eventMetaData->setExperiment(m_lastExperiment);
          m_eventMetaData->setRun(m_lastRun);
        } else {
        }
        return;
      } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
        B2DEBUG(10, "Received termination request");

        m_eventMetaData.create();
        m_eventMetaData->setEndOfData();
        return;
      }

      B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
               eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
      B2DEBUG(10, "received event message... write it to data store");

      m_streamHelper.read(std::move(eventMessage));

      B2ASSERT("There is still no event meta data present!", m_eventMetaData);
      m_lastRun = m_eventMetaData->getRun();
      m_lastExperiment = m_eventMetaData->getExperiment();
    };

    bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
    if (!result) {
      // didn't get any events, probably interrupted by a signal.
      // We're the input module so let's better have some event meta data
      // even if it's not useful
      m_eventMetaData.create();
      m_eventMetaData->setEndOfData();
    }
  } catch (zmq::error_t& error) {
    // This is an unexpected error: better report it.
    B2ERROR("ZMQ Error while calling the event: " << error.num());
  }
}
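
The three phases handled in event() (before the first real event, the first real event, and normal events) can be hard to follow from the listing alone. The following is a minimal standalone sketch of that control flow, not the basf2 implementation: `Message`, `MetaData`, `InputSketch` and `connected()` are hypothetical stand-ins, a `std::deque` replaces the ZMQ connection, the `m_firstEventIsSpecialMessage` branch is left out, and the `endOfRun` flag is only an assumed representation of the "special flag" mentioned in the module description.

```cpp
#include <deque>

// Hypothetical stand-ins for illustration only; these are not basf2 types.
enum class MessageType { Event, LastEvent, Terminate };

struct Message {
  MessageType type;
  unsigned int experiment;
  unsigned int run;
};

struct MetaData {
  unsigned int experiment = 0;
  unsigned int run = 0;
  bool endOfRun = false;   // assumed flag for a run change
  bool endOfData = false;  // assumed flag for termination
};

class InputSketch {
public:
  InputSketch(unsigned int defaultExperiment, unsigned int defaultRun)
    : m_lastExperiment(defaultExperiment), m_lastRun(defaultRun) {}

  // One call stands for one event() invocation; the deque replaces the socket.
  MetaData event(std::deque<Message>& incoming)
  {
    MetaData meta;

    // (a) Before the first real event: publish the default numbers and return.
    if (m_inInitialize) {
      m_inInitialize = false;
      meta.experiment = m_lastExperiment;
      meta.run = m_lastRun;
      return meta;
    }

    // (b) First real event: this is where the connection would be set up.
    if (m_firstEvent) {
      m_connected = true;
      m_firstEvent = false;
    }

    // Nothing to read: mimic an interrupted poll by flagging end of data.
    if (incoming.empty()) {
      meta.endOfData = true;
      return meta;
    }

    const Message message = incoming.front();
    incoming.pop_front();

    // (c) Normal operation: dispatch on the message type.
    switch (message.type) {
      case MessageType::LastEvent:
        meta.experiment = m_lastExperiment;
        meta.run = m_lastRun;
        meta.endOfRun = true;
        break;
      case MessageType::Terminate:
        meta.endOfData = true;
        break;
      case MessageType::Event:
        meta.experiment = message.experiment;
        meta.run = message.run;
        // Remember the numbers so a later run change can reuse them.
        m_lastExperiment = message.experiment;
        m_lastRun = message.run;
        break;
    }
    return meta;
  }

  bool connected() const { return m_connected; }

private:
  bool m_inInitialize = true;
  bool m_firstEvent = true;
  bool m_connected = false;
  unsigned int m_lastExperiment;
  unsigned int m_lastRun;
};
```

In this sketch the first call never touches the queue, which mirrors why the other modules must not process that event: only the default experiment and run numbers are available at that point.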
void initialize()
Initialize this class. Call this e.g. in the first event.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
void registerStoreObjects(bool addExpressRecoObjects)
Register all needed store objects, either only the raw data, ROIs and event meta data (for HLT) or ad...
bool m_firstEvent
Are we still in the first real event?
Definition: HLTZMQ2Ds.h:75
std::unique_ptr< ZMQLoadBalancedInput > m_input
Load balanced connection to the previous ZMQ application.
Definition: HLTZMQ2Ds.h:69
bool m_inInitialize
Are we still before the first real event = before the modules are initialized = before the forking?
Definition: HLTZMQ2Ds.h:79
void initialize() override
Register the needed store arrays. In case of the HLT, these are only the raw data objects,...
Definition: HLTZMQ2Ds.cc:56
void event() override
Handle the cases (a) before first event, (b) first event and (c) normal event as described in the cla...
Definition: HLTZMQ2Ds.cc:61
bool m_firstEventIsSpecialMessage
The first event has a special message type?
Definition: HLTZMQ2Ds.h:77
std::shared_ptr< ZMQParent > m_parent
ZMQ Parent needed for the connections.
Definition: HLTZMQ2Ds.h:67
unsigned int m_lastExperiment
Default experiment number to be set during initialization/run end to have something to load the geome...
Definition: HLTZMQ2Ds.h:89
unsigned int m_param_bufferSize
Module parameter: how many events should be kept in flight. Has an impact on the stopping time as wel...
Definition: HLTZMQ2Ds.h:84
StoreObjPtr< EventMetaData > m_eventMetaData
Reference to the event meta data to set numbers and flags according to the state and received message...
Definition: HLTZMQ2Ds.h:94
HLTZMQ2DsModule()
Register the module parameters.
Definition: HLTZMQ2Ds.cc:18
unsigned int m_lastRun
Default run number to be set during initialization/run end to have something to load the geometry....
Definition: HLTZMQ2Ds.h:91
HLTStreamHelper m_streamHelper
Utility class for deserialization.
Definition: HLTZMQ2Ds.h:72
std::string m_param_input
Module parameter: ZMQ address of the input ZMQ application.
Definition: HLTZMQ2Ds.h:82
bool m_param_addExpressRecoObjects
Module parameter: additional to the raw data, also register the data store objects needed for express...
Definition: HLTZMQ2Ds.h:86
Base class for Modules.
Definition: Module.h:72
void setDescription(const std::string &description)
Sets the description of the module.
Definition: Module.cc:214
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
void setReturnValue(int value)
Sets the return value for this module as integer.
Definition: Module.cc:220
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a message comes in.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.
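
The poll entry above pairs each connection with a function that is called when data arrives, which is how event() attaches reactToInput to m_input. A rough sketch of that dispatch idea follows. It is an assumption-laden illustration rather than the ZMQConnection code: `FakeConnection`, its `hasData` member and `pollSketch` are hypothetical, there is no timeout handling, and the meaning of the return value (true if any function ran) is assumed.

```cpp
#include <functional>
#include <map>

// Hypothetical stand-in for a connection; not the basf2 ZMQConnection class.
struct FakeConnection {
  bool hasData = false;
};

using ReactorFunction = std::function<void()>;

// Call the attached function for every connection that has pending data.
// Returns true if at least one function was called (assumed semantics).
bool pollSketch(const std::map<const FakeConnection*, ReactorFunction>& connectionList)
{
  bool anyHandled = false;
  for (const auto& entry : connectionList) {
    if (entry.first->hasData) {
      entry.second();
      anyHandled = true;
    }
  }
  return anyHandled;
}
```

Keeping the reaction in a callable next to its connection lets one polling loop serve several connections without the loop knowing what each message means.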