1 #include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
3 #include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
4 #include <framework/pcore/EvtMessage.h>
5 #include <framework/core/Environment.h>
6 #include <framework/core/RandomGenerator.h>
7 #include <framework/datastore/StoreObjPtr.h>
19 addParam(
"socketName", m_param_socketName,
"Name of the socket to connect this module to.");
20 addParam(
"xpubProxySocketName", m_param_xpubProxySocketName,
"Address of the XPUB socket of the proxy");
21 addParam(
"xsubProxySocketName", m_param_xsubProxySocketName,
"Address of the XSUB socket of the proxy");
22 addParam(
"maximalWaitingTime", m_param_maximalWaitingTime,
"Maximal time to wait for any message");
23 addParam(
"workerProcessTimeout", m_param_workerProcessTimeout,
"Maximal time a worker is allowed to spent per event");
24 addParam(
"useEventBackup", m_param_useEventBackup,
"Turn on the event backup");
26 setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
28 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
29 "set the number of processes to at least 1.",
30 Environment::Instance().getNumberProcesses());
33 void ZMQTxInputModule::initialize()
36 randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
39 void ZMQTxInputModule::event()
43 m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
44 m_zmqClient.initialize<ZMQ_ROUTER>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName,
true);
46 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
47 m_zmqClient.publish(std::move(multicastHelloMsg));
50 m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
51 m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
52 m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
53 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
58 if (not m_zmqClient.isOnline()) {
62 int timeout = m_param_maximalWaitingTime;
63 if (not m_nextWorker.empty()) {
68 bool terminate =
false;
70 const auto multicastAnswer = [
this, &terminate](
const auto & socket) {
71 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
72 const std::string& data = multicastMessage->getData();
74 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
75 m_workers.push_back(std::stoi(data));
76 B2DEBUG(10,
"received c_helloMessage from " << data <<
"... replying");
77 auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
78 m_zmqClient.send(std::move(replyHelloMessage));
80 }
else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
81 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
82 m_procEvtBackupList.removeEvent(eventMetaData);
83 B2DEBUG(10,
"removed event backup.. list size: " << m_procEvtBackupList.size());
85 }
else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
86 const int workerID = std::atoi(data.c_str());
87 B2DEBUG(10,
"received worker delete message, workerID: " << workerID);
89 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
90 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
92 }
else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
93 B2DEBUG(10,
"Having received a stop message. I can not do much here, but just hope for the best.");
101 const auto socketAnswer = [
this](
const auto & socket) {
102 const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
103 if (message->isMessage(EMessageTypes::c_readyMessage)) {
104 B2DEBUG(10,
"got worker ready message");
105 m_nextWorker.push_back(std::stoi(message->getIdentity()));
109 B2ERROR(
"Invalid message from worker");
113 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
116 m_zmqClient.terminate();
120 B2ASSERT(
"Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
122 const unsigned int nextWorker = m_nextWorker.front();
123 m_nextWorker.pop_front();
124 B2DEBUG(10,
"Next worker is " << nextWorker);
126 auto eventMessage = m_streamer.stream();
128 if (eventMessage->size() > 0) {
129 auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
130 m_zmqClient.send(std::move(message));
131 B2DEBUG(10,
"Having send message to worker " << nextWorker);
133 if (m_param_useEventBackup) {
134 m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
135 B2DEBUG(10,
"stored event " << m_eventMetaData->getEvent() <<
" backup.. list size: " << m_procEvtBackupList.size());
136 checkWorkerProcTimeout();
138 B2DEBUG(10,
"finished event");
140 }
catch (zmq::error_t& ex) {
141 if (ex.num() != EINTR) {
142 B2ERROR(
"There was an error during the Tx input event: " << ex.what());
144 }
catch (exception& ex) {
151 void ZMQTxInputModule::checkWorkerProcTimeout()
153 if (not m_param_useEventBackup or m_param_workerProcessTimeout == 0) {
157 const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
158 int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
160 B2WARNING(
"Worker process timeout, workerID: " << workerID);
161 auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
162 m_zmqClient.publish(std::move(deathMessage));
164 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
165 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
169 void ZMQTxInputModule::terminate()
172 if (not m_zmqClient.isOnline()) {
176 for (
unsigned int workerID : m_workers) {
177 std::string workerIDString = std::to_string(workerID);
178 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
179 m_zmqClient.send(std::move(message));
182 const auto multicastAnswer = [
this](
const auto & socket) {
183 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
184 const std::string& data = multicastMessage->getData();
186 if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
187 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
188 m_procEvtBackupList.removeEvent(eventMetaData);
189 B2DEBUG(10,
"removed event backup.. list size: " << m_procEvtBackupList.size());
191 }
else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
192 const int workerID = std::atoi(data.c_str());
194 B2DEBUG(10,
"received worker delete message, workerID: " << workerID);
195 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
197 }
else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
199 B2DEBUG(10,
"received c_helloMessage from " << data <<
"... replying with end message");
200 auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
201 m_zmqClient.send(std::move(message));
207 while (m_param_useEventBackup and m_procEvtBackupList.size() > 0) {
208 checkWorkerProcTimeout();
209 m_zmqClient.pollMulticast(0, multicastAnswer);
210 std::this_thread::sleep_for(std::chrono::milliseconds(100));
213 if (m_param_useEventBackup) {
215 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
216 m_zmqClient.publish(std::move(message));
219 m_zmqClient.terminate();