9 #include <framework/pcore/zmq/processModules/ZMQRxWorkerModule.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
13 #include <framework/core/Environment.h>
22 addParam(
"socketName", m_param_socketName,
"Name of the socket to connect this module to.");
23 addParam(
"xpubProxySocketName", m_param_xpubProxySocketName,
"Address of the XPUB socket of the proxy");
24 addParam(
"xsubProxySocketName", m_param_xsubProxySocketName,
"Address of the XSUB socket of the proxy");
25 addParam(
"eventBufferSize", m_param_bufferSize,
"Maximal number of events to store in the internal buffer");
26 addParam(
"maximalWaitingTime", m_param_maximalWaitingTime,
"Maximal time to wait for any message");
28 setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
30 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
31 "set the number of processes to at least 1.",
32 Environment::Instance().getNumberProcesses());
35 void ZMQRxWorkerModule::initialize()
37 m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
40 void ZMQRxWorkerModule::event()
44 m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
45 m_zmqClient.initialize<ZMQ_DEALER>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName,
false);
48 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
51 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
52 m_zmqClient.publish(std::move(multicastHelloMsg));
54 auto helloMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
55 m_zmqClient.publish(std::move(helloMessage));
57 bool inputProcessIsGone =
false;
60 const auto socketHelloAnswer = [&inputProcessIsGone](
const auto & socket) {
61 const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
62 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
63 inputProcessIsGone =
true;
66 B2ASSERT(
"Received unexpected message from input.", message->isMessage(EMessageTypes::c_helloMessage));
69 const auto pollResult = m_zmqClient.pollSocket(60 * 1000, socketHelloAnswer);
70 if (inputProcessIsGone or not pollResult) {
71 B2WARNING(
"It seems the input process is already gone.");
76 for (
unsigned int bufferIndex = 0; bufferIndex < m_param_bufferSize; bufferIndex++) {
77 auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
78 m_zmqClient.send(std::move(readyMessage));
83 const auto multicastAnswer = [](
const auto & socket) {
84 const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
85 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
86 B2DEBUG(10,
"Having received an graceful stop message. Will now go on.");
91 B2ERROR(
"Undefined message on multicast");
95 const auto socketAnswer = [
this](
const auto & socket) {
96 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
97 if (message->isMessage(EMessageTypes::c_eventMessage)) {
98 B2DEBUG(10,
"received event message... write it to data store");
99 m_streamer.read(std::move(message));
100 auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
101 m_zmqClient.send(std::move(readyMessage));
103 }
else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
104 B2DEBUG(10,
"received end message from input");
108 B2DEBUG(10,
"received unexpected message from input");
112 const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
113 B2ASSERT(
"The input process did not send any event in some time!", pollReply);
115 B2DEBUG(10,
"Finished with event");
116 }
catch (zmq::error_t& ex) {
117 if (ex.num() != EINTR) {
118 B2ERROR(
"There was an error during the Rx worker event: " << ex.what());
123 void ZMQRxWorkerModule::terminate()
125 m_zmqClient.terminate();
Module connecting the input path with the worker path on the worker side.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.