Belle II Software development
StorageZMQ2Ds.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/

#include <daq/hbasf2/modules/StorageZMQ2Ds.h>
#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>

#include <framework/logging/Logger.h>

using namespace std;
using namespace Belle2;

REG_MODULE(StorageZMQ2Ds)

StorageZMQ2DsModule::StorageZMQ2DsModule() : Module()
{
  setDescription(
    "Input module in the ZMQ reconstruction path receiving events via ZMQ "
    "and deserializing them to the data store. The connection to the previous ZMQ application "
    "(most likely a distributor or collector) is handled via a load balanced connection "
    "(input in this case). The buffer size for the load balanced connection can be "
    "controlled via a module parameter. "
    "This module only works in the context of the HLT when using the HLTEventProcessor, "
    "due to the special way the first event as well as beginRun and endRun are handled. "
    "Please read the overall description in the HLTEventProcessor for an overview. "
    "Before the first real event is received (which is the first time the event function "
    "is called by the HLTEventProcessor, but before the forking), the "
    "event meta data is initialized with a predefined experiment and run number (set via "
    "module parameters) to make module initialization in all other modules possible. "
    "However, no event function should be called for other modules in this event "
    "(as the data store is invalid). In the first real event after the forking, "
    "the connection and streamer are initialized. Then, normal event messages "
    "are deserialized and written to the data store. End run or terminate messages are "
    "handled by setting a special flag of the EventMetaData. Also in this case "
    "the remaining modules should not process this event via an event function "
    "(assured by the HLTEventProcessor)."
  );
  setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);

  addParam("input", m_param_input, "ZMQ address of the input ZMQ application");
  addParam("addExpressRecoObjects", m_param_addExpressRecoObjects,
           "In addition to the raw data, also register the data store objects needed for express reco. TODO: this might change",
           m_param_addExpressRecoObjects);
  addParam("bufferSize", m_param_bufferSize,
           "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);

  addParam("defaultExperiment", m_lastExperiment,
           "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
  addParam("defaultRun", m_lastRun,
           "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
  addParam("inInitialize", m_inInitialize,
           "psh added this for test", true);
}

void StorageZMQ2DsModule::initialize()
{
  m_streamHelper.registerStoreObjects(m_param_addExpressRecoObjects);
}

void StorageZMQ2DsModule::event()
{
  // The very first event is actually not the first event for processing.
  // It is just used to initialize the geometry, so we write out
  // a default event and return immediately. This will cause
  // all subsequent modules to be initialized.
  if (m_inInitialize) {
    m_inInitialize = false;

    m_eventMetaData.create();
    m_eventMetaData->setExperiment(m_lastExperiment);
    m_eventMetaData->setRun(m_lastRun);

    return;
  }

  try {
    // If we are not in this initialization step, we can do the normal event processing.
    // This now becomes the first "real" event.
    if (m_firstEvent) {
      m_streamHelper.initialize();

      m_parent = std::make_unique<ZMQParent>();
      m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);

      m_firstEvent = false;
    }

    const auto reactToInput = [this]() {
      auto eventMessage = m_input->handleIncomingData();

      if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
        B2DEBUG(10, "Received run change request");

        m_eventMetaData.create();
        }
        return;
      } else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
        B2DEBUG(10, "Received termination request");

        m_eventMetaData.create();
        m_eventMetaData->setEndOfData();
        return;
      }

      B2ASSERT("Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
               eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
      B2DEBUG(10, "received event message... write it to data store");

      m_streamHelper.read(std::move(eventMessage));

      B2ASSERT("There is still no event meta data present!", m_eventMetaData);
      m_lastRun = m_eventMetaData->getRun();
      m_lastExperiment = m_eventMetaData->getExperiment();
    };

    bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
    if (!result) {
      // No event was received; the poll was probably interrupted by a signal.
      // As the input module, we should still provide some event meta data,
      // even if it is not useful.
      // If m_lastRun is 0, this is probably the lastEventMessage: do not issue the endOfData.
      if (m_lastRun != 0) {
        m_eventMetaData.create();
        m_eventMetaData->setEndOfData();
      } else {
      }
    }
  } catch (zmq::error_t& error) {
    // This is an unexpected error: better report it.
    B2ERROR("ZMQ Error while calling the event: " << error.num());
  }
}
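
The three phases handled by the event function (the placeholder event before forking, the first real event, and normal message handling) can be illustrated with a small stand-alone sketch. It is not the module's implementation: Message, MetaData and InputSketch are hypothetical stand-ins that use neither basf2 nor ZMQ, a std::deque replaces the load balanced connection, and treating an empty queue as end of data is a simplification of the interrupted-poll branch. Carrying the last seen experiment and run number on an end-of-run event is likewise an assumption made for illustration.

```cpp
#include <deque>

// Hypothetical stand-ins for illustration only; none of these types are
// part of basf2 or ZMQ.
enum class MessageType { Event, LastEvent, Terminate };

struct Message {
  MessageType type;
  unsigned int experiment = 0;
  unsigned int run = 0;
};

struct MetaData {
  unsigned int experiment = 0;
  unsigned int run = 0;
  bool endOfRun = false;
  bool endOfData = false;
};

class InputSketch {
public:
  InputSketch(unsigned int defaultExperiment, unsigned int defaultRun)
    : m_lastExperiment(defaultExperiment), m_lastRun(defaultRun) {}

  bool connected() const { return m_connected; }

  // One call stands for one invocation of the module's event function.
  MetaData event(std::deque<Message>& queue)
  {
    MetaData meta;

    // (a) Placeholder event before forking: only default numbers are set,
    //     and no message is consumed.
    if (m_inInitialize) {
      m_inInitialize = false;
      meta.experiment = m_lastExperiment;
      meta.run = m_lastRun;
      return meta;
    }

    // (b) First real event: this is where the connection would be opened.
    if (m_firstEvent) {
      m_connected = true;
      m_firstEvent = false;
    }

    // Simplification: an empty queue is treated as end of data.
    if (queue.empty()) {
      meta.endOfData = true;
      return meta;
    }

    // (c) Normal handling: react to the type of the next message.
    const Message message = queue.front();
    queue.pop_front();

    switch (message.type) {
      case MessageType::LastEvent:
        // Assumption: flag the end of the last seen run.
        meta.experiment = m_lastExperiment;
        meta.run = m_lastRun;
        meta.endOfRun = true;
        break;
      case MessageType::Terminate:
        meta.endOfData = true;
        break;
      case MessageType::Event:
        // Remember the numbers of the last real event.
        meta.experiment = message.experiment;
        meta.run = message.run;
        m_lastExperiment = message.experiment;
        m_lastRun = message.run;
        break;
    }
    return meta;
  }

private:
  bool m_inInitialize = true;
  bool m_firstEvent = true;
  bool m_connected = false;
  unsigned int m_lastExperiment;
  unsigned int m_lastRun;
};
```

In this sketch the first call never touches the queue, which mirrors the requirement that the connection is only created after forking. Special messages only set flags on the returned meta data, so a caller can decide to skip the remaining processing for that event.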