1 #include <daq/hbasf2/modules/HLTZMQ2DsDirect.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
4 #include <framework/logging/Logger.h>
16 setPropertyFlags(EModulePropFlags::c_Input | EModulePropFlags::c_ParallelProcessingCertified);
18 addParam(
"input", m_param_input,
"ZMQ address of the input ZMQ application");
19 addParam(
"bufferSize", m_param_bufferSize,
20 "How many events should be kept in flight. Has an impact on the stopping time as well as the rate stability", m_param_bufferSize);
23 void HLTZMQ2DsDirectModule::readEvent()
26 bool tryAgain =
false;
27 const auto reactToInput = [
this, &tryAgain]() {
28 auto eventMessage = m_input->handleIncomingData();
30 if (eventMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
34 }
else if (eventMessage->isMessage(EMessageTypes::c_terminateMessage)) {
35 B2DEBUG(10,
"Received termination request");
40 B2ASSERT(
"Must be event message", eventMessage->isMessage(EMessageTypes::c_eventMessage) or
41 eventMessage->isMessage(EMessageTypes::c_rawDataMessage));
42 B2DEBUG(10,
"received event message... write it to data store");
44 m_streamHelper.read(std::move(eventMessage));
49 ZMQConnection::poll({{m_input.get(), reactToInput}}, -1);
51 }
catch (zmq::error_t& error) {
52 if (error.num() == EINTR) {
54 B2DEBUG(10,
"Received an signal interrupt during the event call. Will return");
58 B2ERROR(
"ZMQ Error while calling the event: " << error.num());
62 void HLTZMQ2DsDirectModule::initialize()
64 m_streamHelper.initialize();
66 m_parent = std::make_unique<ZMQParent>();
67 m_input = std::make_unique<ZMQLoadBalancedInput>(m_param_input, m_param_bufferSize, m_parent);
72 void HLTZMQ2DsDirectModule::event()