Belle II Software development
HLTEventProcessor.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <daq/hbasf2/utils/HLTEventProcessor.h>
9
10#include <boost/python.hpp>
11#include <framework/utilities/RegisterPythonModule.h>
12#include <framework/core/InputController.h>
13#include <framework/pcore/ProcHandler.h>
14
15#include <framework/database/DBStore.h>
16#include <framework/core/RandomNumbers.h>
17#include <framework/core/Environment.h>
18#include <framework/core/ModuleManager.h>
19
20#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21#include <framework/pcore/zmq/messages/ZMQDefinitions.h>
22#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
23
24#include <TROOT.h>
25
26#include <sys/prctl.h>
27#include <sys/wait.h>
28
29#include <chrono>
30#include <thread>
31#include <signal.h>
32#include <zmq.h>
33
34using namespace Belle2;
35using namespace boost::python;
36
37namespace {
39 static int g_signalReceived = 0;
40
41 // For processor unique ID
42 static int g_processNumber = 1;
43
44 static void storeSignal(int signalNumber)
45 {
46 if (signalNumber == SIGINT) {
47 EventProcessor::writeToStdErr("\nStopping basf2 gracefully...\n");
48 }
49
50 // We do not want to remove the first signal
51 if (g_signalReceived == 0) {
52 g_signalReceived = signalNumber;
53 }
54 }
55}
56
57void HLTEventProcessor::sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
58{
59 for (auto& socket : m_sockets) {
60 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_deleteWorkerMessage,
62 ZMQParent::send(socket, std::move(message));
63
64 if (not waitForConfirmation) {
65 continue;
66 }
67 if (ZMQParent::poll({socket.get()}, 10 * 1000)) {
68 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69 B2ASSERT("Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
70 } else {
71 B2FATAL("Did not receive a confirmation message!");
72 }
73 }
74}
75
76HLTEventProcessor::HLTEventProcessor(const std::vector<std::string>& outputAddresses)
77{
78 m_sockets.reserve(outputAddresses.size());
79 for (const auto& address : outputAddresses) {
80 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, false));
81 }
82}
83
84void HLTEventProcessor::process(PathPtr path, bool restartFailedWorkers, bool appendProcessNumberToModuleName)
85{
86 using namespace std::chrono_literals;
87
88 m_moduleList = path->buildModulePathList();
89
90 // Assert path is what we want: fully parallel certified, not empty. Set first module to master module
91 B2ASSERT("You try to process an empty path!", not m_moduleList.empty());
92 for (const auto& module : m_moduleList) {
93 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
94 // entire conditional path must also be compatible
95 if (hasParallelFlag and module->hasCondition()) {
96 for (const auto& conditionPath : module->getAllConditionPaths()) {
98 hasParallelFlag = false;
99 }
100 }
101 }
102 B2ASSERT("Module with name " << module->getName() << " does not have parallel flag!", hasParallelFlag);
103 }
104
105 // Initialize of all modules (including event() of master module)
108
109 // Don't start processing in case of no master module
110 if (not m_master) {
111 B2ERROR("There is no module that provides event and run numbers (EventMetaData). "
112 "You must add the specific HLT module as first module to the path.");
113 }
114
115 // Check if errors appeared. If yes, don't start the event processing.
117 if (numLogError != 0) {
118 B2FATAL(numLogError << " ERROR(S) occurred! The processing of events will not be started.");
119 }
120
121 // Start the workers, which call the main loop
122 const int numProcesses = Environment::Instance().getNumberProcesses();
123 runWorkers(path, numProcesses, appendProcessNumberToModuleName);
124
125 installMainSignalHandlers(storeSignal);
126 // Back in the main process: wait for the processes and monitor them
127 int numberOfRestartedWorkers = 0;
128 while (true) {
129 // check if we have received any signal from the user or OS.
130 // Killing of the remaining processes happens after the loop.
131 if (g_signalReceived > 0) {
132 B2WARNING("Received a signal to go down.");
133 break;
134 }
135
136 // Test if we need more workers and if one has died
137 unsigned int presentWorkers;
138 unsigned int neededWorkers;
139
140 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
141 if (neededWorkers > 0) {
142 if (restartFailedWorkers) {
143 runWorkers(path, neededWorkers);
144 numberOfRestartedWorkers += neededWorkers;
145 } else {
146 B2ERROR("A worker failed. Will try to end the process smoothly now.");
147 break;
148 }
149 } else if (presentWorkers == 0) {
150 B2DEBUG(10, "All workers have cleanly exited. Will now also exit");
151 break;
152 }
153
154 if (numberOfRestartedWorkers > numProcesses) {
155 B2ERROR("I needed to restart on total " << numberOfRestartedWorkers << ", which I think is abnormal. "
156 "Will terminate the process now!");
157 break;
158 }
159
160 std::this_thread::sleep_for(10ms);
161 }
162
163 if (appendProcessNumberToModuleName) {
164 for (const int& pid : m_processList) {
165 B2INFO(g_processNumber << ": Send SIGINT to " << pid);
166 kill(pid, SIGINT);
167 }
168 for (const int& pid : m_processList) {
169 int count = 0;
170 while (true) {
171 // Do not allow internal SIGKILL to prevent data loss in case of a numbered process
172 if (kill(pid, 0) != 0) {
173 break;
174 }
175 B2DEBUG(10, g_processNumber << ": Checking process termination, count = " << count);
176 std::this_thread::sleep_for(1000ms);
177 if (count % 5 == 1) kill(pid, SIGINT);
178 ++count;
179 }
180 }
181 }
182
184
185 // if we still have/had processes, we should unregister them
186 std::this_thread::sleep_for(500ms);
187
188 for (const int& pid : m_processList) {
189 if (kill(pid, SIGKILL) >= 0) {
190 B2WARNING("Needed to hard kill process " << pid);
191 } else {
192 B2DEBUG(100, "no process " << pid << " found, already gone?");
193 }
194 sendTerminatedMessage(pid, false);
195 }
196 m_processList.clear();
197
198 B2DEBUG(10, "Done here");
199
200 // Normally, we would call terminate here, but not on HLT!
201 // Normally, we would print the error summary here, but not on HLT!
202 if (g_signalReceived == SIGINT) {
203 installSignalHandler(SIGINT, SIG_DFL);
204 raise(SIGINT);
205 }
206}
207
208void HLTEventProcessor::runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName)
209{
210 for (unsigned int i = 0; i < numProcesses; i++) {
211 if (forkOut()) {
212 // Do only run in forked out worker process:
213 B2DEBUG(10, "Starting a new worker process");
214 // Reset the parent and sockets
215 release();
216
217 // Start the main loop with our signal handling and error catching
218 installMainSignalHandlers(storeSignal);
219 try {
220 if (appendProcessNumberToModuleName) {
221 for (const auto& module : m_moduleList) {
222 module->setName(std::to_string(g_processNumber) + std::string("_") + module->getName());
223 B2INFO("New worker name is " << module->getName());
224 }
225 }
226 processCore(path);
227 } catch (StoppedBySignalException& e) {
228 // close all open ROOT files, ROOT's exit handler will crash otherwise
229 gROOT->GetListOfFiles()->Delete();
230
231 B2ERROR(e.what());
232 exit(1);
233 } catch (...) {
235 B2ERROR("Exception occured in exp/run/evt: "
236 << m_eventMetaDataPtr->getExperiment() << " / "
237 << m_eventMetaDataPtr->getRun() << " / "
238 << m_eventMetaDataPtr->getEvent());
239 throw;
240 }
241
242 B2DEBUG(10, "Ending a worker process here.");
243 // Ok, we are done here!
244 exit(0);
245 }
246 }
247}
248
250{
251 bool terminationRequested = false;
252 bool firstRound = true;
253
254 // Initialisation is done
256
257 // Set the previous event meta data to something invalid
259
260 while (not terminationRequested) {
261 B2DEBUG(100, "Processing new event");
262
263 // Start the measurement
264 m_processStatisticsPtr->startGlobal();
265
266 // Main call to event() of the modules, and maybe beginRun() and endRun()
267 PathIterator moduleIter(path);
268 terminationRequested = processEvent(moduleIter, firstRound);
269
270 // Delete event related data in DataStore
272
273 // Stop the measurement
275
276 // We are surely not in the first round the next time
277 firstRound = false;
278 }
279
280 // End last run with a terminate. Yes, we are not calling a endRun() here and yes, we are calling this in the worker
281 B2DEBUG(10, "Calling terminate");
282 m_eventMetaDataPtr.create();
284}
285
286bool HLTEventProcessor::processEvent(PathIterator moduleIter, bool firstRound)
287{
288 while (not moduleIter.isDone()) {
289 Module* module = moduleIter.get();
290 B2DEBUG(10, "Starting event of " << module->getName());
291
292 // The actual call of the event function
293 if (module != m_master) {
294 // If this is not the master module it is quite simple: just call the event function
295 callEvent(module);
296
297 // Check for a second master module. Cannot do this if we are in the first round after initialize
298 // (as previous event meta data is not set properly here)
301 B2FATAL("Two modules setting EventMetaData were discovered: " << m_master->getName() << " and "
302 << module->getName());
303 }
304 } else {
305 if (not firstRound) {
306 // Also call the event function for the master, but not the first time
307 callEvent(module);
308 }
309 // initialize random number state for the event handling after we have
310 // recieved the event information from the master module.
312 }
313
314 if (g_signalReceived != 0) {
315 if (g_signalReceived != SIGINT) {
316 throw StoppedBySignalException(g_signalReceived);
317 } else {
318 B2DEBUG(10, "Received a SIGINT in the worker process...");
319 return true;
320 }
321 }
322
323 B2ASSERT("The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
324
325 if (m_eventMetaDataPtr->isEndOfData()) {
326 // Immediately leave the loop and terminate (true)
327 return true;
328 }
329
330 if (module == m_master and not firstRound) {
331 if (m_eventMetaDataPtr->isEndOfRun()) {
332 B2DEBUG(10, "Calling endRun()");
333 // call endRun() of all modules (internally uses the previous event meta data) and skip to the next event
334 m_processStatisticsPtr->suspendGlobal();
335 m_inRun = true;
337 m_processStatisticsPtr->resumeGlobal();
338
339 // Store the current event meta data for the next round
341
342 // Leave this event, but not the full processing (false)
343 return false;
345 // The run has changes (or we never had one), so call beginRun() before going on
346 m_processStatisticsPtr->suspendGlobal();
348 m_processStatisticsPtr->resumeGlobal();
349 }
350
351 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
353 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
355 if (runChangedWithoutNotice) {
356 m_processStatisticsPtr->suspendGlobal();
357
358 m_inRun = true;
361
362 m_processStatisticsPtr->resumeGlobal();
363 }
364
365 // make sure we use the event dependent generator again
367
368 // and the correct database
370
371 // Store the current event meta data for the next round
373 }
374
375 // Check for the module conditions, evaluate them and if one is true, switch to the new path
376 if (module->evalCondition()) {
377 PathPtr condPath = module->getConditionPath();
378 // continue with parent Path after condition path is executed?
379 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
380 moduleIter = PathIterator(condPath, moduleIter);
381 } else {
382 moduleIter = PathIterator(condPath);
383 }
384 } else {
385 moduleIter.next();
386 }
387 }
388 return false;
389}
390
391std::pair<unsigned int, unsigned int> HLTEventProcessor::checkChildProcesses()
392{
393 unsigned int needToRestart = 0;
394
395 // Check for processes, which where there last time but are gone now (so they died)
396 for (auto iter = m_processList.begin(); iter != m_processList.end();) {
397 const auto& pid = *iter;
398
399 // check the status of this process pid
400 int status;
401 const int result = waitpid(pid, &status, WNOHANG);
402 if (result == -1) {
403 if (errno == EINTR) {
404 // interrupted, try again next time
405 ++iter;
406 continue;
407 } else {
408 B2FATAL("waitpid() failed.");
409 }
410 } else if (result == 0) {
411 // No change, so lets continue with the next worker
412 ++iter;
413 continue;
414 }
415
416 B2ASSERT("Do not understand the result of waitpid()", result == pid);
417
418 // state has changed, which means it is dead!
419 const auto exitCode = WEXITSTATUS(status);
420
421 // we only need to restart unexpected deads
422 if (exitCode != 0) {
423 B2WARNING("A worker process has died unexpected!");
424 needToRestart += 1;
425
426 sendTerminatedMessage(pid, true);
427 }
428
429 // once a process is gone from the global list, remove them from our own, too.
430 iter = m_processList.erase(iter);
431 }
432
433 return {m_processList.size(), needToRestart};
434}
435
437{
438 for (auto& socket : m_sockets) {
439 socket.release();
440 }
441 m_parent.reset();
442}
443
445{
446 fflush(stdout);
447 fflush(stderr);
448
449 pid_t pid = fork();
450
451 if (pid > 0) {
452 g_processNumber++;
453 m_processList.push_back(pid);
454 return false;
455 } else if (pid < 0) {
456 B2FATAL("fork() failed: " << strerror(errno));
457 } else {
458 // Child process
459 // Reset some python state: signals, threads, gil in the child
460 PyOS_AfterFork_Child();
461 // InputController becomes useless in child process
463 // die when parent dies
464 prctl(PR_SET_PDEATHSIG, SIGHUP);
465
467 return true;
468 }
469}
470
471void processNumbered(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false,
472 bool appendProcessNumberToModuleName = true)
473{
474 static bool already_executed = false;
475 B2ASSERT("Can not run process() on HLT twice per file!", not already_executed);
476
477 auto& environment = Environment::Instance();
478 B2ASSERT("HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
479
480 namespace py = boost::python;
481 std::vector<std::string> outputAddressesAsString;
482 size_t nList = py::len(outputAddresses);
483 for (size_t iList = 0; iList < nList; ++iList) {
484 outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr("__str__")()));
485 }
486
487 try {
491
492 already_executed = true;
493
494 HLTEventProcessor processor(outputAddressesAsString);
495 processor.process(startPath, restartFailedWorkers, appendProcessNumberToModuleName);
496
498 } catch (std::exception& e) {
499 B2ERROR("Uncaught exception encountered: " << e.what()); //should show module name
500 DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
501 throw; //and let python's global handler do the rest
502 } catch (...) {
503 B2ERROR("Uncaught exception encountered!"); //should show module name
504 DataStore::Instance().reset(); // ensure we are executed before ROOT's exit handlers
505 throw; //and let python's global handler do the rest
506 }
507}
508
509void process(PathPtr startPath, const boost::python::list& outputAddresses, bool restartFailedWorkers = false)
510{
511 processNumbered(startPath, outputAddresses, restartFailedWorkers, false);
512}
513
514BOOST_PYTHON_MODULE(hbasf2)
515{
516 def("processNumbered", &processNumbered);
517 def("process", &process);
518}
519
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
Definition: DataStore.h:59
static DataStore & Instance()
Instance of singleton Store.
Definition: DataStore.cc:54
void setInitializeActive(bool active)
Setter for m_initializeActive.
Definition: DataStore.cc:94
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
Definition: DataStore.cc:86
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
Definition: DataStore.cc:715
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:157
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
void setEndOfData()
Marks the end of the data processing.
int getRun() const
Run Getter.
bool isEndOfRun() const
is end-of-run set? (see setEndOfRun()).
int getExperiment() const
Experiment Getter.
bool isEndOfData() const
is end-of-data set? (see setEndOfData()).
Exception thrown when execution is stopped by a signal.
void processEndRun()
Calls the end run methods of all modules.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
void processBeginRun(bool skipDB=false)
Calls the begin run methods of all modules.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
ModulePtrList m_moduleList
List of all modules in order initialized.
EventMetaData m_previousEventMetaData
Stores state of EventMetaData before it was last changed.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
The created sockets for unregistering workers. TODO: use connections.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
ZMQParent m_parent
An instance of a ZMQParent to create sockets for unregistering workers.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as much workers as requested and in each run the given path using processCore.
std::vector< int > m_processList
The current list of running processes (with their PIDs)
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
static void resetForChildProcess()
Reset InputController (e.g.
@ c_Error
Error: for things that went wrong and have to be fixed.
Definition: LogConfig.h:30
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
Definition: LogSystem.cc:150
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
Definition: LogSystem.cc:161
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
Definition: LogSystem.cc:31
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
Base class for Modules.
Definition: Module.h:72
const std::string & getName() const
Returns the name of the module.
Definition: Module.h:187
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
void next()
increment.
Definition: PathIterator.h:49
bool isDone() const
Are we finished iterating?
Definition: PathIterator.h:72
Module * get() const
dereference.
Definition: PathIterator.h:75
static void setProcessID(int processID)
Set the process ID of this process.
Definition: ProcHandler.cc:250
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
Definition: ZMQParent.cc:56
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
Definition: ZMQParent.cc:32
void reset()
Expert function: Reset the parent without context closing. ATTENTION: which will not clean up properl...
Definition: ZMQParent.cc:27
void reset(bool keepEntries=false)
Invalidate all payloads.
Definition: DBStore.cc:177
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
Definition: ZMQParent.h:153
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:35
static DBStore & Instance()
Instance of a singleton DBStore.
Definition: DBStore.cc:28
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
std::unique_ptr< zmq::socket_t > createSocket(const std::string &socketAddress, bool bind)
Create a socket of the given type with the given address and bind or not bind it.
Definition: ZMQParent.h:105
void updateEvent()
Updates all intra-run dependent objects.
Definition: DBStore.cc:142
Abstract base class for different kinds of events.