Belle II Software development
ZMQTxInputModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11#include <framework/pcore/EvtMessage.h>
12#include <framework/core/Environment.h>
13#include <framework/core/RandomGenerator.h>
14#include <framework/datastore/StoreObjPtr.h>
15#include <thread>
16#include <chrono>
17#include <algorithm>
18
19using namespace std;
20using namespace Belle2;
21
22REG_MODULE(ZMQTxInput);
23
/**
 * Constructor body: declares the module parameters and asserts that the
 * configured number of processes is non-zero.
 *
 * NOTE(review): the function header (original line 24) is not part of this listing.
 */
25{
// Socket addresses used by this module.
26 addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
27 addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
28 addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
// Timing parameters and the switch for the event backup.
29 addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
30 addParam("workerProcessTimeout", m_param_workerProcessTimeout, "Maximal time a worker is allowed to spent per event");
31 addParam("useEventBackup", m_param_useEventBackup, "Turn on the event backup");
32
// NOTE(review): original line 33 is absent from this listing. The member index of the page
// references setPropertyFlags and c_ParallelProcessingCertified, so presumably that call
// belongs here - confirm against the repository.
34
// NOTE(review): the two adjacent string literals below are concatenated without a separating
// space, so the emitted message reads "...single process,set the number...".
35 B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36 "set the number of processes to at least 1.",
37 Environment::Instance().getNumberProcesses());
38}
39
45
47{
48 try {
49 if (m_firstEvent) {
50 B2INFO("ZMQTxInputModule :: First Event here");
51
54
55 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
56 m_zmqClient.publish(std::move(multicastHelloMsg));
57
58 // Listen to event confirmations, hello of workers, the messages to delete a worker and the general stop messages
59 m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
60 m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
61 m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
62 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
63
64 m_firstEvent = false;
65 }
66
67 if (not m_zmqClient.isOnline()) {
68 return;
69 }
70
71 int timeout = (int)Environment::Instance().getZMQMaximalWaitingTime();
72 if (not m_nextWorker.empty()) {
73 // if next worker are available do not waste time
74 timeout = 0;
75 }
76
77 bool terminate = false;
78
79 const auto multicastAnswer = [this, &terminate](const auto & socket) {
80 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
81 const std::string& data = multicastMessage->getData();
82
83 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
84 m_workers.push_back(std::stoi(data));
85 B2DEBUG(30, "received c_helloMessage from " << data << "... replying");
86 auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
87 m_zmqClient.send(std::move(replyHelloMessage));
88 return true;
89 } else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
90 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
91 m_procEvtBackupList.removeEvent(eventMetaData);
92 B2DEBUG(30, "removed event backup.. list size: " << m_procEvtBackupList.size());
93 return true;
94 } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
95 const int workerID = std::atoi(data.c_str());
96 B2DEBUG(30, "received worker delete message, workerID: " << workerID);
97 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
98 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
99 return true;
100 } else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
101 B2DEBUG(30, "Having received a stop message. I can not do much here, but just hope for the best.");
102 terminate = true;
103 return false;
104 }
105
106 return true;
107 };
108
109 const auto socketAnswer = [this](const auto & socket) {
110 const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
111 if (message->isMessage(EMessageTypes::c_readyMessage)) {
112 B2DEBUG(30, "got worker ready message");
113 m_nextWorker.push_back(std::stoi(message->getIdentity()));
114 return false;
115 }
116
117 B2ERROR("Invalid message from worker");
118 return true;
119 };
120
121
122 // Special treatment for "first event" generated by HLTZMQ2Ds module
123
124 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaData->getExperiment(), m_eventMetaData->getRun())) {
125 B2INFO("ZMQTxInput : special first event processing");
126 // Stream End Run record
127 auto eventMessage = m_streamer.stream();
128
129 // Wait for all workers registered in data socket list
130 unsigned int numproc = Environment::Instance().getNumberProcesses();
131 unsigned int numbuf = Environment::Instance().getZMQEventBufferSize();
132
133 while (m_nextWorker.size() < numproc * numbuf) {
134 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
135 // false positive due to lambda capture ...
136 if (terminate) {
137 m_zmqClient.terminate();
138 return;
139 }
140 }
141
142 for (deque<unsigned int>::iterator it = m_nextWorker.begin(); it != m_nextWorker.end(); ++it) {
143 unsigned int workerID = *it;
144 m_nextWorker.pop_front();
145 std::string workerIDString = std::to_string(workerID);
146 B2INFO("ZMQTxInput :: sending first event to worker : " << LogVar("worker", workerIDString));
147 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_eventMessage, eventMessage);
148 m_zmqClient.send(std::move(message));
149 // Check the same ID in the forward
150 while (true) {
151 if ((it + 1) == m_nextWorker.end()) break;
152 if (*(it + 1) == workerID) {
153 ++it;
154 } else
155 break;
156 }
157 }
158 B2INFO("ZMQTxInput : Special first event sent to all workers");
159 return;
160 }
161
162 // Normal event processing
163 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
164 // false positive due to lambda capture ...
165 if (terminate) {
166 m_zmqClient.terminate();
167 return;
168 }
169
170 B2ASSERT("Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
171
172 const unsigned int nextWorker = m_nextWorker.front();
173 m_nextWorker.pop_front();
174 B2DEBUG(30, "Next worker is " << nextWorker);
175
176 auto eventMessage = m_streamer.stream();
177
178 if (eventMessage->size() > 0) {
179 auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
180 m_zmqClient.send(std::move(message));
181 B2DEBUG(30, "Having send message to worker " << nextWorker);
182
184 m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
185 B2DEBUG(30, "stored event " << m_eventMetaData->getEvent() << " backup.. list size: " << m_procEvtBackupList.size());
187 }
188 B2DEBUG(30, "finished event");
189 }
190 } catch (zmq::error_t& ex) {
191 if (ex.num() != EINTR) {
192 B2ERROR("There was an error during the Tx input event: " << ex.what());
193 }
194 } catch (exception& ex) {
195 B2ERROR(ex.what());
196
197 }
198}
199
200// End Run
202{
203
204 B2DEBUG(30, "ZMQTxInput:: EndRun detected. isEndOfRun = " << m_eventMetaData->isEndOfRun() << " RunNo = " <<
205 m_eventMetaData->getRun());
206 if (m_eventMetaData->isEndOfRun() != 1) return;
207
208 // Stream End Run record
209 auto eventMessage = m_streamer.stream();
210
211 // bool terminate = false;
212
213 // All sockets are ready. Send end_run record to all workers
214 for (unsigned int workerID : m_workers) {
215 if (workerID == (unsigned int)getpid()) continue; // skip input process
216 std::string workerIDString = std::to_string(workerID);
217 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_eventMessage, eventMessage);
218 m_zmqClient.send(std::move(message));
219 }
220 B2INFO("ZMQTxInput : End Run sent to all workers");
221}
222
223//TODO: wait for confirmation before deleting when sending backup messages to output
/**
 * Check if a worker has fallen into a timeout and send a kill message if needed.
 *
 * When the backup list reports a timed-out worker, a c_killWorkerMessage carrying the
 * worker ID is published, the backed-up events of that worker are sent out again via the
 * backup list, and all ready slots of the worker are removed from m_nextWorker.
 *
 * NOTE(review): the function header (original line 224) is not part of this listing.
 */
225{
// NOTE(review): original line 226, the condition guarding this early return, is missing
// from the listing - presumably it tests m_param_useEventBackup; confirm against the repository.
227 return;
228 }
229
// The parameter is a plain number; it is converted to a duration in milliseconds here
230 const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
// A result greater than -1 is treated as the ID of a timed-out worker
231 int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
232 if (workerID > -1) {
233 B2WARNING("Worker process timeout, workerID: " << workerID);
234 auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
235 m_zmqClient.publish(std::move(deathMessage));
236
// Re-send the events the worker was holding and drop its ready slots, so it gets no new events
237 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
238 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
239 }
240}
241
/**
 * Terminate the client: send a c_lastEventMessage to every known worker, keep serving
 * multicast messages while the event backup list still holds events, publish a
 * c_lastEventMessage and finally terminate the ZMQ client.
 *
 * Does nothing if the ZMQ client is not online.
 *
 * NOTE(review): the function header (original line 242) is not part of this listing.
 */
243{
244
245 if (not m_zmqClient.isOnline()) {
246 return;
247 }
248
// Tell every process that said hello that no more events will follow
249 for (unsigned int workerID : m_workers) {
250 std::string workerIDString = std::to_string(workerID);
251 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
252 m_zmqClient.send(std::move(message));
253 }
254
// Multicast handler used while draining the backup list; it always returns true
255 const auto multicastAnswer = [this](const auto & socket) {
256 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
257 const std::string& data = multicastMessage->getData();
258
259 if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
// Confirmed events are removed from the backup list
260 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
261 m_procEvtBackupList.removeEvent(eventMetaData);
262 B2DEBUG(30, "removed event backup.. list size: " << m_procEvtBackupList.size());
263 return true;
264 } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
265 const int workerID = std::atoi(data.c_str());
266
267 B2DEBUG(30, "received worker delete message, workerID: " << workerID);
// Unlike in event(), m_nextWorker is not touched here
268 m_procEvtBackupList.sendWorkerBackupEvents(workerID, m_zmqClient);
269 return true;
270 } else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
271 // A new worker? Well, he is quite late... nevertheless, lets tell him to end it
272 B2DEBUG(30, "received c_helloMessage from " << data << "... replying with end message");
273 auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
274 m_zmqClient.send(std::move(message));
275 return true;
276 }
277 return true;
278 };
279
// Loop until the backup list is empty: non-blocking poll (timeout 0) followed by a 100 ms sleep
280 while (m_param_useEventBackup and m_procEvtBackupList.size() > 0) {
// NOTE(review): original line 281 (first statement of the loop body) is missing from the
// listing - restore it from the repository.
282 m_zmqClient.pollMulticast(0, multicastAnswer);
283 std::this_thread::sleep_for(std::chrono::milliseconds(100));
284 }
285
// NOTE(review): original line 286, the opening line of the block closed on line 290, is
// missing from the listing - presumably a condition on m_param_useEventBackup; confirm.
287 // this message is especially for the output, all events reached the output
288 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
289 m_zmqClient.publish(std::move(message));
290 }
291
292 m_zmqClient.terminate();
293}
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
Definition DataStore.h:71
unsigned int getZMQEventBufferSize() const
Number of events to keep in flight for every worker.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
static EventMetaData deserialize(std::string stream)
Deserialize the event data from a string.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition Module.cc:208
Module()
Constructor.
Definition Module.cc:30
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition Module.h:80
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
Definition StoreObjPtr.h:96
static std::unique_ptr< AMessage > fromSocket(const std::unique_ptr< zmq::socket_t > &socket)
Create a message of the given type by receiving a message from the socket.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
std::deque< unsigned int > m_nextWorker
The list of next worker ids.
unsigned int m_param_workerProcessTimeout
Maximal time a worker is allowed to spend per event, in ms.
void initialize() override
Initialize the RandomSeedGenerator.
ZMQClient m_zmqClient
Our ZMQ client.
ProcessedEventsBackupList m_procEvtBackupList
The backup list.
void event() override
Pack the datastore and send it. Also handle ready or hello messages of workers.
void checkWorkerProcTimeout()
Check if a worker has fallen into a timeout and send a kill message if needed.
ZMQTxInputModule()
Constructor setting the module parameters.
void endRun() override
EndRun processing.
void terminate() override
Terminate the client and tell the monitor, we are done. Tell the output to end if all backups are out...
unsigned int m_param_maximalWaitingTime
Maximal time to wait for any message from the workers in ms.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::vector< unsigned int > m_workers
The list of all workers (to say goodbye properly).
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
bool m_param_useEventBackup
Flag to use the event backup or not.
Class to store variables with their name which were sent to the logging service.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition Module.h:559
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition Module.h:649
Abstract base class for different kinds of events.
STL namespace.