1 #include <framework/pcore/ProcHandler.h>
2 #include <framework/pcore/zmq/processModules/ZMQRxOutputModule.h>
3 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
4 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
5 #include <framework/core/Environment.h>
14 addParam(
"socketName", m_param_socketName,
"Name of the socket to connect this module to.");
15 addParam(
"xpubProxySocketName", m_param_xpubProxySocketName,
"Address of the XPUB socket of the proxy");
16 addParam(
"xsubProxySocketName", m_param_xsubProxySocketName,
"Address of the XSUB socket of the proxy");
17 addParam(
"maximalWaitingTime", m_param_maximalWaitingTime,
"Maximal time to wait for any message");
18 setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
20 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
21 "set the number of processes to at least 1.",
22 Environment::Instance().getNumberProcesses());
25 void ZMQRxOutputModule::initialize()
27 m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
30 void ZMQRxOutputModule::event()
34 m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
35 m_zmqClient.initialize<ZMQ_PULL>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName,
true);
37 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
38 m_zmqClient.publish(std::move(multicastHelloMsg));
41 m_zmqClient.subscribe(EMessageTypes::c_eventMessage);
42 m_zmqClient.subscribe(EMessageTypes::c_lastEventMessage);
43 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
47 const auto multicastAnswer = [
this](
const auto & socket) {
48 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
49 if (message->isMessage(EMessageTypes::c_eventMessage)) {
50 B2DEBUG(100,
"Having received an event backup. Will go in with this.");
51 m_streamer.read(std::move(message));
53 eventMetaData->addErrorFlag(EventMetaData::EventErrorFlag::c_HLTCrash);
55 }
else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
56 B2DEBUG(100,
"Having received an end message. Will not go on.");
59 }
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
60 B2DEBUG(100,
"Having received an graceful stop message. Will not go on.");
65 B2ERROR(
"Undefined message on multicast");
69 const auto socketAnswer = [
this](
const auto & socket) {
70 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
71 if (message->isMessage(EMessageTypes::c_eventMessage)) {
72 m_streamer.read(std::move(message));
73 B2DEBUG(100,
"received event " << m_eventMetaData->getEvent());
74 auto confirmMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_confirmMessage, m_eventMetaData);
75 m_zmqClient.publish(std::move(confirmMessage));
79 B2ERROR(
"Undefined message on socket");
84 B2DEBUG(100,
"Start polling");
85 const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
86 B2ASSERT(
"Output process did not receive any message in some time. Aborting.", pollReply);
88 B2DEBUG(100,
"finished reading in an event.");
89 }
catch (zmq::error_t& ex) {
90 if (ex.num() != EINTR) {
91 B2ERROR(
"There was an error during the Rx output event: " << ex.what());
96 void ZMQRxOutputModule::terminate()
98 m_zmqClient.terminate();