8 #include <daq/hbasf2/utils/HLTEventProcessor.h>
10 #include <boost/python.hpp>
11 #include <framework/utilities/RegisterPythonModule.h>
12 #include <framework/core/InputController.h>
13 #include <framework/pcore/ProcHandler.h>
15 #include <framework/database/DBStore.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/core/Environment.h>
18 #include <framework/core/ModuleManager.h>
20 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
22 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
26 #include <sys/prctl.h>
35 using namespace boost::python;
39 static int g_signalReceived = 0;
42 static int g_processNumber = 1;
44 static void storeSignal(
int signalNumber)
46 if (signalNumber == SIGINT) {
51 if (g_signalReceived == 0) {
52 g_signalReceived = signalNumber;
59 for (
auto& socket : m_sockets) {
64 if (not waitForConfirmation) {
68 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69 B2ASSERT(
"Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
71 B2FATAL(
"Did not receive a confirmation message!");
78 m_sockets.reserve(outputAddresses.size());
79 for (
const auto& address : outputAddresses) {
80 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address,
false));
86 using namespace std::chrono_literals;
88 m_moduleList = path->buildModulePathList();
91 B2ASSERT(
"You try to process an empty path!", not m_moduleList.empty());
92 for (
const auto& module : m_moduleList) {
95 if (hasParallelFlag and module->hasCondition()) {
96 for (
const auto& conditionPath : module->getAllConditionPaths()) {
98 hasParallelFlag =
false;
102 B2ASSERT(
"Module with name " << module->getName() <<
" does not have parallel flag!", hasParallelFlag);
106 installMainSignalHandlers();
107 processInitialize(m_moduleList);
111 B2ERROR(
"There is no module that provides event and run numbers (EventMetaData). "
112 "You must add the specific HLT module as first module to the path.");
117 if (numLogError != 0) {
118 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
123 runWorkers(path, numProcesses, appendProcessNumberToModuleName);
125 installMainSignalHandlers(storeSignal);
127 int numberOfRestartedWorkers = 0;
131 if (g_signalReceived > 0) {
132 B2WARNING(
"Received a signal to go down.");
137 unsigned int presentWorkers;
138 unsigned int neededWorkers;
140 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
141 if (neededWorkers > 0) {
142 if (restartFailedWorkers) {
143 runWorkers(path, neededWorkers);
144 numberOfRestartedWorkers += neededWorkers;
146 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
149 }
else if (presentWorkers == 0) {
150 B2DEBUG(10,
"All workers have cleanly exited. Will now also exit");
154 if (numberOfRestartedWorkers > numProcesses) {
155 B2ERROR(
"I needed to restart on total " << numberOfRestartedWorkers <<
", which I think is abnormal. "
156 "Will terminate the process now!");
160 std::this_thread::sleep_for(10ms);
163 if (appendProcessNumberToModuleName) {
164 for (
const int& pid : m_processList) {
165 B2INFO(g_processNumber <<
": Send SIGINT to " << pid);
168 for (
const int& pid : m_processList) {
172 if (kill(pid, 0) != 0) {
175 B2DEBUG(10, g_processNumber <<
": Checking process termination, count = " << count);
176 std::this_thread::sleep_for(1000ms);
177 if (count % 5 == 1) kill(pid, SIGINT);
183 checkChildProcesses();
186 std::this_thread::sleep_for(500ms);
188 for (
const int& pid : m_processList) {
189 if (kill(pid, SIGKILL) >= 0) {
190 B2WARNING(
"Needed to hard kill process " << pid);
192 B2DEBUG(100,
"no process " << pid <<
" found, already gone?");
194 sendTerminatedMessage(pid,
false);
196 m_processList.clear();
198 B2DEBUG(10,
"Done here");
202 if (g_signalReceived == SIGINT) {
203 installSignalHandler(SIGINT, SIG_DFL);
210 for (
unsigned int i = 0; i < numProcesses; i++) {
213 B2DEBUG(10,
"Starting a new worker process");
218 installMainSignalHandlers(storeSignal);
220 if (appendProcessNumberToModuleName) {
221 for (
const auto& module : m_moduleList) {
222 module->setName(std::to_string(g_processNumber) + std::string(
"_") + module->getName());
223 B2INFO(
"New worker name is " << module->getName());
229 gROOT->GetListOfFiles()->Delete();
234 if (m_eventMetaDataPtr)
235 B2ERROR(
"Exception occured in exp/run/evt: "
236 << m_eventMetaDataPtr->getExperiment() <<
" / "
237 << m_eventMetaDataPtr->getRun() <<
" / "
238 << m_eventMetaDataPtr->getEvent());
242 B2DEBUG(10,
"Ending a worker process here.");
251 bool terminationRequested =
false;
252 bool firstRound =
true;
258 m_previousEventMetaData.setEndOfData();
260 while (not terminationRequested) {
261 B2DEBUG(100,
"Processing new event");
264 m_processStatisticsPtr->startGlobal();
268 terminationRequested = processEvent(moduleIter, firstRound);
281 B2DEBUG(10,
"Calling terminate");
282 m_eventMetaDataPtr.create();
283 processTerminate(m_moduleList);
288 while (not moduleIter.
isDone()) {
290 B2DEBUG(10,
"Starting event of " << module->getName());
293 if (module != m_master) {
299 if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
300 (*m_eventMetaDataPtr != m_previousEventMetaData)) {
301 B2FATAL(
"Two modules setting EventMetaData were discovered: " << m_master->getName() <<
" and "
302 << module->getName());
305 if (not firstRound) {
314 if (g_signalReceived != 0) {
315 if (g_signalReceived != SIGINT) {
318 B2DEBUG(10,
"Received a SIGINT in the worker process...");
323 B2ASSERT(
"The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
325 if (m_eventMetaDataPtr->isEndOfData()) {
330 if (module == m_master and not firstRound) {
331 if (m_eventMetaDataPtr->isEndOfRun()) {
332 B2DEBUG(10,
"Calling endRun()");
334 m_processStatisticsPtr->suspendGlobal();
337 m_processStatisticsPtr->resumeGlobal();
340 m_previousEventMetaData = *m_eventMetaDataPtr;
344 }
else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
346 m_processStatisticsPtr->suspendGlobal();
348 m_processStatisticsPtr->resumeGlobal();
351 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
352 (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
353 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
354 and not m_previousEventMetaData.isEndOfRun();
355 if (runChangedWithoutNotice) {
356 m_processStatisticsPtr->suspendGlobal();
362 m_processStatisticsPtr->resumeGlobal();
372 m_previousEventMetaData = *m_eventMetaDataPtr;
376 if (module->evalCondition()) {
377 PathPtr condPath = module->getConditionPath();
393 unsigned int needToRestart = 0;
396 for (
auto iter = m_processList.begin(); iter != m_processList.end();) {
397 const auto& pid = *iter;
401 const int result = waitpid(pid, &status, WNOHANG);
403 if (errno == EINTR) {
408 B2FATAL(
"waitpid() failed.");
410 }
else if (result == 0) {
416 B2ASSERT(
"Do not understand the result of waitpid()", result == pid);
419 const auto exitCode = WEXITSTATUS(status);
423 B2WARNING(
"A worker process has died unexpected!");
426 sendTerminatedMessage(pid,
true);
430 iter = m_processList.erase(iter);
433 return {m_processList.size(), needToRestart};
438 for (
auto& socket : m_sockets) {
453 m_processList.push_back(pid);
455 }
else if (pid < 0) {
456 B2FATAL(
"fork() failed: " << strerror(errno));
460 PyOS_AfterFork_Child();
464 prctl(PR_SET_PDEATHSIG, SIGHUP);
471 void processNumbered(
PathPtr startPath,
const boost::python::list& outputAddresses,
bool restartFailedWorkers =
false,
472 bool appendProcessNumberToModuleName =
true)
474 static bool already_executed =
false;
475 B2ASSERT(
"Can not run process() on HLT twice per file!", not already_executed);
478 B2ASSERT(
"HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
480 namespace py = boost::python;
481 std::vector<std::string> outputAddressesAsString;
482 size_t nList = py::len(outputAddresses);
483 for (
size_t iList = 0; iList < nList; ++iList) {
484 outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr(
"__str__")()));
492 already_executed =
true;
495 processor.process(startPath, restartFailedWorkers, appendProcessNumberToModuleName);
498 }
catch (std::exception& e) {
499 B2ERROR(
"Uncaught exception encountered: " << e.what());
503 B2ERROR(
"Uncaught exception encountered!");
/// Convenience entry point: forwards startPath, outputAddresses and
/// restartFailedWorkers unchanged to processNumbered().
/// @param startPath             path to process, passed through as-is
/// @param outputAddresses       Python list of addresses, passed through as-is
/// @param restartFailedWorkers  passed through as-is (C++ default: false)
509 void process(
PathPtr startPath,
const boost::python::list& outputAddresses,
bool restartFailedWorkers =
false)
// The trailing `false` fills processNumbered()'s appendProcessNumberToModuleName parameter.
511 processNumbered(startPath, outputAddresses, restartFailedWorkers,
false);
/// Defines the Python extension module "hbasf2" and registers the two free
/// functions processNumbered() and process() under those same names.
/// NOTE(review): both are registered through a plain function pointer, which
/// presumably does not carry the C++ default arguments over to Python —
/// confirm against the Boost.Python documentation on default arguments.
514 BOOST_PYTHON_MODULE(hbasf2)
516 def(
"processNumbered", &processNumbered);
517 def(
"process", &process);
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been called...
static DataStore & Instance()
Instance of singleton Store.
void setInitializeActive(bool active)
Setter for m_initializeActive.
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Exception thrown when execution is stopped by a signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at most 10 s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as many workers as requested and, in each of them, run the given path using processCore.
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
@ c_Error
Error: for things that went wrong and have to be fixed.
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
@ c_Continue
After the conditional path, resume execution after this module.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data store...
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
Dereference.
static void setProcessID(int processID)
Set the process ID of this process.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current process's PID...
void reset(bool keepEntries=false)
Invalidate all payloads.
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
static DBStore & Instance()
Instance of a singleton DBStore.
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
void updateEvent()
Updates all intra-run dependent objects.
Abstract base class for different kinds of events.