9 #include <daq/hbasf2/modules/StorageZMQ2Ds.h>
10 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/logging/Logger.h>
22 "Input module in the ZMQ reconstruction path receiving events via ZMQ "
23 "and deserializing them to the data store. The connection to the previous ZMQ application "
24 "(most likely a distributor or collector) is handled via a load balanced connection "
25 "(input in this case). The buffer size for the load balanced connection can be "
26 "controlled via a module parameter. "
27 "This module only works in the context of the HLT when using the HLTEventProcessor, "
28 "due to the special way in which the first event as well as beginRun and endRun are handled. "
29 "Please read the overall description in the HLTEventProcessor for an overview. "
30 "Before the first real event is received (which is the first time the event function "
31 "is called by the HLTEventProcessor, but before the forking), the "
32 "event meta data is initialized with a predefined experiment and run number (set via "
33 "module parameters) to make module initialization in all other modules possible. "
34 "However, no event function should be called for other modules in this event "
35 "(as the data store is invalid). In the first real event after the forking, "
36 "the connection and streamer are initialized. Then, normal event messages "
37 "are deserialized and written to data store. End run or terminate messages are "
38 "handled by setting a special flag of the EventMetaData. Also in this case "
39 "the remaining modules should not process this event via an event function "
40 "(assured by the HLTEventProcessor)."
42 setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
44 addParam(
"input", m_param_input,
"ZMQ address of the input ZMQ application");
45 addParam(
"addExpressRecoObjects", m_param_addExpressRecoObjects,
46 "Additional to the raw data, also register the data store objects needed for express reco. TODO: this might change",
47 m_param_addExpressRecoObjects);
48 addParam(
"bufferSize", m_param_bufferSize,
49 "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
51 addParam(
"defaultExperiment", m_lastExperiment,
52 "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
53 addParam(
"defaultRun", m_lastRun,
54 "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
55 addParam(
"inInitialize", m_inInitialize,
56 "psh added this for test",
true);
59 void StorageZMQ2DsModule::initialize()
61 m_streamHelper.registerStoreObjects(m_param_addExpressRecoObjects);
64 void StorageZMQ2DsModule::event()
73 m_inInitialize =
false;
75 m_eventMetaData.create();
76 m_eventMetaData->setExperiment(m_lastExperiment);
77 m_eventMetaData->setRun(m_lastRun);
87 m_streamHelper.initialize();
89 m_parent = std::make_unique<ZMQParent>();
90 m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
95 const auto reactToInput = [
this]() {
96 auto eventMessage = m_input->handleIncomingData();
98 if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
99 B2DEBUG(10,
"Received run change request");
101 m_eventMetaData.create();
102 m_eventMetaData->setEndOfRun(m_lastExperiment, m_lastRun);
104 }
else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
105 B2DEBUG(10,
"Received termination request");
107 m_eventMetaData.create();
108 m_eventMetaData->setEndOfData();
112 B2ASSERT(
"Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
113 eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
114 B2DEBUG(10,
"received event message... write it to data store");
116 m_streamHelper.read(std::move(eventMessage));
118 B2ASSERT(
"There is still no event meta data present!", m_eventMetaData);
119 m_lastRun = m_eventMetaData->getRun();
120 m_lastExperiment = m_eventMetaData->getExperiment();
123 bool result = ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
128 m_eventMetaData.create();
129 m_eventMetaData->setEndOfData();
131 }
catch (zmq::error_t& error) {
133 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
Input module in the ZMQ reconstruction path receiving events via ZMQ and deserializing them to the data store.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.