Belle II Software  release-05-02-19
ZMQTxInputModule.cc
1 #include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
2 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
3 #include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
4 #include <framework/pcore/EvtMessage.h>
5 #include <framework/core/Environment.h>
6 #include <framework/core/RandomGenerator.h>
7 #include <framework/datastore/StoreObjPtr.h>
8 #include <thread>
9 #include <chrono>
10 #include <algorithm>
11 
12 using namespace std;
13 using namespace Belle2;
14 
// Register this module with the framework under the name "ZMQTxInput" (module name without the 'Module' suffix).
REG_MODULE(ZMQTxInput)
16 
18 {
19  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
20  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
21  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
22  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
23  addParam("workerProcessTimeout", m_param_workerProcessTimeout, "Maximal time a worker is allowed to spent per event");
24  addParam("useEventBackup", m_param_useEventBackup, "Turn on the event backup");
25 
26  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
27 
28  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
29  "set the number of processes to at least 1.",
30  Environment::Instance().getNumberProcesses());
31 }
32 
33 void ZMQTxInputModule::initialize()
34 {
35  StoreObjPtr<RandomGenerator> randomgenerator;
36  randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
37 }
38 
39 void ZMQTxInputModule::event()
40 {
41  try {
42  if (m_firstEvent) {
43  m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
44  m_zmqClient.initialize<ZMQ_ROUTER>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName, true);
45 
46  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
47  m_zmqClient.publish(std::move(multicastHelloMsg));
48 
49  // Listen to event confirmations, hello of workers, the messages to delete a worker and the general stop messages
50  m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
51  m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
52  m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
53  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
54 
55  m_firstEvent = false;
56  }
57 
58  if (not m_zmqClient.isOnline()) {
59  return;
60  }
61 
62  int timeout = m_param_maximalWaitingTime;
63  if (not m_nextWorker.empty()) {
64  // if next worker are available do not waste time
65  timeout = 0;
66  }
67 
68  bool terminate = false;
69 
70  const auto multicastAnswer = [this, &terminate](const auto & socket) {
71  const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
72  const std::string& data = multicastMessage->getData();
73 
74  if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
75  m_workers.push_back(std::stoi(data));
76  B2DEBUG(10, "received c_helloMessage from " << data << "... replying");
77  auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
78  m_zmqClient.send(std::move(replyHelloMessage));
79  return true;
80  } else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
81  const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
82  m_procEvtBackupList.removeEvent(eventMetaData);
83  B2DEBUG(10, "removed event backup.. list size: " << m_procEvtBackupList.size());
84  return true;
85  } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
86  const int workerID = std::atoi(data.c_str());
87  B2DEBUG(10, "received worker delete message, workerID: " << workerID);
88 
89  m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
90  m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
91  return true;
92  } else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
93  B2DEBUG(10, "Having received a stop message. I can not do much here, but just hope for the best.");
94  terminate = true;
95  return false;
96  }
97 
98  return true;
99  };
100 
101  const auto socketAnswer = [this](const auto & socket) {
102  const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
103  if (message->isMessage(EMessageTypes::c_readyMessage)) {
104  B2DEBUG(10, "got worker ready message");
105  m_nextWorker.push_back(std::stoi(message->getIdentity()));
106  return false;
107  }
108 
109  B2ERROR("Invalid message from worker");
110  return true;
111  };
112 
113  m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
114  // false positive due to lambda capture ...
115  if (terminate) {
116  m_zmqClient.terminate();
117  return;
118  }
119 
120  B2ASSERT("Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
121 
122  const unsigned int nextWorker = m_nextWorker.front();
123  m_nextWorker.pop_front();
124  B2DEBUG(10, "Next worker is " << nextWorker);
125 
126  auto eventMessage = m_streamer.stream();
127 
128  if (eventMessage->size() > 0) {
129  auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
130  m_zmqClient.send(std::move(message));
131  B2DEBUG(10, "Having send message to worker " << nextWorker);
132 
133  if (m_param_useEventBackup) {
134  m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
135  B2DEBUG(10, "stored event " << m_eventMetaData->getEvent() << " backup.. list size: " << m_procEvtBackupList.size());
136  checkWorkerProcTimeout();
137  }
138  B2DEBUG(10, "finished event");
139  }
140  } catch (zmq::error_t& ex) {
141  if (ex.num() != EINTR) {
142  B2ERROR("There was an error during the Tx input event: " << ex.what());
143  }
144  } catch (exception& ex) {
145  B2ERROR(ex.what());
146 
147  }
148 }
149 
150 //TODO: wait for confirmation before deleting when sending backup messages to output
151 void ZMQTxInputModule::checkWorkerProcTimeout()
152 {
153  if (not m_param_useEventBackup or m_param_workerProcessTimeout == 0) {
154  return;
155  }
156 
157  const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
158  int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
159  if (workerID > -1) {
160  B2WARNING("Worker process timeout, workerID: " << workerID);
161  auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
162  m_zmqClient.publish(std::move(deathMessage));
163 
164  m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
165  m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
166  }
167 }
168 
169 void ZMQTxInputModule::terminate()
170 {
171 
172  if (not m_zmqClient.isOnline()) {
173  return;
174  }
175 
176  for (unsigned int workerID : m_workers) {
177  std::string workerIDString = std::to_string(workerID);
178  auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
179  m_zmqClient.send(std::move(message));
180  }
181 
182  const auto multicastAnswer = [this](const auto & socket) {
183  const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
184  const std::string& data = multicastMessage->getData();
185 
186  if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
187  const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
188  m_procEvtBackupList.removeEvent(eventMetaData);
189  B2DEBUG(10, "removed event backup.. list size: " << m_procEvtBackupList.size());
190  return true;
191  } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
192  const int workerID = std::atoi(data.c_str());
193 
194  B2DEBUG(10, "received worker delete message, workerID: " << workerID);
195  m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
196  return true;
197  } else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
198  // A new worker? Well, he is quite late... nevertheless, lets tell him to end it
199  B2DEBUG(10, "received c_helloMessage from " << data << "... replying with end message");
200  auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
201  m_zmqClient.send(std::move(message));
202  return true;
203  }
204  return true;
205  };
206 
207  while (m_param_useEventBackup and m_procEvtBackupList.size() > 0) {
208  checkWorkerProcTimeout();
209  m_zmqClient.pollMulticast(0, multicastAnswer);
210  std::this_thread::sleep_for(std::chrono::milliseconds(100));
211  }
212 
213  if (m_param_useEventBackup) {
214  // this message is especially for the output, all events reached the output
215  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
216  m_zmqClient.publish(std::move(message));
217  }
218 
219  m_zmqClient.terminate();
220 }
Belle2::ZMQTxInputModule
Module connecting the input path with the worker path on the input side.
Definition: ZMQTxInputModule.h:39
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::StoreObjPtr
Type-safe access to single objects in the data store.
Definition: ParticleList.h:33