8 #include <daq/hbasf2/modules/HLTZMQ2Ds.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 #include <framework/logging/Logger.h>
21 "Input module in the ZMQ reconstruction path receiving events via ZMQ "
22 "and deserializing the to the data store. The connection to the previous ZMQ application "
23 "(most likely a distributor or collector) is handled via a load balanced connection "
24 "(input in this case). The buffer size for the load balanced connection can be "
25 "controlled via a module parameter. "
26 "This module only works in the context of the HLT when using the HLTEventProcessor, "
27 "due to the special form the first event as well as beginRun and endRun are handled. "
28 "Please read the overall description in the HLTEventProcessor for an overview. "
29 "Before the first real event is received (which is the first time the event function "
30 "is called by the HLTEventProcessor, but before the forking), the "
31 "event meta data is initialized with a predefined experiment and run number (set via "
32 "module parameters) so make module initialization in all other modules possible. "
33 "However, no event function should be called for other modules in this event "
34 "(as the data store is invalid). In the first real event after the forking, "
35 "the connection and streamer is initialized. Then, normal event messages "
36 "are deserialized and written to data store. End run or terminate messages are "
37 "handled by setting a special flag of the EventMetaData. Also in this case "
38 "the remaining modules should not process this event via an event function "
39 "(assured by the HLTEventProcessor)."
41 setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
43 addParam(
"input", m_param_input,
"ZMQ address of the input ZMQ application");
44 addParam(
"addExpressRecoObjects", m_param_addExpressRecoObjects,
45 "Additional to the raw data, also register the data store objects needed for express reco. TODO: this might change",
46 m_param_addExpressRecoObjects);
47 addParam(
"bufferSize", m_param_bufferSize,
48 "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
50 addParam(
"defaultExperiment", m_lastExperiment,
51 "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
52 addParam(
"defaultRun", m_lastRun,
53 "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
56 void HLTZMQ2DsModule::initialize()
58 m_streamHelper.registerStoreObjects(m_param_addExpressRecoObjects);
61 void HLTZMQ2DsModule::event()
70 m_inInitialize =
false;
72 m_eventMetaData.create();
73 m_eventMetaData->setExperiment(m_lastExperiment);
74 m_eventMetaData->setRun(m_lastRun);
84 m_streamHelper.initialize();
86 m_parent = std::make_unique<ZMQParent>();
87 m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
92 const auto reactToInput = [
this]() {
93 auto eventMessage = m_input->handleIncomingData();
95 if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
96 B2DEBUG(10,
"Received run change request");
98 m_eventMetaData.create();
99 if (m_firstEventIsSpecialMessage) {
100 m_eventMetaData->setExperiment(m_lastExperiment);
101 m_eventMetaData->setRun(m_lastRun);
103 m_eventMetaData->setEndOfRun(m_lastExperiment, m_lastRun);
106 }
else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
107 B2DEBUG(10,
"Received termination request");
109 m_eventMetaData.create();
110 m_eventMetaData->setEndOfData();
114 B2ASSERT(
"Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
115 eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
116 B2DEBUG(10,
"received event message... write it to data store");
118 m_streamHelper.read(std::move(eventMessage));
120 B2ASSERT(
"There is still no event meta data present!", m_eventMetaData);
121 m_lastRun = m_eventMetaData->getRun();
122 m_lastExperiment = m_eventMetaData->getExperiment();
123 m_firstEventIsSpecialMessage =
false;
126 bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
131 m_eventMetaData.create();
132 m_eventMetaData->setEndOfData();
134 }
catch (zmq::error_t& error) {
136 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
Input module in the ZMQ reconstruction path receiving events via ZMQ and deserializing the to the dat...
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.