8 #include <framework/pcore/ProcHandler.h>
9 #include <framework/pcore/zmq/processModules/ZMQRxOutputModule.h>
10 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/core/Environment.h>
21 addParam(
"socketName", m_param_socketName,
"Name of the socket to connect this module to.");
22 addParam(
"xpubProxySocketName", m_param_xpubProxySocketName,
"Address of the XPUB socket of the proxy");
23 addParam(
"xsubProxySocketName", m_param_xsubProxySocketName,
"Address of the XSUB socket of the proxy");
24 addParam(
"maximalWaitingTime", m_param_maximalWaitingTime,
"Maximal time to wait for any message");
25 setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
27 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
28 "set the number of processes to at least 1.",
29 Environment::Instance().getNumberProcesses());
32 void ZMQRxOutputModule::initialize()
34 m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
37 void ZMQRxOutputModule::event()
41 m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
42 m_zmqClient.initialize<ZMQ_PULL>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName,
true);
44 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
45 m_zmqClient.publish(std::move(multicastHelloMsg));
48 m_zmqClient.subscribe(EMessageTypes::c_eventMessage);
49 m_zmqClient.subscribe(EMessageTypes::c_lastEventMessage);
50 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
54 const auto multicastAnswer = [
this](
const auto & socket) {
55 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
56 if (message->isMessage(EMessageTypes::c_eventMessage)) {
57 B2DEBUG(100,
"Having received an event backup. Will go in with this.");
58 m_streamer.read(std::move(message));
60 eventMetaData->addErrorFlag(EventMetaData::EventErrorFlag::c_HLTCrash);
62 }
else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
63 B2DEBUG(100,
"Having received an end message. Will not go on.");
66 }
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
67 B2DEBUG(100,
"Having received an graceful stop message. Will not go on.");
72 B2ERROR(
"Undefined message on multicast");
76 const auto socketAnswer = [
this](
const auto & socket) {
77 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
78 if (message->isMessage(EMessageTypes::c_eventMessage)) {
79 m_streamer.read(std::move(message));
80 B2DEBUG(100,
"received event " << m_eventMetaData->getEvent());
81 auto confirmMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_confirmMessage, m_eventMetaData);
82 m_zmqClient.publish(std::move(confirmMessage));
86 B2ERROR(
"Undefined message on socket");
91 B2DEBUG(100,
"Start polling");
92 const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
93 B2ASSERT(
"Output process did not receive any message in some time. Aborting.", pollReply);
95 B2DEBUG(100,
"finished reading in an event.");
96 }
catch (zmq::error_t& ex) {
97 if (ex.num() != EINTR) {
98 B2ERROR(
"There was an error during the Rx output event: " << ex.what());
103 void ZMQRxOutputModule::terminate()
105 m_zmqClient.terminate();
Type-safe access to single objects in the data store.
Module connecting the worker path with the output path on the output side.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.