Belle II Software  release-06-00-14
ZMQTxInputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11 #include <framework/pcore/EvtMessage.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomGenerator.h>
14 #include <framework/datastore/StoreObjPtr.h>
15 #include <thread>
16 #include <chrono>
17 #include <algorithm>
18 
19 using namespace std;
20 using namespace Belle2;
21 
22 REG_MODULE(ZMQTxInput)
23 
25 {
26  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
27  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
28  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
29  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
30  addParam("workerProcessTimeout", m_param_workerProcessTimeout, "Maximal time a worker is allowed to spent per event");
31  addParam("useEventBackup", m_param_useEventBackup, "Turn on the event backup");
32 
33  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
34 
35  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36  "set the number of processes to at least 1.",
37  Environment::Instance().getNumberProcesses());
38 }
39 
40 void ZMQTxInputModule::initialize()
41 {
42  StoreObjPtr<RandomGenerator> randomgenerator;
43  randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
44 }
45 
46 void ZMQTxInputModule::event()
47 {
48  try {
49  if (m_firstEvent) {
50  m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
51  m_zmqClient.initialize<ZMQ_ROUTER>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName, true);
52 
53  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
54  m_zmqClient.publish(std::move(multicastHelloMsg));
55 
56  // Listen to event confirmations, hello of workers, the messages to delete a worker and the general stop messages
57  m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
58  m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
59  m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
60  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
61 
62  m_firstEvent = false;
63  }
64 
65  if (not m_zmqClient.isOnline()) {
66  return;
67  }
68 
69  int timeout = m_param_maximalWaitingTime;
70  if (not m_nextWorker.empty()) {
71  // if next worker are available do not waste time
72  timeout = 0;
73  }
74 
75  bool terminate = false;
76 
77  const auto multicastAnswer = [this, &terminate](const auto & socket) {
78  const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
79  const std::string& data = multicastMessage->getData();
80 
81  if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
82  m_workers.push_back(std::stoi(data));
83  B2DEBUG(10, "received c_helloMessage from " << data << "... replying");
84  auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
85  m_zmqClient.send(std::move(replyHelloMessage));
86  return true;
87  } else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
88  const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
89  m_procEvtBackupList.removeEvent(eventMetaData);
90  B2DEBUG(10, "removed event backup.. list size: " << m_procEvtBackupList.size());
91  return true;
92  } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
93  const int workerID = std::atoi(data.c_str());
94  B2DEBUG(10, "received worker delete message, workerID: " << workerID);
95 
96  m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
97  m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
98  return true;
99  } else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
100  B2DEBUG(10, "Having received a stop message. I can not do much here, but just hope for the best.");
101  terminate = true;
102  return false;
103  }
104 
105  return true;
106  };
107 
108  const auto socketAnswer = [this](const auto & socket) {
109  const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
110  if (message->isMessage(EMessageTypes::c_readyMessage)) {
111  B2DEBUG(10, "got worker ready message");
112  m_nextWorker.push_back(std::stoi(message->getIdentity()));
113  return false;
114  }
115 
116  B2ERROR("Invalid message from worker");
117  return true;
118  };
119 
120  m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
121  // false positive due to lambda capture ...
122  if (terminate) {
123  m_zmqClient.terminate();
124  return;
125  }
126 
127  B2ASSERT("Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
128 
129  const unsigned int nextWorker = m_nextWorker.front();
130  m_nextWorker.pop_front();
131  B2DEBUG(10, "Next worker is " << nextWorker);
132 
133  auto eventMessage = m_streamer.stream();
134 
135  if (eventMessage->size() > 0) {
136  auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
137  m_zmqClient.send(std::move(message));
138  B2DEBUG(10, "Having send message to worker " << nextWorker);
139 
140  if (m_param_useEventBackup) {
141  m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
142  B2DEBUG(10, "stored event " << m_eventMetaData->getEvent() << " backup.. list size: " << m_procEvtBackupList.size());
143  checkWorkerProcTimeout();
144  }
145  B2DEBUG(10, "finished event");
146  }
147  } catch (zmq::error_t& ex) {
148  if (ex.num() != EINTR) {
149  B2ERROR("There was an error during the Tx input event: " << ex.what());
150  }
151  } catch (exception& ex) {
152  B2ERROR(ex.what());
153 
154  }
155 }
156 
157 //TODO: wait for confirmation before deleting when sending backup messages to output
158 void ZMQTxInputModule::checkWorkerProcTimeout()
159 {
160  if (not m_param_useEventBackup or m_param_workerProcessTimeout == 0) {
161  return;
162  }
163 
164  const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
165  int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
166  if (workerID > -1) {
167  B2WARNING("Worker process timeout, workerID: " << workerID);
168  auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
169  m_zmqClient.publish(std::move(deathMessage));
170 
171  m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
172  m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
173  }
174 }
175 
176 void ZMQTxInputModule::terminate()
177 {
178 
179  if (not m_zmqClient.isOnline()) {
180  return;
181  }
182 
183  for (unsigned int workerID : m_workers) {
184  std::string workerIDString = std::to_string(workerID);
185  auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
186  m_zmqClient.send(std::move(message));
187  }
188 
189  const auto multicastAnswer = [this](const auto & socket) {
190  const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
191  const std::string& data = multicastMessage->getData();
192 
193  if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
194  const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
195  m_procEvtBackupList.removeEvent(eventMetaData);
196  B2DEBUG(10, "removed event backup.. list size: " << m_procEvtBackupList.size());
197  return true;
198  } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
199  const int workerID = std::atoi(data.c_str());
200 
201  B2DEBUG(10, "received worker delete message, workerID: " << workerID);
202  m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
203  return true;
204  } else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
205  // A new worker? Well, he is quite late... nevertheless, lets tell him to end it
206  B2DEBUG(10, "received c_helloMessage from " << data << "... replying with end message");
207  auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
208  m_zmqClient.send(std::move(message));
209  return true;
210  }
211  return true;
212  };
213 
214  while (m_param_useEventBackup and m_procEvtBackupList.size() > 0) {
215  checkWorkerProcTimeout();
216  m_zmqClient.pollMulticast(0, multicastAnswer);
217  std::this_thread::sleep_for(std::chrono::milliseconds(100));
218  }
219 
220  if (m_param_useEventBackup) {
221  // this message is especially for the output, all events reached the output
222  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
223  m_zmqClient.publish(std::move(message));
224  }
225 
226  m_zmqClient.terminate();
227 }
Base class for Modules.
Definition: Module.h:72
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:95
Module connecting the input path with the worker path on the input side.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.