Belle II Software development
ZMQTxInputModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11#include <framework/pcore/EvtMessage.h>
12#include <framework/core/Environment.h>
13#include <framework/core/RandomGenerator.h>
14#include <framework/datastore/StoreObjPtr.h>
15#include <thread>
16#include <chrono>
17#include <algorithm>
18
19using namespace std;
20using namespace Belle2;
21
22REG_MODULE(ZMQTxInput);
23
25{
26 addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
27 addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
28 addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
29 addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
30 addParam("workerProcessTimeout", m_param_workerProcessTimeout, "Maximal time a worker is allowed to spent per event");
31 addParam("useEventBackup", m_param_useEventBackup, "Turn on the event backup");
32
34
35 B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36 "set the number of processes to at least 1.",
37 Environment::Instance().getNumberProcesses());
38}
39
41{
42 StoreObjPtr<RandomGenerator> randomgenerator;
44}
45
47{
48 try {
49 if (m_firstEvent) {
50 B2INFO("ZMQTxInputModule :: First Event here");
51
54
55 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
56 m_zmqClient.publish(std::move(multicastHelloMsg));
57 // B2INFO ( "ZMQTxInput : multicast Hello message sent. PID = " << getpid() );
58
59 // Listen to event confirmations, hello of workers, the messages to delete a worker and the general stop messages
60 m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
61 m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
62 m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
63 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
64
65 m_firstEvent = false;
66 }
67
68 if (not m_zmqClient.isOnline()) {
69 return;
70 }
71
72 // int timeout = m_param_maximalWaitingTime;
73 // int timeout = 7200 * 1000;
75 if (not m_nextWorker.empty()) {
76 // if next worker are available do not waste time
77 timeout = 0;
78 }
79
80 bool terminate = false;
81
82 const auto multicastAnswer = [this, &terminate](const auto & socket) {
83 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
84 const std::string& data = multicastMessage->getData();
85
86 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
87 m_workers.push_back(std::stoi(data));
88 B2DEBUG(30, "received c_helloMessage from " << data << "... replying");
89 // B2INFO ( "ZMQTxInput : received c_helloMessage from " << data << "... replying");
90 auto replyHelloMessage = ZMQMessageFactory::createMessage(data, EMessageTypes::c_helloMessage);
91 m_zmqClient.send(std::move(replyHelloMessage));
92 return true;
93 } else if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
94 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
95 m_procEvtBackupList.removeEvent(eventMetaData);
96 B2DEBUG(30, "removed event backup.. list size: " << m_procEvtBackupList.size());
97 return true;
98 } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
99 const int workerID = std::atoi(data.c_str());
100 B2DEBUG(30, "received worker delete message, workerID: " << workerID);
102 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
103 return true;
104 } else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
105 B2DEBUG(30, "Having received a stop message. I can not do much here, but just hope for the best.");
106 terminate = true;
107 return false;
108 }
109
110 return true;
111 };
112
113 const auto socketAnswer = [this](const auto & socket) {
114 const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
115 if (message->isMessage(EMessageTypes::c_readyMessage)) {
116 B2DEBUG(30, "got worker ready message");
117 m_nextWorker.push_back(std::stoi(message->getIdentity()));
118 return false;
119 }
120
121 B2ERROR("Invalid message from worker");
122 return true;
123 };
124
125
126 // Special treatment for "first event" generated by HLTZMQ2Ds module
127
128 // if ( m_eventMetaData->getExperiment() == 42 && m_eventMetaData->getRun() == 8 ) { // Special first event
129 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaData->getExperiment(), m_eventMetaData->getRun())) {
130 B2INFO("ZMQTxInput : special first event processing");
131 // Stream End Run record
132 auto eventMessage = m_streamer.stream();
133
134 // Wait for all workers registered in data socket list
135 unsigned int numproc = Environment::Instance().getNumberProcesses();
136 unsigned int numbuf = Environment::Instance().getZMQEventBufferSize();
137
138 while (m_nextWorker.size() < numproc * numbuf) {
139 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
140 // B2INFO ( "ZMQTxInput : numproc = " << numproc << " <-> count = " << m_nextWorker.size() );
141 // false positive due to lambda capture ...
142 if (terminate) {
144 return;
145 }
146 }
147
148 // unsigned int workerID = -1;
149 for (deque<unsigned int>::iterator it = m_nextWorker.begin(); it != m_nextWorker.end(); ++it) {
150 unsigned int workerID = *it;
151 m_nextWorker.pop_front();
152 std::string workerIDString = std::to_string(workerID);
153 B2INFO("ZMQTxInput :: sending first event to worker : " << LogVar("worker", workerIDString));
154 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_eventMessage, eventMessage);
155 m_zmqClient.send(std::move(message));
156 // Check the same ID in the forward
157 while (true) {
158 if ((it + 1) == m_nextWorker.end()) break;
159 if (*(it + 1) == workerID) {
160 // B2INFO ( "Skipping " << workerID );
161 ++it;
162 } else
163 break;
164 }
165 }
166 B2INFO("ZMQTxInput : Special first event sent to all workers");
167 return;
168 }
169
170 // Normal event processing
171
172 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
173 // false positive due to lambda capture ...
174 if (terminate) {
176 return;
177 }
178
179 B2ASSERT("Did not receive any ready messaged for quite some time!", not m_nextWorker.empty());
180
181
182 const unsigned int nextWorker = m_nextWorker.front();
183 m_nextWorker.pop_front();
184 B2DEBUG(30, "Next worker is " << nextWorker);
185
186 auto eventMessage = m_streamer.stream();
187
188 if (eventMessage->size() > 0) {
189 auto message = ZMQMessageFactory::createMessage(std::to_string(nextWorker), EMessageTypes::c_eventMessage, eventMessage);
190 m_zmqClient.send(std::move(message));
191 B2DEBUG(30, "Having send message to worker " << nextWorker);
192
194 m_procEvtBackupList.storeEvent(std::move(eventMessage), m_eventMetaData, nextWorker);
195 B2DEBUG(30, "stored event " << m_eventMetaData->getEvent() << " backup.. list size: " << m_procEvtBackupList.size());
197 }
198 B2DEBUG(30, "finished event");
199 }
200 } catch (zmq::error_t& ex) {
201 if (ex.num() != EINTR) {
202 B2ERROR("There was an error during the Tx input event: " << ex.what());
203 }
204 } catch (exception& ex) {
205 B2ERROR(ex.what());
206
207 }
208}
209
210// End Run
212{
213
214 B2DEBUG(30, "ZMQTxInput:: EndRun detected. isEndOfRun = " << m_eventMetaData->isEndOfRun() << " RunNo = " <<
215 m_eventMetaData->getRun());
216 if (m_eventMetaData->isEndOfRun() != 1) return;
217
218 // Stream End Run record
219 auto eventMessage = m_streamer.stream();
220
221 // bool terminate = false;
222
223 // All sockets are ready. Send end_run record to all workers
224 for (unsigned int workerID : m_workers) {
225 if (workerID == (unsigned int)getpid()) continue; // skip input process
226 std::string workerIDString = std::to_string(workerID);
227 // B2INFO ( "ZMQTxInput :: sending EndRun to " << workerIDString );
228 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_eventMessage, eventMessage);
229 m_zmqClient.send(std::move(message));
230 }
231 B2INFO("ZMQTxInput : End Run sent to all workers");
232}
233
234//TODO: wait for confirmation before deleting when sending backup messages to output
236{
238 return;
239 }
240
241 const std::chrono::milliseconds workerProcTimeout(m_param_workerProcessTimeout);
242 int workerID = m_procEvtBackupList.checkForTimeout(workerProcTimeout);
243 if (workerID > -1) {
244 B2WARNING("Worker process timeout, workerID: " << workerID);
245 auto deathMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_killWorkerMessage, std::to_string(workerID));
246 m_zmqClient.publish(std::move(deathMessage));
247
249 m_nextWorker.erase(std::remove(m_nextWorker.begin(), m_nextWorker.end(), workerID), m_nextWorker.end());
250 }
251}
252
254{
255
256 if (not m_zmqClient.isOnline()) {
257 return;
258 }
259
260 for (unsigned int workerID : m_workers) {
261 std::string workerIDString = std::to_string(workerID);
262 auto message = ZMQMessageFactory::createMessage(workerIDString, EMessageTypes::c_lastEventMessage);
263 m_zmqClient.send(std::move(message));
264 }
265
266 const auto multicastAnswer = [this](const auto & socket) {
267 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
268 const std::string& data = multicastMessage->getData();
269
270 if (multicastMessage->isMessage(EMessageTypes::c_confirmMessage) and m_param_useEventBackup) {
271 const auto& eventMetaData = EventMetaDataSerialization::deserialize(data);
272 m_procEvtBackupList.removeEvent(eventMetaData);
273 B2DEBUG(30, "removed event backup.. list size: " << m_procEvtBackupList.size());
274 return true;
275 } else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and m_param_useEventBackup) {
276 const int workerID = std::atoi(data.c_str());
277
278 B2DEBUG(30, "received worker delete message, workerID: " << workerID);
280 return true;
281 } else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
282 // A new worker? Well, he is quite late... nevertheless, lets tell him to end it
283 B2DEBUG(30, "received c_helloMessage from " << data << "... replying with end message");
284 auto message = ZMQMessageFactory::createMessage(data, EMessageTypes::c_lastEventMessage);
285 m_zmqClient.send(std::move(message));
286 return true;
287 }
288 return true;
289 };
290
293 m_zmqClient.pollMulticast(0, multicastAnswer);
294 std::this_thread::sleep_for(std::chrono::milliseconds(100));
295 }
296
298 // this message is especially for the output, all events reached the output
299 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage);
300 m_zmqClient.publish(std::move(message));
301 }
302
304}
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
Definition: DataStore.h:71
unsigned int getZMQEventBufferSize() const
Number of events to keep in flight for every worker.
Definition: Environment.h:284
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:157
unsigned int getZMQMaximalWaitingTime() const
Maximal waiting time of any ZMQ module for any communication in ms.
Definition: Environment.h:272
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
static EventMetaData deserialize(std::string stream)
Deserialize the event data from a string.
Base class for Modules.
Definition: Module.h:72
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
unsigned int size() const
Check the size.
void storeEvent(std::unique_ptr< EvtMessage > evtMsg, const StoreObjPtr< EventMetaData > &evtMetaData, const unsigned int workerId)
Add a new event backup with the given information. Takes ownership of the evt message.
int checkForTimeout(const Duration &timeout) const
Check the items for a timeout. Returns the id of the worker if a timeout happened, and -1 otherwise.
void removeEvent(const EventMetaData &evtMetaData)
Remove all backups with the given event meta data (on confirmation)
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
bool isOnline() const
Check if the client was initialized and not terminated.
Definition: ZMQClient.h:59
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
std::deque< unsigned int > m_nextWorker
The list of next worker ids.
unsigned int m_param_workerProcessTimeout
Maximal time a worker is allowed to spend per event, in ms.
void initialize() override
Initialize the RandomGenerator.
ZMQClient m_zmqClient
Our ZMQ client.
ProcessedEventsBackupList m_procEvtBackupList
The backup list.
void event() override
Pack the datastore and send it. Also handle ready or hello messages of workers.
void checkWorkerProcTimeout()
Check if a worker has fallen into a timeout and send a kill message if needed.
ZMQTxInputModule()
Constructor setting the module parameters.
void endRun() override
EndRun processing: send the end-of-run record to all registered workers.
void terminate() override
Terminate the client and tell the monitor, we are done. Tell the output to end if all backups are out...
unsigned int m_param_maximalWaitingTime
Maximal time to wait for any message from the workers in ms.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::vector< unsigned int > m_workers
The list of all workers (to say goodbye properly).
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
bool m_param_useEventBackup
Flag to use the event backup or not.
Class to store variables with their name which were sent to the logging service.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
Definition: ZMQClient.h:113
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
void sendWorkerBackupEvents(unsigned int worker, const AZMQClient &socket)
Send all backups of a given worker directly to the multicast and delete them.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method to only the multicast socket.
Definition: ZMQClient.h:162
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.
STL namespace.