8 #include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11 #include <framework/pcore/EvtMessage.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomGenerator.h>
14 #include <framework/datastore/StoreObjPtr.h>
26 addParam(
"socketName", m_param_socketName,
"Name of the socket to connect this module to.");
27 addParam(
"xpubProxySocketName", m_param_xpubProxySocketName,
"Address of the XPUB socket of the proxy");
28 addParam(
"xsubProxySocketName", m_param_xsubProxySocketName,
"Address of the XSUB socket of the proxy");
29 addParam(
"maximalWaitingTime", m_param_maximalWaitingTime,
"Maximal time to wait for any message");
30 addParam(
"workerProcessTimeout", m_param_workerProcessTimeout,
"Maximal time a worker is allowed to spent per event");
31 addParam(
"useEventBackup", m_param_useEventBackup,
"Turn on the event backup");
33 setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
35 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36 "set the number of processes to at least 1.",
37 Environment::Instance().getNumberProcesses());
40 void ZMQTxInputModule::initialize()
46 void ZMQTxInputModule::event()
50 m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
51 m_zmqClient.initialize<ZMQ_ROUTER>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName,
true);
53 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
54 m_zmqClient.publish(std::move(multicastHelloMsg));
57 m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
58 m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
59 m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
60 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
65 if (not m_zmqClient.isOnline()) {
69 int timeout = m_param_maximalWaitingTime;
70 if (not m_nextWorker.empty()) {
75 bool terminate =
false;
77 const auto multicastAnswer = [
this, &terminate](
const auto & socket) {
78 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
79 const std::string& data = multicastMessage->getData();
81 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
82 m_workers.push_back(std::stoi(data));
83 B2DEBUG(10,
"received c_helloMessage from " << data <<
"... replying");
84 auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
85 m_zmqClient.send(std::move(replyHelloMessage));
87 }
else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
88 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
89 m_procEvtBackupList.removeEvent(eventMetaData);
90 B2DEBUG(10,
"removed event backup.. list size: " << m_procEvtBackupList.size());
92 }
else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
93 const int workerID = std::atoi(data.c_str());
94 B2DEBUG(10,
"received worker delete message, workerID: " << workerID);
96 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
97 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
99 }
else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
100 B2DEBUG(10,
"Having received a stop message. I can not do much here, but just hope for the best.");
108 const auto socketAnswer = [
this](
const auto & socket) {
109 const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
110 if (message->isMessage(EMessageTypes::c_readyMessage)) {
111 B2DEBUG(10,
"got worker ready message");
112 m_nextWorker.push_back(std::stoi(message->getIdentity()));
116 B2ERROR(
"Invalid message from worker");
120 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
123 m_zmqClient.terminate();
127 B2ASSERT(
"Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
129 const unsigned int nextWorker = m_nextWorker.front();
130 m_nextWorker.pop_front();
131 B2DEBUG(10,
"Next worker is " << nextWorker);
133 auto eventMessage = m_streamer.stream();
135 if (eventMessage->size() > 0) {
136 auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
137 m_zmqClient.send(std::move(message));
138 B2DEBUG(10,
"Having send message to worker " << nextWorker);
140 if (m_param_useEventBackup) {
141 m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
142 B2DEBUG(10,
"stored event " << m_eventMetaData->getEvent() <<
" backup.. list size: " << m_procEvtBackupList.size());
143 checkWorkerProcTimeout();
145 B2DEBUG(10,
"finished event");
147 }
catch (zmq::error_t& ex) {
148 if (ex.num() != EINTR) {
149 B2ERROR(
"There was an error during the Tx input event: " << ex.what());
151 }
catch (exception& ex) {
158 void ZMQTxInputModule::checkWorkerProcTimeout()
160 if (not m_param_useEventBackup or m_param_workerProcessTimeout == 0) {
164 const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
165 int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
167 B2WARNING(
"Worker process timeout, workerID: " << workerID);
168 auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
169 m_zmqClient.publish(std::move(deathMessage));
171 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
172 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
176 void ZMQTxInputModule::terminate()
179 if (not m_zmqClient.isOnline()) {
183 for (
unsigned int workerID : m_workers) {
184 std::string workerIDString = std::to_string(workerID);
185 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
186 m_zmqClient.send(std::move(message));
189 const auto multicastAnswer = [
this](
const auto & socket) {
190 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
191 const std::string& data = multicastMessage->getData();
193 if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
194 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
195 m_procEvtBackupList.removeEvent(eventMetaData);
196 B2DEBUG(10,
"removed event backup.. list size: " << m_procEvtBackupList.size());
198 }
else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
199 const int workerID = std::atoi(data.c_str());
201 B2DEBUG(10,
"received worker delete message, workerID: " << workerID);
202 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
204 }
else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
206 B2DEBUG(10,
"received c_helloMessage from " << data <<
"... replying with end message");
207 auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
208 m_zmqClient.send(std::move(message));
214 while (m_param_useEventBackup and m_procEvtBackupList.size() > 0) {
215 checkWorkerProcTimeout();
216 m_zmqClient.pollMulticast(0, multicastAnswer);
217 std::this_thread::sleep_for(std::chrono::milliseconds(100));
220 if (m_param_useEventBackup) {
222 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
223 m_zmqClient.publish(std::move(message));
226 m_zmqClient.terminate();
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.