Belle II Software  release-08-01-10
ZMQTxInputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11 #include <framework/pcore/EvtMessage.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomGenerator.h>
14 #include <framework/datastore/StoreObjPtr.h>
15 #include <thread>
16 #include <chrono>
17 #include <algorithm>
18 
19 using namespace std;
20 using namespace Belle2;
21 
22 REG_MODULE(ZMQTxInput);
23 
// Constructor: registers the module's steering parameters, flags the module as
// certified for parallel processing, and asserts on the configured number of
// processes.
24 ZMQTxInputModule::ZMQTxInputModule() : Module()
25 {
// Steering parameters: the three socket addresses, two time limits and the
// switch for the event backup. Each call binds a parameter name to a member.
26  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
27  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
28  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
29  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
30  addParam("workerProcessTimeout", m_param_workerProcessTimeout, "Maximal time a worker is allowed to spent per event");
31  addParam("useEventBackup", m_param_useEventBackup, "Turn on the event backup");
32 
33  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
34 
// The assert condition is the number of processes itself, so a value of 0 fails it.
// NOTE(review): the two adjacent string literals are concatenated without a
// separating space, so the message reads "...single process,set the number...".
35  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36  "set the number of processes to at least 1.",
37  Environment::Instance().getNumberProcesses());
38 }
39 
// Body of a member function whose signature line is missing from this listing
// (presumably ZMQTxInputModule::initialize() — confirm against the repository).
// It declares a StoreObjPtr accessor for the RandomGenerator object.
// NOTE(review): listing line 43 is also missing; presumably it registered the
// object in the DataStore — TODO confirm.
41 {
42  StoreObjPtr<RandomGenerator> randomgenerator;
44 }
45 
// Body of a member function whose signature line is missing from this listing
// (presumably ZMQTxInputModule::event() — confirm against the repository).
//
// Visible behaviour:
//  * on the first call, publish a hello message carrying this process' PID and
//    subscribe to confirm / hello / delete-worker / terminate multicast messages;
//  * poll the multicast and data sockets, collecting ready workers in m_nextWorker;
//  * for the special "first event", send the streamed event to the queued workers;
//  * otherwise send the streamed event to the worker at the front of m_nextWorker
//    and store it in the backup list.
//
// NOTE(review): listing lines 52-53, 101, 143, 175, 193 and 196 are missing from
// this export, so some statements (and at least one opening brace) are not visible.
47 {
48  try {
49  if (m_firstEvent) {
50  B2INFO("ZMQTxInputModule :: First Event here");
51 
// NOTE(review): listing lines 52-53 are missing here; presumably the streamer and
// the ZMQ client are initialized at this point — TODO confirm.
54 
55  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
56  m_zmqClient.publish(std::move(multicastHelloMsg));
57  // B2INFO ( "ZMQTxInput : multicast Hello message sent. PID = " << getpid() );
58 
59  // Listen to event confirmations, hello of workers, the messages to delete a worker and the general stop messages
60  m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
61  m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
62  m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
63  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
64 
65  m_firstEvent = false;
66  }
67 
// Nothing to do when the client reports that it is not online.
68  if (not m_zmqClient.isOnline()) {
69  return;
70  }
71 
// The poll timeout is taken from the Environment; the module parameter
// m_param_maximalWaitingTime is only referenced in the commented-out line below.
72  // int timeout = m_param_maximalWaitingTime;
73  // int timeout = 7200 * 1000;
74  int timeout = (int)Environment::Instance().getZMQMaximalWaitingTime();
75  if (not m_nextWorker.empty()) {
76  // if next worker are available do not waste time
77  timeout = 0;
78  }
79 
// Set by the multicast handler below when a terminate message arrives.
80  bool terminate = false;
81 
// Handler for messages on the multicast socket. It returns false only for a
// terminate message and true in every other case.
// NOTE(review): presumably the return value tells poll() whether to keep
// polling — confirm in ZMQClient.
82  const auto multicastAnswer = [this, &terminate](const auto & socket) {
83  const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
84  const std::string& data = multicastMessage->getData();
85 
86  if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
// Remember the sender (the message data parsed as an integer) and reply with a hello.
87  m_workers.push_back(std::stoi(data));
88  B2DEBUG(30, "received c_helloMessage from " << data << "... replying");
89  // B2INFO ( "ZMQTxInput : received c_helloMessage from " << data << "... replying");
90  auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
91  m_zmqClient.send(std::move(replyHelloMessage));
92  return true;
93  } else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
// A confirmation removes the matching event from the backup list (only with event backup enabled).
94  const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
95  m_procEvtBackupList.removeEvent(eventMetaData);
96  B2DEBUG(30, "removed event backup.. list size: " << m_procEvtBackupList.size());
97  return true;
98  } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
99  const int workerID = std::atoi(data.c_str());
100  B2DEBUG(30, "received worker delete message, workerID: " << workerID);
// NOTE(review): listing line 101 is missing here; presumably the backed-up events
// of this worker are handled there — TODO confirm.
// Drop every queued "ready" entry of the deleted worker.
102  m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
103  return true;
104  } else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
105  B2DEBUG(30, "Having received a stop message. I can not do much here, but just hope for the best.");
106  terminate = true;
107  return false;
108  }
109 
110  return true;
111  };
112 
// Handler for messages on the data socket: a ready message queues the sender's
// identity (parsed as an integer) in m_nextWorker and returns false; anything
// else is logged as an error and returns true.
113  const auto socketAnswer = [this](const auto & socket) {
114  const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
115  if (message->isMessage(EMessageTypes::c_readyMessage)) {
116  B2DEBUG(30, "got worker ready message");
117  m_nextWorker.push_back(std::stoi(message->getIdentity()));
118  return false;
119  }
120 
121  B2ERROR("Invalid message from worker");
122  return true;
123  };
124 
125 
126  // Special treatment for "first event" generated by HLTZMQ2Ds module
127 
128  // if ( m_eventMetaData->getExperiment() == 42 && m_eventMetaData->getRun() == 8 ) { // Special first event
129  if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaData->getExperiment(), m_eventMetaData->getRun())) {
130  B2INFO("ZMQTxInput : special first event processing");
131  // Stream End Run record
132  auto eventMessage = m_streamer.stream();
133 
134  // Wait for all workers registered in data socket list
135  unsigned int numproc = Environment::Instance().getNumberProcesses();
136  unsigned int numbuf = Environment::Instance().getZMQEventBufferSize();
137 
// Keep polling until numproc * numbuf ready entries are queued, or a terminate message arrives.
138  while (m_nextWorker.size() < numproc * numbuf) {
139  m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
140  // B2INFO ( "ZMQTxInput : numproc = " << numproc << " <-> count = " << m_nextWorker.size() );
141  // false positive due to lambda capture ...
142  if (terminate) {
// NOTE(review): listing line 143 is missing here — TODO confirm what is done before returning.
144  return;
145  }
146  }
147 
// NOTE(review): this loop calls m_nextWorker.pop_front() while iterating with `it`.
// On the first iteration `it` equals begin(), so pop_front() erases the element
// `it` refers to; std::deque invalidates iterators to the erased element, which
// makes the later `it + 1` / `++it` undefined behaviour. Also, after the first
// iteration the popped front element is not necessarily the one `workerID` was
// read from. Needs a fix against the complete source.
148  // unsigned int workerID = -1;
149  for (deque<unsigned int>::iterator it = m_nextWorker.begin(); it != m_nextWorker.end(); ++it) {
150  unsigned int workerID = *it;
151  m_nextWorker.pop_front();
152  std::string workerIDString = std::to_string(workerID);
153  B2INFO("ZMQTxInput :: sending first event to worker : " << LogVar("worker", workerIDString));
154  auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_eventMessage, eventMessage);
155  m_zmqClient.send(std::move(message));
156  // Check the same ID in the forward
// Advance past directly following entries that carry the same worker ID.
157  while (true) {
158  if ((it + 1) == m_nextWorker.end()) break;
159  if (*(it + 1) == workerID) {
160  // B2INFO ( "Skipping " << workerID );
161  ++it;
162  } else
163  break;
164  }
165  }
166  B2INFO("ZMQTxInput : Special first event sent to all workers");
167  return;
168  }
169 
170  // Normal event processing
171 
172  m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
173  // false positive due to lambda capture ...
174  if (terminate) {
// NOTE(review): listing line 175 is missing here — TODO confirm what is done before returning.
176  return;
177  }
178 
// After polling, at least one ready worker must be queued; otherwise the assert fails.
179  B2ASSERT("Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
180 
181 
// Take the worker at the front of the queue as the receiver of this event.
182  const unsigned int nextWorker = m_nextWorker.front();
183  m_nextWorker.pop_front();
184  B2DEBUG(30, "Next worker is " << nextWorker);
185 
186  auto eventMessage = m_streamer.stream();
187 
// Only non-empty event messages are sent.
188  if (eventMessage->size() > 0) {
189  auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
190  m_zmqClient.send(std::move(message));
191  B2DEBUG(30, "Having send message to worker " << nextWorker);
192 
// NOTE(review): listing lines 193 and 196 are missing; the closing brace on
// listing line 197 implies a block was opened on line 193 — presumably a check of
// m_param_useEventBackup — TODO confirm.
// The backup list takes over the event message together with the meta data and the worker ID.
194  m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
195  B2DEBUG(30, "stored event " << m_eventMetaData->getEvent() << " backup.. list size: " << m_procEvtBackupList.size());
197  }
198  B2DEBUG(30, "finished event");
199  }
// ZMQ errors are logged unless the error number is EINTR; any other exception is
// logged with its what() text. Neither is rethrown.
200  } catch (zmq::error_t& ex) {
201  if (ex.num() != EINTR) {
202  B2ERROR("There was an error during the Tx input event: " << ex.what());
203  }
204  } catch (exception& ex) {
205  B2ERROR(ex.what());
206 
207  }
208 }
209 
210 // End Run
// Body of a member function whose signature line is missing from this listing
// (presumably ZMQTxInputModule::endRun() — confirm against the repository).
// If the event meta data flags an end of run, the DataStore is streamed once and
// sent as an event message to every ID in m_workers, except this process' own PID.
212 {
213 
214  B2DEBUG(30, "ZMQTxInput:: EndRun detected. isEndOfRun = " << m_eventMetaData->isEndOfRun() << " RunNo = " <<
215  m_eventMetaData->getRun());
// Only act when the meta data marks the end of a run.
216  if (m_eventMetaData->isEndOfRun() != 1) return;
217 
218  // Stream End Run record
219  auto eventMessage = m_streamer.stream();
220 
221  // bool terminate = false;
222 
223  // All sockets are ready. Send end_run record to all workers
// The same streamed message is reused for every receiver; createMessage takes it by reference.
224  for (unsigned int workerID : m_workers) {
225  if (workerID == (unsigned int)getpid()) continue; // skip input process
226  std::string workerIDString = std::to_string(workerID);
227  // B2INFO ( "ZMQTxInput :: sending EndRun to " << workerIDString );
228  auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_eventMessage, eventMessage);
229  m_zmqClient.send(std::move(message));
230  }
231  B2INFO("ZMQTxInput : End Run sent to all workers");
232 }
233 
234 //TODO: wait for confirmation before deleting when sending backup messages to output
// Body of a member function whose signature line is missing from this listing
// (presumably ZMQTxInputModule::checkWorkerProcTimeout() — confirm against the
// repository). It asks the backup list whether a worker exceeded the configured
// processing timeout and, if so, publishes a kill message for that worker and
// removes the worker from the queue of ready workers.
236 {
// NOTE(review): listing line 237 is missing; the `return;` and closing brace below
// belong to an early-exit guard whose condition is not visible — presumably a
// check of m_param_useEventBackup — TODO confirm.
238  return;
239  }
240 
// m_param_workerProcessTimeout is interpreted as a number of milliseconds.
241  const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
// checkForTimeout() yields a value greater than -1 when a worker timed out.
242  int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
243  if (workerID > -1) {
244  B2WARNING("Worker process timeout, workerID: " << workerID);
245  auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
246  m_zmqClient.publish(std::move(deathMessage));
247 
// NOTE(review): listing line 248 is missing here — TODO confirm what it does.
// Drop every queued "ready" entry of the timed-out worker.
249  m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
250  }
251 }
252 
// Body of a member function whose signature line is missing from this listing
// (presumably ZMQTxInputModule::terminate() — confirm against the repository).
// If the client is online, a last-event message is sent to every ID in m_workers,
// multicast messages are then polled in 100 ms steps, and finally a last-event
// message is published on the multicast.
// NOTE(review): listing lines 279, 291-292, 297 and 303 are missing, so the loop
// header, the condition guarding the final publish and the last statement are not
// visible here.
254 {
255 
256  if (not m_zmqClient.isOnline()) {
257  return;
258  }
259 
// Tell every known worker that no further events will follow.
260  for (unsigned int workerID : m_workers) {
261  std::string workerIDString = std::to_string(workerID);
262  auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
263  m_zmqClient.send(std::move(message));
264  }
265 
// Handler for multicast messages during shutdown; it returns true in every branch.
266  const auto multicastAnswer = [this](const auto & socket) {
267  const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
268  const std::string& data = multicastMessage->getData();
269 
270  if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
// A confirmation removes the matching event from the backup list.
271  const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
272  m_procEvtBackupList.removeEvent(eventMetaData);
273  B2DEBUG(30, "removed event backup.. list size: " << m_procEvtBackupList.size());
274  return true;
275  } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
276  const int workerID = std::atoi(data.c_str());
277 
278  B2DEBUG(30, "received worker delete message, workerID: " << workerID);
// NOTE(review): listing line 279 is missing here; presumably the backed-up events
// of this worker are handled there — TODO confirm.
280  return true;
281  } else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
282  // A new worker? Well, he is quite late... nevertheless, lets tell him to end it
283  B2DEBUG(30, "received c_helloMessage from " << data << "... replying with end message");
284  auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
285  m_zmqClient.send(std::move(message));
286  return true;
287  }
288  return true;
289  };
290 
// NOTE(review): listing lines 291-292 are missing; the closing brace on listing
// line 295 implies a loop was opened there — presumably it runs while the backup
// list still holds events — TODO confirm.
// Each pass polls the multicast without waiting and then sleeps for 100 ms.
293  m_zmqClient.pollMulticast(0, multicastAnswer);
294  std::this_thread::sleep_for(std::chrono::milliseconds(100));
295  }
296 
// NOTE(review): listing line 297 is missing; the closing brace on listing line 301
// implies a conditional block was opened there — TODO confirm its condition.
298  // this message is especially for the output, all events reached the output
299  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
300  m_zmqClient.publish(std::move(message));
301  }
302 
// NOTE(review): listing line 303 is missing; presumably the ZMQ client is
// terminated there — TODO confirm.
304 }
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
unsigned int getZMQEventBufferSize() const
Number of events to keep in flight for every worker.
Definition: Environment.h:264
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:145
unsigned int getZMQMaximalWaitingTime() const
Maximal waiting time of any ZMQ module for any communication in ms.
Definition: Environment.h:252
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
static EventMetaData deserialize(std::string stream)
Deserialize the event data from a string.
Base class for Modules.
Definition: Module.h:72
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
unsigned int size() const
Check the size.
void storeEvent(std::unique_ptr< EvtMessage > evtMsg, const StoreObjPtr< EventMetaData > &evtMetaData, const unsigned int workerId)
Add a new event backup with the given information. Takes ownership of the evt message.
int checkForTimeout(const Duration &timeout) const
Check the items for timeout. Returns -1 if no timeout happened and the worker id, if it did.
void removeEvent(const EventMetaData &evtMetaData)
Remove all backups with the given event meta data (on confirmation)
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:59
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
std::deque< unsigned int > m_nextWorker
The list of next worker ids.
unsigned int m_param_workerProcessTimeout
Maximal time a worker is allowed to spend in ms.
void initialize() override
Initialize the RandomSeedGenerator.
ZMQClient m_zmqClient
Our ZMQ client.
ProcessedEventsBackupList m_procEvtBackupList
The backup list.
void event() override
Pack the datastore and send it. Also handle ready or hello messages of workers.
void checkWorkerProcTimeout()
Check if a worker has fallen into a timeout and send a kill message if needed.
void endRun() override
EndRun processing.
void terminate() override
Terminate the client and tell the monitor, we are done. Tell the output to end if all backups are out...
unsigned int m_param_maximalWaitingTime
Maximal time to wait for any message from the workers in ms.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::vector< unsigned int > m_workers
The list of all workers (to say goodbye properly).
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
bool m_param_useEventBackup
Flag to use the event backup or not.
Class to store variables with their name which were sent to the logging service.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
Definition: ZMQClient.h:113
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
void sendWorkerBackupEvents(unsigned int worker, const AZMQClient &socket)
Send all backups of a given worker directly to the multicast and delete them.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method to only the multicast socket.
Definition: ZMQClient.h:162
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.