Belle II Software  light-2212-foldex
ZMQTxInputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11 #include <framework/pcore/EvtMessage.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomGenerator.h>
14 #include <framework/datastore/StoreObjPtr.h>
15 #include <thread>
16 #include <chrono>
17 #include <algorithm>
18 
19 using namespace std;
20 using namespace Belle2;
21 
22 REG_MODULE(ZMQTxInput);
23 
24 ZMQTxInputModule::ZMQTxInputModule() : Module()
25 {
26  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
27  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
28  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
29  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
30  addParam("workerProcessTimeout", m_param_workerProcessTimeout, "Maximal time a worker is allowed to spent per event");
31  addParam("useEventBackup", m_param_useEventBackup, "Turn on the event backup");
32 
33  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
34 
35  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36  "set the number of processes to at least 1.",
37  Environment::Instance().getNumberProcesses());
38 }
39 
// NOTE(review): this listing elides source line 40 (presumably the signature of
// ZMQTxInputModule::initialize(), whose Doxygen summary reads "Initialize the
// RandomSeedGenerator.") and source line 43. The elided line presumably registers
// `randomgenerator` in the DataStore (registerInDataStore / c_DontWriteOut appear
// among the tooltips) — TODO confirm against the real source.
41 {
// Type-safe handle to the RandomGenerator object in the DataStore.
42  StoreObjPtr<RandomGenerator> randomgenerator;
44 }
45 
// NOTE(review): presumably ZMQTxInputModule::event(): "Pack the datastore and send it.
// Also handle ready or hello messages of workers." This listing elides source lines
// 46 (signature), 50-51, 96, 123, 140 and 143. The comments below describe only the
// visible lines.
47 {
48  try {
// First call only: announce this process on the multicast with a hello message
// carrying getpid(), then subscribe to the multicast message types handled below.
// NOTE(review): elided lines 50-51 presumably initialize m_streamer and m_zmqClient
// (both expose initialize() per the tooltips) — TODO confirm.
49  if (m_firstEvent) {
52 
53  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
54  m_zmqClient.publish(std::move(multicastHelloMsg));
55 
56  // Listen to event confirmations, hello of workers, the messages to delete a worker and the general stop messages
57  m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
58  m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
59  m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
60  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
61 
62  m_firstEvent = false;
63  }
64 
// Nothing can be sent unless the client is initialized and not yet terminated.
65  if (not m_zmqClient.isOnline()) {
66  return;
67  }
68 
// Poll for up to m_param_maximalWaitingTime (ms) for worker messages.
69  int timeout = m_param_maximalWaitingTime;
70  if (not m_nextWorker.empty()) {
71  // if a next worker is already available, do not wait for messages
72  timeout = 0;
73  }
74 
// Set by multicastAnswer when a c_terminateMessage arrives.
75  bool terminate = false;
76 
// Multicast handler. It returns false only for c_terminateMessage and true for everything
// else. Confirm and delete-worker messages are ignored while m_param_useEventBackup is
// off. Presumably a false return ends ZMQClient::poll — TODO confirm.
//  - hello:        remember the worker id in m_workers and reply over the data socket.
//  - confirm:      drop the matching event backup.
//  - deleteWorker: remove the worker from the ready queue. Elided line 96 presumably
//                  resends its backups (see sendWorkerBackupEvents) — TODO confirm.
// NOTE(review): the visible code never removes ids from m_workers, so terminate() still
// addresses deleted workers unless line 96 does so — confirm.
// NOTE(review): std::stoi throws on malformed data. The exception presumably propagates
// out of poll() into the catch (exception&) handler below.
77  const auto multicastAnswer = [this, &terminate](const auto & socket) {
78  const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
79  const std::string& data = multicastMessage->getData();
80 
81  if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
82  m_workers.push_back(std::stoi(data));
83  B2DEBUG(10, "received c_helloMessage from " << data << "... replying");
84  auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
85  m_zmqClient.send(std::move(replyHelloMessage));
86  return true;
87  } else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
88  const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
89  m_procEvtBackupList.removeEvent(eventMetaData);
90  B2DEBUG(10, "removed event backup.. list size: " << m_procEvtBackupList.size());
91  return true;
92  } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
93  const int workerID = std::atoi(data.c_str());
94  B2DEBUG(10, "received worker delete message, workerID: " << workerID);
95 
97  m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
98  return true;
99  } else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
100  B2DEBUG(10, "Having received a stop message. I can not do much here, but just hope for the best.");
101  terminate = true;
102  return false;
103  }
104 
105  return true;
106  };
107 
// Data-socket handler. A c_readyMessage queues the sender's identity as a next worker
// and returns false. Any other message is logged as an error and returns true.
108  const auto socketAnswer = [this](const auto & socket) {
109  const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
110  if (message->isMessage(EMessageTypes::c_readyMessage)) {
111  B2DEBUG(10, "got worker ready message");
112  m_nextWorker.push_back(std::stoi(message->getIdentity()));
113  return false;
114  }
115 
116  B2ERROR("Invalid message from worker");
117  return true;
118  };
119 
120  m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
121  // analysers may flag this as always false; `terminate` is set inside multicastAnswer via reference capture
122  if (terminate) {
// Elided line 123 precedes this early return.
124  return;
125  }
126 
// Having no ready worker after polling is treated as fatal.
127  B2ASSERT("Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
128 
// Take the oldest ready worker and stream the DataStore into an event message.
// NOTE(review): if the streamed message is empty, the popped worker is neither sent
// anything nor re-queued. Confirm this cannot leave that worker idle forever.
129  const unsigned int nextWorker = m_nextWorker.front();
130  m_nextWorker.pop_front();
131  B2DEBUG(10, "Next worker is " << nextWorker);
132 
133  auto eventMessage = m_streamer.stream();
134 
135  if (eventMessage->size() > 0) {
// createMessage takes the EvtMessage by const reference, so eventMessage is still
// valid for the backup below.
136  auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
137  m_zmqClient.send(std::move(message));
138  B2DEBUG(10, "Having send message to worker " << nextWorker);
139 
// Keep a backup tagged with the worker id until it is confirmed. Elided line 140
// presumably guards this with m_param_useEventBackup — TODO confirm.
141  m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
142  B2DEBUG(10, "stored event " << m_eventMetaData->getEvent() << " backup.. list size: " << m_procEvtBackupList.size());
144  }
145  B2DEBUG(10, "finished event");
146  }
// zmq errors with EINTR (interrupted call) are ignored silently. Other zmq errors and
// std::exceptions are logged and not rethrown.
147  } catch (zmq::error_t& ex) {
148  if (ex.num() != EINTR) {
149  B2ERROR("There was an error during the Tx input event: " << ex.what());
150  }
151  } catch (exception& ex) {
152  B2ERROR(ex.what());
153 
154  }
155 }
156 
// NOTE(review): the TODO below presumably concerns the backup resend in elided line 171.
// Per its tooltip, sendWorkerBackupEvents sends the backups directly and deletes them.
157 //TODO: wait for confirmation before deleting when sending backup messages to output
// NOTE(review): presumably ZMQTxInputModule::checkWorkerProcTimeout(): "Check if a worker
// has fallen into a timeout and send a kill message if needed." This listing elides source
// lines 158 (signature), 160 (the condition guarding the early return) and 171.
159 {
161  return;
162  }
163 
// m_param_workerProcessTimeout is in milliseconds. checkForTimeout returns -1 if no
// backup has timed out; otherwise it returns the id of the timed-out worker.
164  const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
165  int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
166  if (workerID > -1) {
167  B2WARNING("Worker process timeout, workerID: " << workerID);
// Broadcast a kill request for the timed-out worker on the multicast.
168  auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
169  m_zmqClient.publish(std::move(deathMessage));
170 
// Ensure the timed-out worker is no longer picked for new events.
// NOTE(review): m_workers is not pruned in the visible code — confirm that is intended.
172  m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
173  }
174 }
175 
// NOTE(review): presumably ZMQTxInputModule::terminate(): terminate the client, tell the
// monitor we are done, and tell the output to end once all backups are out. This listing
// elides source lines 176 (signature), 202, 214-215, 220 and 226.
177 {
178 
179  if (not m_zmqClient.isOnline()) {
180  return;
181  }
182 
// Tell every worker that announced itself with a hello message that no more events follow.
183  for (unsigned int workerID : m_workers) {
184  std::string workerIDString = std::to_string(workerID);
185  auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
186  m_zmqClient.send(std::move(message));
187  }
188 
// Multicast handler used while shutting down; it always returns true.
//  - confirm:      drop the matching backup (only with m_param_useEventBackup).
//  - deleteWorker: log the worker id. Elided line 202 presumably handles that worker's
//                  backups — TODO confirm.
//  - hello:        answer late workers directly with c_lastEventMessage.
189  const auto multicastAnswer = [this](const auto & socket) {
190  const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
191  const std::string& data = multicastMessage->getData();
192 
193  if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
194  const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
195  m_procEvtBackupList.removeEvent(eventMetaData);
196  B2DEBUG(10, "removed event backup.. list size: " << m_procEvtBackupList.size());
197  return true;
198  } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
199  const int workerID = std::atoi(data.c_str());
200 
201  B2DEBUG(10, "received worker delete message, workerID: " << workerID);
203  return true;
204  } else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
205  // A new worker? It arrived quite late... nevertheless, let's tell it to end.
206  B2DEBUG(10, "received c_helloMessage from " << data << "... replying with end message");
207  auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
208  m_zmqClient.send(std::move(message));
209  return true;
210  }
211  return true;
212  };
213 
// Elided lines 214-215 presumably open a loop, closed at line 218, that drains the
// multicast without blocking and sleeps 100 ms between polls — TODO confirm its exit
// condition.
216  m_zmqClient.pollMulticast(0, multicastAnswer);
217  std::this_thread::sleep_for(std::chrono::milliseconds(100));
218  }
219 
// Elided line 220 presumably guards this final broadcast (closed at line 224) — TODO confirm.
221  // this message is meant for the output: all events have reached the output
222  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
223  m_zmqClient.publish(std::move(message));
224  }
225 
227 }
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
Definition: DataStore.h:71
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:29
static EventMetaData deserialize(std::string stream)
Deserialize the event data from a string.
Base class for Modules.
Definition: Module.h:72
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
unsigned int size() const
Check the size.
void storeEvent(std::unique_ptr< EvtMessage > evtMsg, const StoreObjPtr< EventMetaData > &evtMetaData, const unsigned int workerId)
Add a new event backup with the given information. Takes ownership of the evt message.
int checkForTimeout(const Duration &timeout) const
Check the items for timeout. Returns -1 if no timeout happened, or the id of the timed-out worker if one did.
void removeEvent(const EventMetaData &evtMetaData)
Remove all backups with the given event meta data (on confirmation)
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:95
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:58
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:52
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:42
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
std::deque< unsigned int > m_nextWorker
The list of next worker ids.
unsigned int m_param_workerProcessTimeout
Maximal time (in ms) a worker is allowed to spend on one event.
void initialize() override
Initialize the RandomSeedGenerator.
ZMQClient m_zmqClient
Our ZMQ client.
ProcessedEventsBackupList m_procEvtBackupList
The backup list.
void event() override
Pack the datastore and send it. Also handle ready or hello messages of workers.
void checkWorkerProcTimeout()
Check if a worker has fallen into a timeout and send a kill message if needed.
void terminate() override
Terminate the client and tell the monitor that we are done. Tell the output to end if all backups are out.
unsigned int m_param_maximalWaitingTime
Maximal time to wait for any message from the workers in ms.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::vector< unsigned int > m_workers
The list of all workers (to say goodbye properly).
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
bool m_param_useEventBackup
Flag to use the event backup or not.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
Definition: ZMQClient.h:112
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
void sendWorkerBackupEvents(unsigned int worker, const AZMQClient &socket)
Send all backups of a given worker directly to the multicast and delete them.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll only the multicast socket.
Definition: ZMQClient.h:161
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
Definition: ClusterUtils.h:23