9#include <daq/hbasf2/modules/StorageZMQ2Ds.h>
10#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12#include <framework/logging/Logger.h>
22 "Input module in the ZMQ reconstruction path receiving events via ZMQ "
23 "and deserializing the to the data store. The connection to the previous ZMQ application "
24 "(most likely a distributor or collector) is handled via a load balanced connection "
25 "(input in this case). The buffer size for the load balanced connection can be "
26 "controlled via a module parameter. "
27 "This module only works in the context of the HLT when using the HLTEventProcessor, "
28 "due to the special form the first event as well as beginRun and endRun are handled. "
29 "Please read the overall description in the HLTEventProcessor for an overview. "
30 "Before the first real event is received (which is the first time the event function "
31 "is called by the HLTEventProcessor, but before the forking), the "
32 "event meta data is initialized with a predefined experiment and run number (set via "
33 "module parameters) so make module initialization in all other modules possible. "
34 "However, no event function should be called for other modules in this event "
35 "(as the data store is invalid). In the first real event after the forking, "
36 "the connection and streamer is initialized. Then, normal event messages "
37 "are deserialized and written to data store. End run or terminate messages are "
38 "handled by setting a special flag of the EventMetaData. Also in this case "
39 "the remaining modules should not process this event via an event function "
40 "(assured by the HLTEventProcessor)."
42 setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
44 addParam(
"input", m_param_input,
"ZMQ address of the input ZMQ application");
45 addParam(
"addExpressRecoObjects", m_param_addExpressRecoObjects,
46 "Additional to the raw data, also register the data store objects needed for express reco. TODO: this might change",
47 m_param_addExpressRecoObjects);
48 addParam(
"bufferSize", m_param_bufferSize,
49 "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
51 addParam(
"defaultExperiment", m_lastExperiment,
52 "Default experiment number to be set during initialization/run end to have something to load the geometry.", m_lastExperiment);
53 addParam(
"defaultRun", m_lastRun,
54 "Default run number to be set during initialization/run end to have something to load the geometry.", m_lastRun);
55 addParam(
"inInitialize", m_inInitialize,
56 "psh added this for test",
true);
89 m_parent = std::make_unique<ZMQParent>();
95 const auto reactToInput = [
this]() {
96 auto eventMessage =
m_input->handleIncomingData();
98 if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
99 B2DEBUG(10,
"Received run change request");
107 }
else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
108 B2DEBUG(10,
"Received termination request");
115 B2ASSERT(
"Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
116 eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
117 B2DEBUG(10,
"received event message... write it to data store");
121 B2ASSERT(
"There is still no event meta data present!",
m_eventMetaData);
140 }
catch (zmq::error_t& error) {
142 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
void setReturnValue(int value)
Sets the return value for this module as integer.
void initialize()
Initialize this class. Call this e.g. in the first event.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
void registerStoreObjects(bool addExpressRecoObjects)
Register all needed store objects, either only the raw data, ROIs and event meta data (for HLT) or ad...
Input module in the ZMQ reconstruction path receiving events via ZMQ and deserializing the to the dat...
bool m_firstEvent
Are we still in the first real event?
std::unique_ptr< ZMQLoadBalancedInput > m_input
Load balanced connection to the previous ZMQ application.
bool m_lastEventIsSpecialMessage
The last event has a special message type?
bool m_inInitialize
Are we still before the first real event = before the modules are initialized = before the forking?
void initialize() override
Register the needed store arrays. In case of the HLT, this are only the raw data objects,...
void event() override
Handle the cases (a) before first event, (b) first event and (c) normal event as described in the cla...
std::shared_ptr< ZMQParent > m_parent
ZMQ Parent needed for the connections.
unsigned int m_lastExperiment
Default experiment number to be set during initialization/run end to have something to load the geome...
unsigned int m_param_bufferSize
Module parameter: how many events should be kept in flight. Has an impact on the stopping time as wel...
StoreObjPtr< EventMetaData > m_eventMetaData
Reference to the event meta data to set numbers and flags according to the state and received message...
unsigned int m_lastRun
Default run number to be set during initialization/run end to have something to load the geometry....
StorageStreamHelper m_streamHelper
Utility class for deserialization.
std::string m_param_input
Module parameter: ZMQ address of the input ZMQ application.
bool m_param_addExpressRecoObjects
Module parameter: additional to the raw data, also register the data store objects needed for express...
static bool poll(const std::map< const ZMQConnection *, ReactorFunction > &connectionList, int timeout)
Poll on the given connections and call the attached function if a messages comes in.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.