11 #include <framework/pcore/zmq/processModules/ZMQRxWorkerModule.h>
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
15 #include <framework/core/Environment.h>
24 addParam(
"socketName", m_param_socketName,
"Name of the socket to connect this module to.");
25 addParam(
"xpubProxySocketName", m_param_xpubProxySocketName,
"Address of the XPUB socket of the proxy");
26 addParam(
"xsubProxySocketName", m_param_xsubProxySocketName,
"Address of the XSUB socket of the proxy");
27 addParam(
"eventBufferSize", m_param_bufferSize,
"Maximal number of events to store in the internal buffer");
28 addParam(
"maximalWaitingTime", m_param_maximalWaitingTime,
"Maximal time to wait for any message");
30 setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
32 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
33 "set the number of processes to at least 1.",
34 Environment::Instance().getNumberProcesses());
37 void ZMQRxWorkerModule::initialize()
39 m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
42 void ZMQRxWorkerModule::event()
46 m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
47 m_zmqClient.initialize<ZMQ_DEALER>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName,
false);
50 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
53 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
54 m_zmqClient.publish(std::move(multicastHelloMsg));
56 auto helloMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
57 m_zmqClient.publish(std::move(helloMessage));
59 bool inputProcessIsGone =
false;
62 const auto socketHelloAnswer = [&inputProcessIsGone](
const auto & socket) {
63 const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
64 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
65 inputProcessIsGone =
true;
68 B2ASSERT(
"Received unexpected message from input.", message->isMessage(EMessageTypes::c_helloMessage));
71 const auto pollResult = m_zmqClient.pollSocket(60 * 1000, socketHelloAnswer);
72 if (inputProcessIsGone or not pollResult) {
73 B2WARNING(
"It seems the input process is already gone.");
78 for (
unsigned int bufferIndex = 0; bufferIndex < m_param_bufferSize; bufferIndex++) {
79 auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
80 m_zmqClient.send(std::move(readyMessage));
85 const auto multicastAnswer = [](
const auto & socket) {
86 const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
87 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
88 B2DEBUG(10,
"Having received an graceful stop message. Will now go on.");
93 B2ERROR(
"Undefined message on multicast");
97 const auto socketAnswer = [
this](
const auto & socket) {
98 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
99 if (message->isMessage(EMessageTypes::c_eventMessage)) {
100 B2DEBUG(10,
"received event message... write it to data store");
101 m_streamer.read(std::move(message));
102 auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
103 m_zmqClient.send(std::move(readyMessage));
105 }
else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
106 B2DEBUG(10,
"received end message from input");
110 B2DEBUG(10,
"received unexpected message from input");
114 const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
115 B2ASSERT(
"The input process did not send any event in some time!", pollReply);
117 B2DEBUG(10,
"Finished with event");
118 }
catch (zmq::error_t& ex) {
119 if (ex.num() != EINTR) {
120 B2ERROR(
"There was an error during the Rx worker event: " << ex.what());
125 void ZMQRxWorkerModule::terminate()
127 m_zmqClient.terminate();