1 #include <daq/hbasf2/modules/HLTZMQ2Ds.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
4 #include <framework/logging/Logger.h>
14 "Input module in the ZMQ reconstruction path receiving events via ZMQ "
15 "and deserializing the to the data store. The connection to the previous ZMQ application "
16 "(most likely a distributor or collector) is handled via a load balanced connection "
17 "(input in this case). The buffer size for the load balanced connection can be "
18 "controlled via a module parameter. "
19 "This module only works in the context of the HLT when using the HLTEventProcessor, "
20 "due to the special form the first event as well as beginRun and endRun are handled. "
21 "Please read the overall description in the HLTEventProcessor for an overview. "
22 "Before the first real event is received (which is the first time the event function "
23 "is called by the HLTEventProcessor, but before the forking), the "
24 "event meta data is initialized with a predefined experiment and run number (set via "
25 "module parameters) so make module initialization in all other modules possible. "
26 "However, no event function should be called for other modules in this event "
27 "(as the data store is invalid). In the first real event after the forking, "
28 "the connection and streamer is initialized. Then, normal event messages "
29 "are deserialized and written to data store. End run or terminate messages are "
30 "handled by setting a special flag of the EventMetaData. Also in this case "
31 "the remaining modules should not process this event via an event function "
32 "(assured by the HLTEventProcessor)."
// Set the module property flags c_Input and c_ParallelProcessingCertified.
34 setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
// "input": ZMQ address of the upstream application. Unlike the parameters
// below, no default value is passed to addParam here.
36 addParam(
"input", m_param_input,
"ZMQ address of the input ZMQ application");
// "addExpressRecoObjects": forwarded to the stream helper's store object
// registration in initialize(). The member's current value is passed as default.
37 addParam(
"addExpressRecoObjects", m_param_addExpressRecoObjects,
38 "Additional to the raw data, also register the data store objects needed for express reco. TODO: this might change",
39 m_param_addExpressRecoObjects);
// "bufferSize": handed to the ZMQLoadBalancedInput created in event().
// The member's current value is passed as default.
40 addParam(
"bufferSize", m_param_bufferSize,
41 "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
// "defaultExperiment" / "defaultRun": bound directly to m_lastExperiment and
// m_lastRun, which event() later overwrites with the values of the last
// event that was read.
43 addParam(
"defaultExperiment", m_lastExperiment,
44 "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
45 addParam(
"defaultRun", m_lastRun,
46 "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
// Register the data store objects via the stream helper; the
// addExpressRecoObjects module parameter is forwarded to that registration.
49 void HLTZMQ2DsModule::initialize()
51 m_streamHelper.registerStoreObjects(m_param_addExpressRecoObjects);
// Per-event entry point. The visible statements fall into three phases:
// an initialization pass writing default event meta data, the set-up of the
// stream helper and ZMQ input, and the polling/handling of one incoming message.
// NOTE(review): the conditions and braces separating these phases (and the
// opening of the try block) are not visible in this excerpt.
54 void HLTZMQ2DsModule::event()
// Initialization pass: clear the flag and create event meta data carrying the
// currently configured experiment and run number.
63 m_inInitialize =
false;
65 m_eventMetaData.create();
66 m_eventMetaData->setExperiment(m_lastExperiment);
67 m_eventMetaData->setRun(m_lastRun);
// Connection set-up: initialize the stream helper, then create the ZMQ parent
// and the load balanced input from the "input" address and the buffer size.
77 m_streamHelper.initialize();
79 m_parent = std::make_unique<ZMQParent>();
80 m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
// Handler passed to ZMQConnection::poll below for m_input: fetches one message
// from the input and dispatches on its type.
85 const auto reactToInput = [
this]() {
86 auto eventMessage = m_input->handleIncomingData();
88 if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
// Last-event message: flag end of run in fresh event meta data, using the
// experiment/run numbers remembered so far.
89 B2DEBUG(10,
"Received run change request");
91 m_eventMetaData.create();
92 m_eventMetaData->setEndOfRun(m_lastExperiment, m_lastRun);
94 }
else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
// Terminate message: flag end of data in fresh event meta data.
95 B2DEBUG(10,
"Received termination request");
97 m_eventMetaData.create();
98 m_eventMetaData->setEndOfData();
// Anything else has to be an event or raw data message (asserted).
// NOTE(review): presumably this is the final else branch; its header is not
// visible in this excerpt.
102 B2ASSERT(
"Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
103 eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
104 B2DEBUG(10,
"received event message... write it to data store");
// Hand the message over to the stream helper (the message is moved from).
106 m_streamHelper.read(std::move(eventMessage));
// The event meta data must be valid after reading; remember its run and
// experiment so a later end-of-run marker can reuse them.
108 B2ASSERT(
"There is still no event meta data present!", m_eventMetaData);
109 m_lastRun = m_eventMetaData->getRun();
110 m_lastExperiment = m_eventMetaData->getExperiment();
// Poll the input with a timeout argument of -1, running reactToInput for it.
// NOTE(review): -1 presumably means "wait without timeout" - confirm against
// ZMQConnection::poll.
113 bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
// Flag end of data in fresh event meta data.
// NOTE(review): the condition guarding this is not visible in this excerpt;
// presumably it depends on `result` - confirm in the full file.
118 m_eventMetaData.create();
119 m_eventMetaData->setEndOfData();
121 }
catch (zmq::error_t& error) {
// ZMQ exceptions are reported via B2ERROR with the numeric error code.
123 B2ERROR(
"ZMQ Error while calling the event: " << error.num());