10 #include <daq/hbasf2/utils/HLTEventProcessor.h>
12 #include <boost/python.hpp>
13 #include <framework/utilities/RegisterPythonModule.h>
14 #include <framework/core/InputController.h>
15 #include <framework/pcore/ProcHandler.h>
17 #include <framework/database/DBStore.h>
18 #include <framework/core/RandomNumbers.h>
19 #include <framework/core/Environment.h>
20 #include <framework/core/ModuleManager.h>
22 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
23 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
24 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
28 #include <sys/prctl.h>
37 using namespace boost::python;
// Number of the first signal recorded by storeSignal(); 0 means that no signal
// has been recorded so far.
// NOTE(review): storeSignal() is handed to installMainSignalHandlers(), so this
// plain int is presumably written from a signal handler - consider whether
// volatile std::sig_atomic_t is required here; confirm.
41 static int g_signalReceived = 0;
// Callback handed to installMainSignalHandlers(): remembers the received signal
// number in g_signalReceived so that the processing code can react to it later.
// @param signalNumber number of the signal that was delivered
43 static void storeSignal(
int signalNumber)
// SIGINT is singled out before the signal number is stored.
45 if (signalNumber == SIGINT) {
// Only the first signal is kept; a later signal never overwrites it.
50 if (g_signalReceived == 0) {
51 g_signalReceived = signalNumber;
// Go through every socket stored in m_sockets.
58 for (
auto& socket : m_sockets) {
// Separate handling when the caller did not ask to wait for a confirmation
// (presumably the wait below is skipped for this socket - TODO confirm).
63 if (not waitForConformation) {
// Read the answer from this socket; it has to be a c_confirmMessage.
67 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
68 B2ASSERT(
"Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
// Fatal error path for a missing confirmation (the guarding condition is
// presumably a poll with timeout - TODO confirm).
70 B2FATAL(
"Did not receive a confirmation message!");
// Create one ZMQ_DEALER socket per entry of outputAddresses and store it in
// m_sockets; capacity for all of them is reserved up front.
77 m_sockets.reserve(outputAddresses.size());
78 for (
const auto& address : outputAddresses) {
// NOTE(review): the meaning of the trailing `false` is defined by
// createSocket() and cannot be told from here - TODO confirm.
79 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address,
false));
// Parent-side processing: initialize the modules of `path`, start the worker
// processes, supervise them until a signal arrives or the workers are gone,
// and finally clean up whatever is left in m_processList.
// Brings the duration literals (10ms, 500ms) used below into scope.
85 using namespace std::chrono_literals;
// Build the module list for the given path and keep it in m_moduleList.
87 m_moduleList = path->buildModulePathList();
90 B2ASSERT(
"You try to process an empty path!", not m_moduleList.empty());
// Every module in the list has to carry the parallel-processing flag.
91 for (
const auto& module : m_moduleList) {
// For modules with conditions the condition paths are inspected as well;
// hasParallelFlag is cleared when such a check fails.
94 if (hasParallelFlag and module->hasCondition()) {
95 for (
const auto& conditionPath : module->getAllConditionPaths()) {
97 hasParallelFlag =
false;
101 B2ASSERT(
"Module with name " << module->getName() <<
" does not have parallel flag!", hasParallelFlag);
// Install the main signal handlers without a custom callback, then
// initialize all modules.
105 installMainSignalHandlers();
106 processInitialize(m_moduleList);
// Reported when no module provides EventMetaData.
110 B2ERROR(
"There is no module that provides event and run numbers (EventMetaData). "
111 "You must add the specific HLT module as first module to the path.");
// Do not start any event processing if errors were logged up to this point.
116 if (numLogError != 0) {
117 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
// Start the initial set of numProcesses workers.
122 runWorkers(path, numProcesses);
// From now on signals are only recorded by storeSignal() and evaluated below.
124 installMainSignalHandlers(storeSignal);
// Total number of workers that had to be started again.
126 int numberOfRestartedWorkers = 0;
// A recorded signal ends the supervision.
130 if (g_signalReceived > 0) {
131 B2WARNING(
"Received a signal to go down.");
136 unsigned int presentWorkers;
137 unsigned int neededWorkers;
// checkChildProcesses() returns a pair: the workers still present and the
// workers that need to be replaced.
139 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
140 if (neededWorkers > 0) {
// Replace the lost workers only when the caller asked for it.
141 if (restartFailedWorkers) {
142 runWorkers(path, neededWorkers);
143 numberOfRestartedWorkers += neededWorkers;
// Message for a failed worker that is not restarted (presumably the else
// branch of the restart check - TODO confirm).
145 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
148 }
else if (presentWorkers == 0) {
149 B2DEBUG(10,
"All workers have cleanly exited. Will now also exit");
// Guard against endless restarting: more restarts than the number of
// workers requested initially is treated as abnormal.
153 if (numberOfRestartedWorkers > numProcesses) {
154 B2ERROR(
"I needed to restart on total " << numberOfRestartedWorkers <<
", which I think is abnormal. "
155 "Will terminate the process now!");
// Short pause before the next check.
159 std::this_thread::sleep_for(10ms);
// Check the children once more, then wait 500ms before the remaining
// processes are killed.
162 checkChildProcesses();
165 std::this_thread::sleep_for(500ms);
167 for (
const int& pid : m_processList) {
// A non-negative return value of kill() means the SIGKILL could be sent,
// i.e. the process still existed.
168 if (kill(pid, SIGKILL) >= 0) {
169 B2WARNING(
"Needed to hard kill process " << pid);
171 B2DEBUG(100,
"no process " << pid <<
" found, already gone?");
// Announce the pid as terminated; the second argument is false here
// (presumably: do not wait for a confirmation - TODO confirm).
173 sendTerminatedMessage(pid,
false);
175 m_processList.clear();
177 B2DEBUG(10,
"Done here");
// If the recorded signal was SIGINT, put the default SIGINT disposition
// (SIG_DFL) back in place.
181 if (g_signalReceived == SIGINT) {
182 installSignalHandler(SIGINT, SIG_DFL);
// One loop iteration per worker process to start (numProcesses in total).
189 for (
unsigned int i = 0; i < numProcesses; i++) {
192 B2DEBUG(10,
"Starting a new worker process");
// Signals are only recorded by storeSignal().
197 installMainSignalHandlers(storeSignal);
// Delete all entries of ROOT's global list of files.
// NOTE(review): presumably done so the worker does not keep file objects
// inherited from the parent - TODO confirm.
202 gROOT->GetListOfFiles()->Delete();
// When event meta data is available, log the experiment / run / event
// numbers that belong to the exception.
207 if (m_eventMetaDataPtr)
208 B2ERROR(
"Exception occured in exp/run/evt: "
209 << m_eventMetaDataPtr->getExperiment() <<
" / "
210 << m_eventMetaDataPtr->getRun() <<
" / "
211 << m_eventMetaDataPtr->getEvent());
215 B2DEBUG(10,
"Ending a worker process here.");
// Event loop: processEvent() is called repeatedly until it reports that
// termination was requested; afterwards the modules are terminated.
224 bool terminationRequested =
false;
// Handed to processEvent(); starts as true (presumably cleared after the
// first pass - TODO confirm).
225 bool firstRound =
true;
// Mark the stored "previous" event meta data as end-of-data before the first
// event is processed.
231 m_previousEventMetaData.setEndOfData();
233 while (not terminationRequested) {
234 B2DEBUG(100,
"Processing new event");
// Start the global statistics for this event.
237 m_processStatisticsPtr->startGlobal();
241 terminationRequested = processEvent(moduleIter, firstRound);
254 B2DEBUG(10,
"Calling terminate");
// Create the EventMetaData object before terminating the modules.
255 m_eventMetaDataPtr.create();
256 processTerminate(m_moduleList);
// Step through the modules delivered by moduleIter for the current event.
261 while (not moduleIter.
isDone()) {
263 B2DEBUG(10,
"Starting event of " << module->getName());
// Checks that apply to every module except m_master.
266 if (module != m_master) {
// The event meta data differs from the recorded previous one although the
// previous one is not an end-of-data marker: reported as a second module
// setting EventMetaData.
272 if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
273 (*m_eventMetaDataPtr != m_previousEventMetaData)) {
274 B2FATAL(
"Two modules setting EventMetaData were discovered: " << m_master->getName() <<
" and "
275 << module->getName());
278 if (not firstRound) {
// React to a recorded signal; SIGINT is distinguished from all other signals.
287 if (g_signalReceived != 0) {
288 if (g_signalReceived != SIGINT) {
291 B2DEBUG(10,
"Received a SIGINT in the worker process...");
296 B2ASSERT(
"The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
298 if (m_eventMetaDataPtr->isEndOfData()) {
// Run-boundary handling, only for m_master and not in the first round.
303 if (module == m_master and not firstRound) {
// End of run: the global statistics are suspended and resumed again around
// this handling.
304 if (m_eventMetaDataPtr->isEndOfRun()) {
305 B2DEBUG(10,
"Calling endRun()");
307 m_processStatisticsPtr->suspendGlobal();
310 m_processStatisticsPtr->resumeGlobal();
// Remember the current event meta data as the previous one.
313 m_previousEventMetaData = *m_eventMetaDataPtr;
317 }
else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
319 m_processStatisticsPtr->suspendGlobal();
321 m_processStatisticsPtr->resumeGlobal();
// runChanged: experiment or run number differs from the previous event.
324 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
325 (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
// ... and the previous event was neither an end-of-data nor an end-of-run
// marker, i.e. the change was not announced.
326 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
327 and not m_previousEventMetaData.isEndOfRun();
328 if (runChangedWithoutNotice) {
329 m_processStatisticsPtr->suspendGlobal();
335 m_processStatisticsPtr->resumeGlobal();
345 m_previousEventMetaData = *m_eventMetaDataPtr;
// If the module's condition evaluates to true, fetch its condition path.
349 if (module->evalCondition()) {
350 PathPtr condPath = module->getConditionPath();
// Look at every pid in m_processList without blocking, remove entries from the
// list and return {remaining size of m_processList, needToRestart}.
// Second element of the returned pair (presumably counts the workers that
// have to be started again - TODO confirm where it is incremented).
366 unsigned int needToRestart = 0;
// Explicit iterator loop without increment in the header: the iterator is
// advanced via erase() below.
369 for (
auto iter = m_processList.begin(); iter != m_processList.end();) {
370 const auto& pid = *iter;
// WNOHANG makes waitpid() return immediately instead of waiting for the child.
374 const int result = waitpid(pid, &status, WNOHANG);
// An interrupted call (EINTR) is handled separately from the fatal case.
376 if (errno == EINTR) {
381 B2FATAL(
"waitpid() failed.");
383 }
else if (result == 0) {
// Any remaining result has to be the pid that was asked for.
389 B2ASSERT(
"Do not understand the result of waitpid()", result == pid);
// NOTE(review): WEXITSTATUS() is only meaningful when WIFEXITED(status) is
// true; no such check is visible here - confirm.
392 const auto exitCode = WEXITSTATUS(status);
396 B2WARNING(
"A worker process has died unexpected!");
// Announce the terminated worker, this time with the second argument true.
399 sendTerminatedMessage(pid,
true);
// erase() hands back the iterator to the following element.
403 iter = m_processList.erase(iter);
406 return {m_processList.size(), needToRestart};
// Visit every socket held in m_sockets.
411 for (
auto& socket : m_sockets) {
// Remember the pid in m_processList (presumably the parent side after a
// successful fork() - TODO confirm).
425 m_processList.push_back(pid);
427 }
else if (pid < 0) {
// A negative pid is reported as a failed fork(); errno supplies the reason.
428 B2FATAL(
"fork() failed: " << strerror(errno));
// Request that SIGHUP is sent to this process when its parent dies
// (presumably executed in the child - TODO confirm).
436 prctl(PR_SET_PDEATHSIG, SIGHUP);
// Free function exported to python as "process": converts the python arguments
// and forwards the work to processor.process().
// @param startPath path that is handed on to processor.process()
// @param outputAddresses python list; every entry is converted to a
//        std::string through its __str__ method
// @param restartFailedWorkers forwarded unchanged to processor.process();
//        the C++ default is false
443 void process(
PathPtr startPath,
const boost::python::list& outputAddresses,
bool restartFailedWorkers =
false)
// Function-local static flag: a second call trips the assertion below.
445 static bool already_executed =
false;
446 B2ASSERT(
"Can not run process() on HLT twice per file!", not already_executed);
449 B2ASSERT(
"HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
451 namespace py = boost::python;
// Turn the python list into a vector of strings, one __str__() call per entry.
452 std::vector<std::string> outputAddressesAsString;
453 size_t nList = py::len(outputAddresses);
454 for (
size_t iList = 0; iList < nList; ++iList) {
455 outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr(
"__str__")()));
// Set the flag that guards against a second call.
463 already_executed =
true;
466 processor.process(startPath, restartFailedWorkers);
469 }
catch (std::exception& e) {
// Log the message carried by the exception.
470 B2ERROR(
"Uncaught exception encountered: " <<
e.what());
// Generic message (presumably for exceptions that are not std::exception -
// TODO confirm).
474 B2ERROR(
"Uncaught exception encountered!");
// Defines the python extension module "hbasf2" and exposes the C++ function
// process() under the python name "process".
// NOTE(review): registering through a plain function pointer does not carry
// the C++ default of restartFailedWorkers over to python (Boost.Python needs
// explicit keyword/default arguments or an overload helper for that) -
// confirm that every python caller passes all three arguments.
480 BOOST_PYTHON_MODULE(hbasf2)
482 def(
"process", &process);