8 #include <daq/hbasf2/modules/HLTZMQ2DsDirect.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
11 #include <framework/logging/Logger.h>
23 setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
25 addParam(
"input", m_param_input,
"ZMQ address of the input ZMQ application");
26 addParam(
"bufferSize", m_param_bufferSize,
27 "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
30 void HLTZMQ2DsDirectModule::readEvent()
33 bool tryAgain =
false;
34 const auto reactToInput = [
this, &tryAgain]() {
35 auto eventMessage = m_input->handleIncomingData();
37 if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
41 }
else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
42 B2DEBUG(10,
"Received termination request");
47 B2ASSERT(
"Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
48 eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
49 B2DEBUG(10,
"received event message... write it to data store");
51 m_streamHelper.read(std::move(eventMessage));
56 ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
58 }
catch (zmq::error_t& error) {
59 if (error.num() == EINTR) {
61 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
65 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
69 void HLTZMQ2DsDirectModule::initialize()
71 m_streamHelper.initialize();
73 m_parent = std::make_unique<ZMQParent>();
74 m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
79 void HLTZMQ2DsDirectModule::event()
Special ZMQ2Ds module without the HLT-specific handling of initialization and begin/end run.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.