Belle II Software development
ZMQTxInputModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11#include <framework/pcore/EvtMessage.h>
12#include <framework/core/Environment.h>
13#include <framework/core/RandomGenerator.h>
14#include <framework/datastore/StoreObjPtr.h>
15#include <thread>
16#include <chrono>
17#include <algorithm>
18
19using namespace std;
20using namespace Belle2;
21
// Register this class with the framework under the module name "ZMQTxInput"
// (the class name without its "Module" suffix).
22REG_MODULE(ZMQTxInput);
23
25{
// Constructor body (signature line not shown in this listing): registers the module
// parameters and asserts that the module runs in a multiprocessing environment.
26 addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
27 addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
28 addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
29 addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
// NOTE(review): the user-visible description below says "spent"; it should probably read "spend".
30 addParam("workerProcessTimeout", m_param_workerProcessTimeout, "Maximal time a worker is allowed to spent per event");
31 addParam("useEventBackup", m_param_useEventBackup, "Turn on the event backup");
32
// NOTE(review): original line 33 is not shown in this listing.
34
// The assertion condition is the configured number of worker processes itself, so any
// non-zero value passes and zero aborts construction.
// NOTE(review): the two adjacent string literals are joined without a separating space,
// so the emitted message reads "...single process,set the number of processes...".
35 B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36 "set the number of processes to at least 1.",
37 Environment::Instance().getNumberProcesses());
38}
39
41{
// initialize() body (signature line not shown): per the module declaration, this sets up
// the RandomSeedGenerator. A StoreObjPtr handle for the RandomGenerator object is created here.
// NOTE(review): original line 43 is not shown in this listing. As rendered, the handle is
// only constructed and never used (no registration call is visible). Confirm against the
// real source.
42 StoreObjPtr<RandomGenerator> randomgenerator;
44}
45
47{
// event() body (signature line not shown): per the module declaration, this packs the
// DataStore and sends it to the next ready worker, and handles ready/hello messages of
// workers. A zmq::error_t or std::exception thrown inside is caught and logged at the end,
// so event() itself returns normally.
48 try {
49 if (m_firstEvent) {
50 B2INFO("ZMQTxInputModule :: First Event here");
51
// NOTE(review): original lines 52-53 are not shown in this listing.
54
// Announce this (input) process on the multicast, using its pid as the payload.
55 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
56 m_zmqClient.publish(std::move(multicastHelloMsg));
57
58 // Listen to event confirmations, hello of workers, the messages to delete a worker and the general stop messages
59 m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
60 m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
61 m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
62 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
63
64 m_firstEvent = false;
65 }
66
// Without an online client (not initialized or already terminated) nothing can be sent.
67 if (not m_zmqClient.isOnline()) {
68 return;
69 }
70
// NOTE(review): the declaration/initialization of `timeout` is on original line 71, which is
// not shown in this listing.
72 if (not m_nextWorker.empty()) {
73 // a ready worker is already queued: poll without waiting
74 timeout = 0;
75 }
76
// Set to true by multicastAnswer (captured by reference) when a terminate message arrives.
77 bool terminate = false;
78
// Handles one multicast message. Returns false only for the terminate message and true
// otherwise. Presumably poll() stops on a false return; confirm against ZMQClient::poll.
// Note: std::stoi throws on a non-numeric payload; such an exception would end up in the
// catch (exception&) handler at the bottom of this function.
79 const auto multicastAnswer = [this, &terminate](const auto & socket) {
80 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
81 const std::string& data = multicastMessage->getData();
82
83 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
// Hello: remember the announcing id and reply directly over the data socket.
84 m_workers.push_back(std::stoi(data));
85 B2DEBUG(30, "received c_helloMessage from " << data << "... replying");
86 auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
87 m_zmqClient.send(std::move(replyHelloMessage));
88 return true;
89 } else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
// Confirmation: the event was processed, so its backup can be dropped.
90 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
91 m_procEvtBackupList.removeEvent(eventMetaData);
92 B2DEBUG(30, "removed event backup.. list size: " << m_procEvtBackupList.size());
93 return true;
94 } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
// Worker deleted: remove all of its queued ready slots (only when the event backup is on;
// otherwise this message falls through to the final `return true`).
// NOTE(review): m_workers is not updated here, so endRun()/terminate() still address this id.
95 const int workerID = std::atoi(data.c_str());
96 B2DEBUG(30, "received worker delete message, workerID: " << workerID);
// NOTE(review): original line 97 is not shown in this listing.
98 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
99 return true;
100 } else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
101 B2DEBUG(30, "Having received a stop message. I can not do much here, but just hope for the best.");
102 terminate = true;
103 return false;
104 }
105
106 return true;
107 };
108
// Handles one message on the data socket. Each ready message appends the sender's identity
// once to m_nextWorker (one entry per ready message) and returns false. Any other message is
// logged as an error and returns true.
109 const auto socketAnswer = [this](const auto & socket) {
110 const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
111 if (message->isMessage(EMessageTypes::c_readyMessage)) {
112 B2DEBUG(30, "got worker ready message");
113 m_nextWorker.push_back(std::stoi(message->getIdentity()));
114 return false;
115 }
116
117 B2ERROR("Invalid message from worker");
118 return true;
119 };
120
121
122 // Special treatment for "first event" generated by HLTZMQ2Ds module
123
124 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaData->getExperiment(), m_eventMetaData->getRun())) {
125 B2INFO("ZMQTxInput : special first event processing");
126 // Stream the special first-event record once; it is sent to every worker below
127 auto eventMessage = m_streamer.stream();
128
129 // Wait for all workers registered in data socket list
// i.e. until numproc * numbuf ready entries are queued (numbuf = events kept in flight per worker).
130 unsigned int numproc = Environment::Instance().getNumberProcesses();
131 unsigned int numbuf = Environment::Instance().getZMQEventBufferSize();
132
133 while (m_nextWorker.size() < numproc * numbuf) {
134 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
135 // false positive due to lambda capture ...
// (presumably a static-analysis warning that `terminate` is always false; it is set inside
// multicastAnswer through the reference capture)
136 if (terminate) {
// NOTE(review): original line 137 is not shown in this listing.
138 return;
139 }
140 }
141
// NOTE(review): undefined behavior. `it` starts at the front element, so the
// m_nextWorker.pop_front() below erases the element `it` refers to and invalidates `it`.
// The following `it + 1` / `++it` then operate on an invalidated iterator. The duplicate
// skipping also only works for adjacent equal ids. Safer: copy the distinct ids out first,
// then send and update m_nextWorker without iterating it while it is being modified.
142 for (deque<unsigned int>::iterator it = m_nextWorker.begin(); it != m_nextWorker.end(); ++it) {
143 unsigned int workerID = *it;
144 m_nextWorker.pop_front();
145 std::string workerIDString = std::to_string(workerID);
146 B2INFO("ZMQTxInput :: sending first event to worker : " << LogVar("worker", workerIDString));
147 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_eventMessage, eventMessage);
148 m_zmqClient.send(std::move(message));
149 // Skip following entries with the same worker id (only adjacent duplicates are detected)
150 while (true) {
151 if ((it + 1) == m_nextWorker.end()) break;
152 if (*(it + 1) == workerID) {
153 ++it;
154 } else
155 break;
156 }
157 }
158 B2INFO("ZMQTxInput : Special first event sent to all workers");
159 return;
160 }
161
162 // Normal event processing
163 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
164 // false positive due to lambda capture ...
165 if (terminate) {
// NOTE(review): original line 166 is not shown in this listing.
167 return;
168 }
169
// NOTE(review): runtime message typo "messaged" (should be "messages"); left unchanged here.
170 B2ASSERT("Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
171
// Consume the oldest queued ready slot.
172 const unsigned int nextWorker = m_nextWorker.front();
173 m_nextWorker.pop_front();
174 B2DEBUG(30, "Next worker is " << nextWorker);
175
176 auto eventMessage = m_streamer.stream();
177
// Only non-empty records are sent. Note that the ready slot above is consumed either way.
178 if (eventMessage->size() > 0) {
179 auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
180 m_zmqClient.send(std::move(message));
// NOTE(review): runtime message typo "Having send" (should be "sent"); left unchanged here.
181 B2DEBUG(30, "Having send message to worker " << nextWorker);
182
// NOTE(review): original lines 183 and 186 are not shown in this listing. The `}` on 187
// closes a block opened on line 183 (presumably the event-backup condition; confirm).
// Keep a backup of the sent event (ownership of the message moves into the list).
184 m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
185 B2DEBUG(30, "stored event " << m_eventMetaData->getEvent() << " backup.. list size: " << m_procEvtBackupList.size());
187 }
188 B2DEBUG(30, "finished event");
189 }
// EINTR (interrupted call) is ignored silently; other ZMQ errors and std exceptions are logged.
190 } catch (zmq::error_t& ex) {
191 if (ex.num() != EINTR) {
192 B2ERROR("There was an error during the Tx input event: " << ex.what());
193 }
194 } catch (exception& ex) {
195 B2ERROR(ex.what());
196
197 }
198}
199
200// End Run
202{
203
204 B2DEBUG(30, "ZMQTxInput:: EndRun detected. isEndOfRun = " << m_eventMetaData->isEndOfRun() << " RunNo = " <<
205 m_eventMetaData->getRun());
206 if (m_eventMetaData->isEndOfRun() != 1) return;
207
208 // Stream End Run record
209 auto eventMessage = m_streamer.stream();
210
211 // bool terminate = false;
212
213 // All sockets are ready. Send end_run record to all workers
214 for (unsigned int workerID : m_workers) {
215 if (workerID == (unsigned int)getpid()) continue; // skip input process
216 std::string workerIDString = std::to_string(workerID);
217 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_eventMessage, eventMessage);
218 m_zmqClient.send(std::move(message));
219 }
220 B2INFO("ZMQTxInput : End Run sent to all workers");
221}
222
223//TODO: wait for confirmation before deleting when sending backup messages to output
225{
// This body appears to be checkWorkerProcTimeout() (signature line not shown). Per the module
// declaration, it checks whether a worker has exceeded its processing time and, if so, sends
// a kill message.
// NOTE(review): original line 226 is not shown in this listing. The `return;` below belongs
// to an early-exit condition on that line (presumably the event-backup switch; confirm).
227 return;
228 }
229
// The configured timeout is interpreted as milliseconds.
230 const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
// checkForTimeout returns -1 if no backup timed out, otherwise the id of the affected worker.
231 int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
232 if (workerID > -1) {
233 B2WARNING("Worker process timeout, workerID: " << workerID);
// Publish a kill message naming the timed-out worker on the multicast.
234 auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
235 m_zmqClient.publish(std::move(deathMessage));
236
// NOTE(review): original line 237 is not shown in this listing.
// Drop every queued ready slot of that worker so no further event is dispatched to it.
// NOTE(review): m_workers is not updated, so endRun()/terminate() still address this id.
238 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
239 }
240}
241
243{
// This body appears to be terminate() (signature line not shown). Per the module declaration,
// it terminates the client, tells the monitor we are done, and tells the output to end once
// all backups are out.
244
// Nothing to do if the client is not online.
245 if (not m_zmqClient.isOnline()) {
246 return;
247 }
248
// Tell every known worker that there are no more events.
// NOTE(review): unlike endRun(), this loop does not skip this process's own pid.
249 for (unsigned int workerID : m_workers) {
250 std::string workerIDString = std::to_string(workerID);
251 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
252 m_zmqClient.send(std::move(message));
253 }
254
// Handles one multicast message while shutting down. It always returns true.
255 const auto multicastAnswer = [this](const auto & socket) {
256 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
257 const std::string& data = multicastMessage->getData();
258
259 if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
// Confirmation: the event was processed, so its backup can be dropped.
260 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
261 m_procEvtBackupList.removeEvent(eventMetaData);
262 B2DEBUG(30, "removed event backup.. list size: " << m_procEvtBackupList.size());
263 return true;
264 } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
265 const int workerID = std::atoi(data.c_str());
266
267 B2DEBUG(30, "received worker delete message, workerID: " << workerID);
// NOTE(review): original line 268 is not shown in this listing.
269 return true;
270 } else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
271 // A worker that says hello this late gets the last-event message right away, so it ends immediately
272 B2DEBUG(30, "received c_helloMessage from " << data << "... replying with end message");
273 auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
274 m_zmqClient.send(std::move(message));
275 return true;
276 }
277 return true;
278 };
279
// NOTE(review): original lines 280-281 are not shown in this listing. They hold the header
// of the loop whose body follows: a non-blocking multicast poll every 100 ms.
282 m_zmqClient.pollMulticast(0, multicastAnswer);
283 std::this_thread::sleep_for(std::chrono::milliseconds(100));
284 }
285
// NOTE(review): original line 286 is not shown in this listing. It opens the block that is
// closed on line 290.
287 // this message is especially for the output, all events reached the output
288 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
289 m_zmqClient.publish(std::move(message));
290 }
291
// NOTE(review): original line 292 is not shown in this listing.
293}
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
unsigned int getZMQEventBufferSize() const
Number of events to keep in flight for every worker.
Definition: Environment.h:285
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:158
unsigned int getZMQMaximalWaitingTime() const
Maximal waiting time of any ZMQ module for any communication in ms.
Definition: Environment.h:273
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
static EventMetaData deserialize(std::string stream)
Deserialize the event data from a string.
Base class for Modules.
Definition: Module.h:72
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
unsigned int size() const
Check the size.
void storeEvent(std::unique_ptr< EvtMessage > evtMsg, const StoreObjPtr< EventMetaData > &evtMetaData, const unsigned int workerId)
Add a new event backup with the given information. Takes ownership of the evt message.
int checkForTimeout(const Duration &timeout) const
Check the items for timeout. Returns -1 if no timeout happened, and the id of the timed-out worker if one did.
void removeEvent(const EventMetaData &evtMetaData)
Remove all backups with the given event meta data (on confirmation)
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:95
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:59
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
std::deque< unsigned int > m_nextWorker
The list of next worker ids.
unsigned int m_param_workerProcessTimeout
Maximal time a worker is allowed to spend per event, in ms.
void initialize() override
Initialize the RandomSeedGenerator.
ZMQClient m_zmqClient
Our ZMQ client.
ProcessedEventsBackupList m_procEvtBackupList
The backup list.
void event() override
Pack the datastore and send it. Also handle ready or hello messages of workers.
void checkWorkerProcTimeout()
Check if a worker has fallen into a timeout and send a kill message if needed.
ZMQTxInputModule()
Constructor setting the module parameters.
void endRun() override
EndRun processing.
void terminate() override
Terminate the client and tell the monitor that we are done. Tell the output to end if all backups are out...
unsigned int m_param_maximalWaitingTime
Maximal time to wait for any message from the workers in ms.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::vector< unsigned int > m_workers
The list of all workers (to say goodbye properly).
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
bool m_param_useEventBackup
Flag to use the event backup or not.
Class to store variables, together with their names, that are sent to the logging service.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until either:
Definition: ZMQClient.h:113
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:559
void sendWorkerBackupEvents(unsigned int worker, const AZMQClient &socket)
Send all backups of a given worker directly to the multicast and delete them.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method to only the multicast socket.
Definition: ZMQClient.h:162
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:649
Abstract base class for different kinds of events.
STL namespace.