8 #include <daq/hbasf2/utils/HLTEventProcessor.h>
10 #include <boost/python.hpp>
11 #include <framework/utilities/RegisterPythonModule.h>
12 #include <framework/core/InputController.h>
13 #include <framework/pcore/ProcHandler.h>
15 #include <framework/database/DBStore.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/core/Environment.h>
18 #include <framework/core/ModuleManager.h>
20 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
22 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
26 #include <sys/prctl.h>
35 using namespace boost::python;
39 static int g_signalReceived = 0;
42 static int g_processNumber = 1;
44 static void storeSignal(
int signalNumber)
46 if (signalNumber == SIGINT) {
51 if (g_signalReceived == 0) {
52 g_signalReceived = signalNumber;
59 for (
auto& socket : m_sockets) {
64 if (not waitForConfirmation) {
68 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69 B2ASSERT(
"Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
71 B2FATAL(
"Did not receive a confirmation message!");
78 m_sockets.reserve(outputAddresses.size());
79 for (
const auto& address : outputAddresses) {
80 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address,
false));
86 using namespace std::chrono_literals;
88 m_moduleList = path->buildModulePathList();
91 B2ASSERT(
"You try to process an empty path!", not m_moduleList.empty());
92 for (
const auto& module : m_moduleList) {
95 if (hasParallelFlag and module->hasCondition()) {
96 for (
const auto& conditionPath : module->getAllConditionPaths()) {
98 hasParallelFlag =
false;
102 B2ASSERT(
"Module with name " << module->getName() <<
" does not have parallel flag!", hasParallelFlag);
106 installMainSignalHandlers();
107 processInitialize(m_moduleList);
111 B2ERROR(
"There is no module that provides event and run numbers (EventMetaData). "
112 "You must add the specific HLT module as first module to the path.");
117 if (numLogError != 0) {
118 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
123 runWorkers(path, numProcesses, appendProcessNumberToModuleName);
125 installMainSignalHandlers(storeSignal);
127 int numberOfRestartedWorkers = 0;
131 if (g_signalReceived > 0) {
132 B2WARNING(
"Received a signal to go down.");
137 unsigned int presentWorkers;
138 unsigned int neededWorkers;
140 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
141 if (neededWorkers > 0) {
142 if (restartFailedWorkers) {
143 runWorkers(path, neededWorkers);
144 numberOfRestartedWorkers += neededWorkers;
146 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
149 }
else if (presentWorkers == 0) {
150 B2DEBUG(10,
"All workers have cleanly exited. Will now also exit");
154 if (numberOfRestartedWorkers > numProcesses) {
155 B2ERROR(
"I needed to restart on total " << numberOfRestartedWorkers <<
", which I think is abnormal. "
156 "Will terminate the process now!");
160 std::this_thread::sleep_for(10ms);
163 if (appendProcessNumberToModuleName) {
164 for (
const int& pid : m_processList) {
165 B2INFO(g_processNumber <<
": Send SIGINT to " << pid);
168 for (
const int& pid : m_processList) {
171 if (kill(pid, 0) != 0) {
174 B2DEBUG(10, g_processNumber <<
": Checking process termination, count = " << count);
175 std::this_thread::sleep_for(1000ms);
178 if (count % 5 == 1) kill(pid, SIGINT);
179 if (count == 1200)
break;
185 checkChildProcesses();
188 std::this_thread::sleep_for(500ms);
190 for (
const int& pid : m_processList) {
191 if (kill(pid, SIGKILL) >= 0) {
192 B2WARNING(
"Needed to hard kill process " << pid);
194 B2DEBUG(100,
"no process " << pid <<
" found, already gone?");
196 sendTerminatedMessage(pid,
false);
198 m_processList.clear();
200 B2DEBUG(10,
"Done here");
204 if (g_signalReceived == SIGINT) {
205 installSignalHandler(SIGINT, SIG_DFL);
212 for (
unsigned int i = 0; i < numProcesses; i++) {
215 B2DEBUG(10,
"Starting a new worker process");
220 installMainSignalHandlers(storeSignal);
222 if (appendProcessNumberToModuleName) {
223 for (
const auto& module : m_moduleList) {
224 module->setName(std::to_string(g_processNumber) + std::string(
"_") + module->getName());
225 B2INFO(
"New worker name is " << module->getName());
231 gROOT->GetListOfFiles()->Delete();
236 if (m_eventMetaDataPtr)
237 B2ERROR(
"Exception occured in exp/run/evt: "
238 << m_eventMetaDataPtr->getExperiment() <<
" / "
239 << m_eventMetaDataPtr->getRun() <<
" / "
240 << m_eventMetaDataPtr->getEvent());
244 B2DEBUG(10,
"Ending a worker process here.");
253 bool terminationRequested =
false;
254 bool firstRound =
true;
260 m_previousEventMetaData.setEndOfData();
262 while (not terminationRequested) {
263 B2DEBUG(100,
"Processing new event");
266 m_processStatisticsPtr->startGlobal();
270 terminationRequested = processEvent(moduleIter, firstRound);
283 B2DEBUG(10,
"Calling terminate");
284 m_eventMetaDataPtr.create();
285 processTerminate(m_moduleList);
290 while (not moduleIter.
isDone()) {
292 B2DEBUG(10,
"Starting event of " << module->getName());
295 if (module != m_master) {
301 if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
302 (*m_eventMetaDataPtr != m_previousEventMetaData)) {
303 B2FATAL(
"Two modules setting EventMetaData were discovered: " << m_master->getName() <<
" and "
304 << module->getName());
307 if (not firstRound) {
316 if (g_signalReceived != 0) {
317 if (g_signalReceived != SIGINT) {
320 B2DEBUG(10,
"Received a SIGINT in the worker process...");
325 B2ASSERT(
"The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
327 if (m_eventMetaDataPtr->isEndOfData()) {
332 if (module == m_master and not firstRound) {
333 if (m_eventMetaDataPtr->isEndOfRun()) {
334 B2DEBUG(10,
"Calling endRun()");
336 m_processStatisticsPtr->suspendGlobal();
339 m_processStatisticsPtr->resumeGlobal();
342 m_previousEventMetaData = *m_eventMetaDataPtr;
346 }
else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
348 m_processStatisticsPtr->suspendGlobal();
350 m_processStatisticsPtr->resumeGlobal();
353 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
354 (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
355 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
356 and not m_previousEventMetaData.isEndOfRun();
357 if (runChangedWithoutNotice) {
358 m_processStatisticsPtr->suspendGlobal();
364 m_processStatisticsPtr->resumeGlobal();
374 m_previousEventMetaData = *m_eventMetaDataPtr;
378 if (module->evalCondition()) {
379 PathPtr condPath = module->getConditionPath();
395 unsigned int needToRestart = 0;
398 for (
auto iter = m_processList.begin(); iter != m_processList.end();) {
399 const auto& pid = *iter;
403 const int result = waitpid(pid, &status, WNOHANG);
405 if (errno == EINTR) {
410 B2FATAL(
"waitpid() failed.");
412 }
else if (result == 0) {
418 B2ASSERT(
"Do not understand the result of waitpid()", result == pid);
421 const auto exitCode = WEXITSTATUS(status);
425 B2WARNING(
"A worker process has died unexpected!");
428 sendTerminatedMessage(pid,
true);
432 iter = m_processList.erase(iter);
435 return {m_processList.size(), needToRestart};
440 for (
auto& socket : m_sockets) {
455 m_processList.push_back(pid);
457 }
else if (pid < 0) {
458 B2FATAL(
"fork() failed: " << strerror(errno));
462 PyOS_AfterFork_Child();
466 prctl(PR_SET_PDEATHSIG, SIGHUP);
473 void processNumbered(
PathPtr startPath,
const boost::python::list& outputAddresses,
bool restartFailedWorkers =
false,
474 bool appendProcessNumberToModuleName =
true)
476 static bool already_executed =
false;
477 B2ASSERT(
"Can not run process() on HLT twice per file!", not already_executed);
480 B2ASSERT(
"HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
482 namespace py = boost::python;
483 std::vector<std::string> outputAddressesAsString;
484 size_t nList = py::len(outputAddresses);
485 for (
size_t iList = 0; iList < nList; ++iList) {
486 outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr(
"__str__")()));
494 already_executed =
true;
497 processor.process(startPath, restartFailedWorkers, appendProcessNumberToModuleName);
500 }
catch (std::exception& e) {
501 B2ERROR(
"Uncaught exception encountered: " << e.what());
505 B2ERROR(
"Uncaught exception encountered!");
511 void process(
PathPtr startPath,
const boost::python::list& outputAddresses,
bool restartFailedWorkers =
false)
513 processNumbered(startPath, outputAddresses, restartFailedWorkers,
false);
516 BOOST_PYTHON_MODULE(hbasf2)
518 def(
"processNumbered", &processNumbered);
519 def(
"process", &process);
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
static DataStore & Instance()
Instance of singleton Store.
void setInitializeActive(bool active)
Setter for m_initializeActive.
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Exception thrown when execution is stopped by a signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as much workers as requested and in each run the given path using processCore.
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
@ c_Error
Error: for things that went wrong and have to be fixed.
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
@ c_Continue
After the conditional path, resume execution after this module.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
static void setProcessID(int processID)
Set the process ID of this process.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
void reset(bool keepEntries=false)
Invalidate all payloads.
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
static DBStore & Instance()
Instance of a singleton DBStore.
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
void updateEvent()
Updates all intra-run dependent objects.
Abstract base class for different kinds of events.