Belle II Software development
HLTEventProcessor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/utils/HLTEventProcessor.h>
9
10#include <boost/python.hpp>
11#include <framework/core/InputController.h>
12#include <framework/pcore/ProcHandler.h>
13
14#include <framework/database/DBStore.h>
15#include <framework/core/RandomNumbers.h>
16#include <framework/core/Environment.h>
17#include <framework/core/ModuleManager.h>
18
19#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
20#include <framework/pcore/zmq/messages/ZMQDefinitions.h>
21#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
22
23#include <TROOT.h>
24
25#include <sys/prctl.h>
26#include <sys/wait.h>
27
28#include <chrono>
29#include <thread>
30#include <signal.h>
31#include <zmq.h>
32
33using namespace Belle2;
34using namespace boost::python;
35
36namespace {
38 static int g_signalReceived = 0;
39
40 // For processor unique ID
41 static int g_processNumber = 1;
42
43 static void storeSignal(int signalNumber)
44 {
45 if (signalNumber == SIGINT) {
46 EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
47 }
48
49 // We do not want to remove the first signal
50 if (g_signalReceived == 0) {
51 g_signalReceived = signalNumber;
52 }
53 }
54}
55
/**
 * Send a c_deleteWorkerMessage to every socket in m_sockets and, if requested,
 * wait for the confirmation of each receiver.
 *
 * @param pid                 PID of the worker the message is sent for.
 * @param waitForConfirmation if false, the message is only sent; if true, each socket
 *                            is polled afterwards and a missing answer is fatal.
 */
56void HLTEventProcessor::sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
57{
58 for (auto& socket : m_sockets) {
 // NOTE(review): the continuation of the createMessage(...) call is not visible in this listing.
59 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
61 ZMQParent::send(socket, std::move(message));
62
 // Fire-and-forget mode: go on with the next socket without waiting for an answer.
63 if (not waitForConfirmation) {
64 continue;
65 }
 // Poll timeout is 10 * 1000 (presumably milliseconds, i.e. 10 s -- TODO confirm against ZMQParent::poll).
66 if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
67 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
68 B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
69 } else {
70 B2FATAL("Did not receive a confirmation message! waitForConfirmation is " << waitForConfirmation);
71 }
72 }
73}
74
75HLTEventProcessor::HLTEventProcessor(const std::vector<std::string>& outputAddresses)
76{
77 m_sockets.reserve(outputAddresses.size());
78 for (const auto& address : outputAddresses) {
79 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
80 }
81}
82
/**
 * Process the given path on the HLT: validate the path, fork out the workers and
 * monitor them from the parent process until they are done, one of them fails or a
 * signal arrives. Afterwards all remaining workers are stopped and unregistered.
 *
 * @param path                            the path to process in every worker.
 * @param restartFailedWorkers            if true, workers reported as failed by checkChildProcesses()
 *                                        are replaced by new ones; otherwise the first failure ends the monitoring.
 * @param appendProcessNumberToModuleName forwarded to the initial runWorkers() call; also selects the
 *                                        SIGINT-based shutdown of the workers after the monitoring loop.
 *
 * NOTE(review): several statements of this function are not visible in this listing
 * (e.g. the module initialization and the definition of numLogError).
 */
83void HLTEventProcessor::process(PathPtr path, bool restartFailedWorkers, bool appendProcessNumberToModuleName)
84{
85 using namespace std::chrono_literals;
86
87 m_moduleList = path->buildModulePathList();
88
89 // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
90 B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
91 for (const auto& module : m_moduleList) {
92 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
93 // entire conditional path must also be compatible
94 if (hasParallelFlag and module->hasCondition()) {
95 for (const auto& conditionPath : module->getAllConditionPaths()) {
97 hasParallelFlag = false;
98 }
99 }
100 }
101 B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
102 }
103
104 // Initialize all modules (including event() of master module)
107
108 // Don't start processing in case of no master module
109 if (not m_master) {
110 B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
111 "You must add the specific HLT module as first module to the path.");
112 }
113
114 // Check if errors appeared. If yes, don't start the event processing.
116 if (numLogError != 0) {
117 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
118 }
119
120 // Start the workers, which call the main loop
121 const int numProcesses = Environment::Instance().getNumberProcesses();
122 runWorkers(path, numProcesses, appendProcessNumberToModuleName);
123
 // Only the parent gets here: the workers leave runWorkers() via exit() or an exception.
124 installMainSignalHandlers(storeSignal);
125 // Back in the main process: wait for the processes and monitor them
126 int numberOfRestartedWorkers = 0;
127 while (true) {
128 // check if we have received any signal from the user or OS.
129 // Killing of the remaining processes happens after the loop.
130 if (g_signalReceived > 0) {
131 B2WARNING("Received a signal to go down.");
132 break;
133 }
134
135 // Test if we need more workers and if one has died
136 unsigned int presentWorkers;
137 unsigned int neededWorkers;
138
139 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
140 if (neededWorkers > 0) {
141 if (restartFailedWorkers) {
 // NOTE(review): appendProcessNumberToModuleName is not forwarded here, so replacement
 // workers are started with the default of that parameter -- confirm this is intended.
142 runWorkers(path, neededWorkers);
143 numberOfRestartedWorkers += neededWorkers;
144 } else {
145 B2ERROR("A worker failed. Will try to end the process smoothly now.");
146 break;
147 }
148 } else if (presentWorkers == 0) {
149 B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
150 break;
151 }
152
 // Give up if, in total, more workers were restarted than were started initially.
153 if (numberOfRestartedWorkers > numProcesses) {
154 B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
155 "Will terminate the process now!");
156 break;
157 }
158
159 std::this_thread::sleep_for(10ms);
160 }
161
 // Numbered mode: ask every worker to stop with SIGINT and wait until kill(pid, 0) fails,
 // repeating the SIGINT whenever count % 5 == 1 (one iteration sleeps 1000ms).
 // NOTE(review): this wait has no timeout and nothing in it reaps the child (no waitpid);
 // verify that kill(pid, 0) really starts failing once the worker has exited.
162 if (appendProcessNumberToModuleName) {
163 for (const int& pid : m_processList) {
164 B2INFO(g_processNumber << ": Send SIGINT to " << pid);
165 kill(pid, SIGINT);
166 }
167 for (const int& pid : m_processList) {
168 int count = 0;
169 while (true) {
170 // Do not allow internal SIGKILL to prevent data loss in case of a numbered process
171 if (kill(pid, 0) != 0) {
172 break;
173 }
174 B2DEBUG(10, g_processNumber << ": Checking process termination, count = " << count);
175 std::this_thread::sleep_for(1000ms);
176 if (count % 5 == 1) kill(pid, SIGINT);
177 ++count;
178 }
179 }
180 }
181
183
184 // if we still have/had processes, we should unregister them
185 std::this_thread::sleep_for(500ms);
186
 // Hard-kill whatever is still in the list and send the unregister message for every PID,
 // this time without waiting for a confirmation.
187 for (const int& pid : m_processList) {
188 if (kill(pid, SIGKILL) >= 0) {
189 B2WARNING("Needed to hard kill process " << pid);
190 } else {
191 B2DEBUG(100, "no process " << pid << " found, already gone?");
192 }
193 sendTerminatedMessage(pid, false);
194 }
195 m_processList.clear();
196
197 B2DEBUG(10, "Done here");
198
199 // Normally, we would call terminate here, but not on HLT!
200 // Normally, we would print the error summary here, but not on HLT!
 // If we stopped because of SIGINT: restore the default handler and raise the signal again.
201 if (g_signalReceived == SIGINT) {
202 installSignalHandler(SIGINT, SIG_DFL);
203 raise(SIGINT);
204 }
205}
206
/**
 * Fork out numProcesses workers. Each worker runs processCore() on the given path and
 * leaves the process via exit() (or by re-throwing an unexpected exception); only the
 * parent process returns from this function.
 *
 * @param path                            the path processed by every worker.
 * @param numProcesses                    number of workers to fork out.
 * @param appendProcessNumberToModuleName if true, every module name in the worker is prefixed with
 *                                        "<g_processNumber>_" before the processing starts.
 */
207void HLTEventProcessor::runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName)
208{
209 for (unsigned int i = 0; i < numProcesses; i++) {
210 if (forkOut()) {
211 // Do only run in forked out worker process:
212 B2DEBUG(10, "Starting a new worker process");
213 // Reset the parent and sockets
214 release();
215
216 // Start the main loop with our signal handling and error catching
217 installMainSignalHandlers(storeSignal);
218 try {
219 if (appendProcessNumberToModuleName) {
220 for (const auto& module : m_moduleList) {
221 module->setName(std::to_string(g_processNumber) + std::string("_") + module->getName());
222 B2INFO("New worker name is " << module->getName());
223 }
224 }
225 processCore(path);
226 } catch (StoppedBySignalException& e) {
227 // close all open ROOT files, ROOT's exit handler will crash otherwise
228 gROOT->GetListOfFiles()->Delete();
229
230 B2ERROR(e.what());
 // Non-zero exit code for a worker stopped by a signal.
231 exit(1);
232 } catch (...) {
 // NOTE(review): one line is not visible in this listing directly before the B2ERROR below.
 // Report the event which was processed when the exception appeared, then pass the exception on.
234 B2ERROR("Exception occured in exp/run/evt: "
235 << m_eventMetaDataPtr->getExperiment() << " / "
236 << m_eventMetaDataPtr->getRun() << " / "
237 << m_eventMetaDataPtr->getEvent());
238 throw;
239 }
240
241 B2DEBUG(10, "Ending a worker process here.");
242 // Ok, we are done here!
243 exit(0);
244 }
245 }
246}
247
249{
250 bool terminationRequested = false;
251 bool firstRound = true;
252
253 // Initialisation is done
255
256 // Set the previous event meta data to something invalid
257 m_previousEventMetaData.setEndOfData();
258
259 while (not terminationRequested) {
260 B2DEBUG(100, "Processing new event");
261
262 // Start the measurement
263 m_processStatisticsPtr->startGlobal();
264
265 // Main call to event() of the modules, and maybe beginRun() and endRun()
266 PathIterator moduleIter(path);
267 terminationRequested = processEvent(moduleIter, firstRound);
268
269 // Delete event related data in DataStore
271
272 // Stop the measurement
274
275 // We are surely not in the first round the next time
276 firstRound = false;
277 }
278
279 // End last run with a terminate. Yes, we are not calling a endRun() here and yes, we are calling this in the worker
280 B2DEBUG(10, "Calling terminate");
281 m_eventMetaDataPtr.create();
283}
284
/**
 * Process a single event by iterating once through the module path.
 *
 * @param moduleIter iterator over the path to process (taken by value and advanced locally).
 * @param firstRound true for the first call after the initialization; in this case event() of
 *                   the master module is not called and the run-change handling is skipped.
 * @return true if the processing should terminate (SIGINT received or end-of-data reached),
 *         false if the next event should be processed.
 * @throws StoppedBySignalException if a signal other than SIGINT was received.
 *
 * NOTE(review): several statements of this function are not visible in this listing
 * (e.g. the calls which are expected between the suspendGlobal()/resumeGlobal() pairs).
 */
285bool HLTEventProcessor::processEvent(PathIterator moduleIter, bool firstRound)
286{
287 while (not moduleIter.isDone()) {
288 Module* module = moduleIter.get();
289 B2DEBUG(10, "Starting event of " << module->getName());
290
291 // The actual call of the event function
292 if (module != m_master) {
293 // If this is not the master module it is quite simple: just call the event function
294 callEvent(module);
295
296 // Check for a second master module. Cannot do this if we are in the first round after initialize
297 // (as previous event meta data is not set properly here)
298 if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
300 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
301 << module->getName());
302 }
303 } else {
304 if (not firstRound) {
305 // Also call the event function for the master, but not the first time
306 callEvent(module);
307 }
308 // initialize random number state for the event handling after we have
309 // received the event information from the master module.
311 }
312
 // React to a stored signal: SIGINT ends the processing regularly, every other signal throws.
313 if (g_signalReceived != 0) {
314 if (g_signalReceived != SIGINT) {
315 throw StoppedBySignalException(g_signalReceived);
316 } else {
317 B2DEBUG(10, "Received a SIGINT in the worker process...");
318 return true;
319 }
320 }
321
322 B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
323
324 if (m_eventMetaDataPtr->isEndOfData()) {
325 // Immediately leave the loop and terminate (true)
326 return true;
327 }
328
 // Run handling: only directly after the master module, and never in the first round.
329 if (module == m_master and not firstRound) {
330 if (m_eventMetaDataPtr->isEndOfRun()) {
331 B2DEBUG(10, "Calling endRun()");
332 // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
333 m_processStatisticsPtr->suspendGlobal();
334 m_inRun = true;
336 m_processStatisticsPtr->resumeGlobal();
337
338 // Store the current event meta data for the next round
340
341 // Leave this event, but not the full processing (false)
342 return false;
343 } else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
344 // The run has changed (or we never had one), so call beginRun() before going on
345 // The run number should not be 0
346 if (m_eventMetaDataPtr->getRun() != 0) {
347 m_processStatisticsPtr->suspendGlobal();
349 m_processStatisticsPtr->resumeGlobal();
350 } else {
351 return false;
352 }
353 }
354
 // Experiment or run number differ from the previous event although the previous event was
 // neither end-of-data nor end-of-run: the run changed without an explicit end-of-run event.
355 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
356 (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
357 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
358 and not m_previousEventMetaData.isEndOfRun();
359 if (runChangedWithoutNotice) {
360 m_processStatisticsPtr->suspendGlobal();
361
362 m_inRun = true;
365
366 m_processStatisticsPtr->resumeGlobal();
367 }
368
369 // make sure we use the event dependent generator again
371
372 // and the correct database
374
375 // Store the current event meta data for the next round
377 }
378
379 // Check for the module conditions, evaluate them and if one is true, switch to the new path
380 if (module->evalCondition()) {
381 PathPtr condPath = module->getConditionPath();
382 // continue with parent Path after condition path is executed?
383 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
384 moduleIter = PathIterator(condPath, moduleIter);
385 } else {
386 moduleIter = PathIterator(condPath);
387 }
388 } else {
389 moduleIter.next();
390 }
391 }
 // The whole path was processed without a termination request.
392 return false;
393}
394
395std::pair<unsigned int, unsigned int> HLTEventProcessor::checkChildProcesses()
396{
397 unsigned int needToRestart = 0;
398
399 // Check for processes, which where there last time but are gone now (so they died)
400 for (auto iter = m_processList.begin(); iter != m_processList.end();) {
401 const auto& pid = *iter;
402
403 // check the status of this process pid
404 int status;
405 const int result = waitpid(pid, &status, WNOHANG);
406 if (result == -1) {
407 if (errno == EINTR) {
408 // interrupted, try again next time
409 ++iter;
410 continue;
411 } else {
412 B2FATAL("waitpid() failed.");
413 }
414 } else if (result == 0) {
415 // No change, so lets continue with the next worker
416 ++iter;
417 continue;
418 }
419
420 B2ASSERT("Do not understand the result of waitpid()", result == pid);
421
422 // state has changed, which means it is dead!
423 const auto exitCode = WEXITSTATUS(status);
424
425 // we only need to restart unexpected deads
426 if (exitCode != 0) {
427 B2WARNING("A worker process has died unexpected!");
428 needToRestart += 1;
429
430 sendTerminatedMessage(pid, true);
431 }
432
433 // once a process is gone from the global list, remove them from our own, too.
434 iter = m_processList.erase(iter);
435 }
436
437 return {m_processList.size(), needToRestart};
438}
439
441{
442 for (auto& socket : m_sockets) {
443 socket.release();
444 }
445 m_parent.reset();
446}
447
449{
450 fflush(stdout);
451 fflush(stderr);
452
453 pid_t pid = fork();
454
455 if (pid > 0) {
456 g_processNumber++;
457 m_processList.push_back(pid);
458 return false;
459 } else if (pid < 0) {
460 B2FATAL("fork() failed: " << strerror(errno));
461 } else {
462 // Child process
463 // Reset some python state: signals, threads, gil in the child
464 PyOS_AfterFork_Child();
465 // InputController becomes useless in child process
467 // die when parent dies
468 prctl(PR_SET_PDEATHSIG, SIGHUP);
469
471 return true;
472 }
473}
474
/**
 * Python entry point: process the given path with an HLTEventProcessor.
 *
 * @param startPath                       the path to process.
 * @param outputAddresses                 Python list of addresses; every entry is converted with
 *                                        its __str__() and handed to the HLTEventProcessor constructor.
 * @param restartFailedWorkers            forwarded to HLTEventProcessor::process().
 * @param appendProcessNumberToModuleName forwarded to HLTEventProcessor::process().
 *
 * May only be executed once per process (asserted via a function-local static flag) and
 * requires Environment::getNumberProcesses() > 0. Exceptions are logged, the DataStore
 * is reset and the exception is re-thrown.
 *
 * NOTE(review): some lines inside the try block are not visible in this listing.
 */
475void processNumbered(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false,
476 bool appendProcessNumberToModuleName = true)
477{
478 static bool already_executed = false;
479 B2ASSERT("Can not run process() on HLT twice per file!", not already_executed);
480
481 auto& environment = Environment::Instance();
482 B2ASSERT("HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
483
 // Convert the Python list into a vector of strings
484 namespace py = boost::python;
485 std::vector<std::string> outputAddressesAsString;
486 size_t nList = py::len(outputAddresses);
487 for (size_t iList = 0; iList < nList; ++iList) {
488 outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr("__str__")()));
489 }
490
491 try {
495
 // Set the flag before the processing starts, so a second call is rejected even if process() throws.
496 already_executed = true;
497
498 HLTEventProcessor processor(outputAddressesAsString);
499 processor.process(startPath, restartFailedWorkers, appendProcessNumberToModuleName);
500
502 } catch (std::exception& e) {
503 B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
504 DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
505 throw; //and let python's global handler do the rest
506 } catch (...) {
507 B2ERROR("Uncaught exception encountered!"); //should show module name
508 DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
509 throw; //and let python's global handler do the rest
510 }
511}
512
513void process(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false)
514{
515 processNumbered(startPath, outputAddresses, restartFailedWorkers, false);
516}
517
518BOOST_PYTHON_MODULE(hbasf2)
519{
520 def("processNumbered", &processNumbered);
521 def("process", &process);
522}
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition DataStore.cc:53
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition DataStore.cc:93
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
Definition DataStore.cc:85
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition DataStore.cc:714
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Exception thrown when execution is stopped by a signal.
void processEndRun()
Calls the end run methods of all modules.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
bool m_inRun
Are we currently in a run?
void processBeginRun(bool skipDB=false)
Calls the begin run methods of all modules.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
ModulePtrList m_moduleList
List of all modules in order initialized.
EventMetaData m_previousEventMetaData
Stores state of EventMetaData before it was last changed.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
The created sockets for unregistering workers. TODO: use connections.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
ZMQParent m_parent
An instance of a ZMQParent to create sockets for unregistering workers.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as many workers as requested and, in each of them, run the given path using processCore.
std::vector< int > m_processList
The current list of running processes (with their PIDs)
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
static void resetForChildProcess()
Reset InputController (e.g.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition LogConfig.h:30
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
Definition LogSystem.cc:147
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition LogSystem.cc:158
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition LogSystem.cc:28
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
Base class for Modules.
Definition Module.h:72
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition Module.h:80
Iterator over a Path (returning Module pointers).
void next()
increment.
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
static void setProcessID(int processID)
Set the process ID of this process.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static std::unique_ptr< AMessage > fromSocket(const std::unique_ptr< zmq::socket_t > &socket)
Create a message of the given type by receiving a message from the socket.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition ZMQParent.cc:56
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
Definition ZMQParent.cc:32
void reset(bool keepEntries=false)
Invalidate all payloads.
Definition DBStore.cc:175
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition ZMQParent.h:153
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition Path.h:35
static DBStore & Instance()
Instance of a singleton DBStore.
Definition DBStore.cc:26
void updateEvent()
Updates all intra-run dependent objects.
Definition DBStore.cc:140
Abstract base class for different kinds of events.