8 #include <daq/hbasf2/utils/HLTEventProcessor.h>
10 #include <boost/python.hpp>
11 #include <framework/utilities/RegisterPythonModule.h>
12 #include <framework/core/InputController.h>
13 #include <framework/pcore/ProcHandler.h>
15 #include <framework/database/DBStore.h>
16 #include <framework/core/RandomNumbers.h>
17 #include <framework/core/Environment.h>
18 #include <framework/core/ModuleManager.h>
20 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
22 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
26 #include <sys/prctl.h>
35 using namespace boost::python;
39 static int g_signalReceived = 0;
41 static void storeSignal(
int signalNumber)
43 if (signalNumber == SIGINT) {
48 if (g_signalReceived == 0) {
49 g_signalReceived = signalNumber;
56 for (
auto& socket : m_sockets) {
61 if (not waitForConformation) {
65 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
66 B2ASSERT(
"Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
68 B2FATAL(
"Did not receive a confirmation message!");
75 m_sockets.reserve(outputAddresses.size());
76 for (
const auto& address : outputAddresses) {
77 m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address,
false));
83 using namespace std::chrono_literals;
85 m_moduleList = path->buildModulePathList();
88 B2ASSERT(
"You try to process an empty path!", not m_moduleList.empty());
89 for (
const auto& module : m_moduleList) {
92 if (hasParallelFlag and module->hasCondition()) {
93 for (
const auto& conditionPath : module->getAllConditionPaths()) {
95 hasParallelFlag =
false;
99 B2ASSERT(
"Module with name " << module->getName() <<
" does not have parallel flag!", hasParallelFlag);
103 installMainSignalHandlers();
104 processInitialize(m_moduleList);
108 B2ERROR(
"There is no module that provides event and run numbers (EventMetaData). "
109 "You must add the specific HLT module as first module to the path.");
114 if (numLogError != 0) {
115 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
120 runWorkers(path, numProcesses);
122 installMainSignalHandlers(storeSignal);
124 int numberOfRestartedWorkers = 0;
128 if (g_signalReceived > 0) {
129 B2WARNING(
"Received a signal to go down.");
134 unsigned int presentWorkers;
135 unsigned int neededWorkers;
137 std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
138 if (neededWorkers > 0) {
139 if (restartFailedWorkers) {
140 runWorkers(path, neededWorkers);
141 numberOfRestartedWorkers += neededWorkers;
143 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
146 }
else if (presentWorkers == 0) {
147 B2DEBUG(10,
"All workers have cleanly exited. Will now also exit");
151 if (numberOfRestartedWorkers > numProcesses) {
152 B2ERROR(
"I needed to restart on total " << numberOfRestartedWorkers <<
", which I think is abnormal. "
153 "Will terminate the process now!");
157 std::this_thread::sleep_for(10ms);
160 checkChildProcesses();
163 std::this_thread::sleep_for(500ms);
165 for (
const int& pid : m_processList) {
166 if (kill(pid, SIGKILL) >= 0) {
167 B2WARNING(
"Needed to hard kill process " << pid);
169 B2DEBUG(100,
"no process " << pid <<
" found, already gone?");
171 sendTerminatedMessage(pid,
false);
173 m_processList.clear();
175 B2DEBUG(10,
"Done here");
179 if (g_signalReceived == SIGINT) {
180 installSignalHandler(SIGINT, SIG_DFL);
187 for (
unsigned int i = 0; i < numProcesses; i++) {
190 B2DEBUG(10,
"Starting a new worker process");
195 installMainSignalHandlers(storeSignal);
200 gROOT->GetListOfFiles()->Delete();
205 if (m_eventMetaDataPtr)
206 B2ERROR(
"Exception occured in exp/run/evt: "
207 << m_eventMetaDataPtr->getExperiment() <<
" / "
208 << m_eventMetaDataPtr->getRun() <<
" / "
209 << m_eventMetaDataPtr->getEvent());
213 B2DEBUG(10,
"Ending a worker process here.");
222 bool terminationRequested =
false;
223 bool firstRound =
true;
229 m_previousEventMetaData.setEndOfData();
231 while (not terminationRequested) {
232 B2DEBUG(100,
"Processing new event");
235 m_processStatisticsPtr->startGlobal();
239 terminationRequested = processEvent(moduleIter, firstRound);
252 B2DEBUG(10,
"Calling terminate");
253 m_eventMetaDataPtr.create();
254 processTerminate(m_moduleList);
259 while (not moduleIter.
isDone()) {
261 B2DEBUG(10,
"Starting event of " << module->getName());
264 if (module != m_master) {
270 if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
271 (*m_eventMetaDataPtr != m_previousEventMetaData)) {
272 B2FATAL(
"Two modules setting EventMetaData were discovered: " << m_master->getName() <<
" and "
273 << module->getName());
276 if (not firstRound) {
285 if (g_signalReceived != 0) {
286 if (g_signalReceived != SIGINT) {
289 B2DEBUG(10,
"Received a SIGINT in the worker process...");
294 B2ASSERT(
"The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
296 if (m_eventMetaDataPtr->isEndOfData()) {
301 if (module == m_master and not firstRound) {
302 if (m_eventMetaDataPtr->isEndOfRun()) {
303 B2DEBUG(10,
"Calling endRun()");
305 m_processStatisticsPtr->suspendGlobal();
308 m_processStatisticsPtr->resumeGlobal();
311 m_previousEventMetaData = *m_eventMetaDataPtr;
315 }
else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
317 m_processStatisticsPtr->suspendGlobal();
319 m_processStatisticsPtr->resumeGlobal();
322 const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
323 (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
324 const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
325 and not m_previousEventMetaData.isEndOfRun();
326 if (runChangedWithoutNotice) {
327 m_processStatisticsPtr->suspendGlobal();
333 m_processStatisticsPtr->resumeGlobal();
343 m_previousEventMetaData = *m_eventMetaDataPtr;
347 if (module->evalCondition()) {
348 PathPtr condPath = module->getConditionPath();
364 unsigned int needToRestart = 0;
367 for (
auto iter = m_processList.begin(); iter != m_processList.end();) {
368 const auto& pid = *iter;
372 const int result = waitpid(pid, &status, WNOHANG);
374 if (errno == EINTR) {
379 B2FATAL(
"waitpid() failed.");
381 }
else if (result == 0) {
387 B2ASSERT(
"Do not understand the result of waitpid()", result == pid);
390 const auto exitCode = WEXITSTATUS(status);
394 B2WARNING(
"A worker process has died unexpected!");
397 sendTerminatedMessage(pid,
true);
401 iter = m_processList.erase(iter);
404 return {m_processList.size(), needToRestart};
409 for (
auto& socket : m_sockets) {
423 m_processList.push_back(pid);
425 }
else if (pid < 0) {
426 B2FATAL(
"fork() failed: " << strerror(errno));
434 prctl(PR_SET_PDEATHSIG, SIGHUP);
441 void process(
PathPtr startPath,
const boost::python::list& outputAddresses,
bool restartFailedWorkers =
false)
443 static bool already_executed =
false;
444 B2ASSERT(
"Can not run process() on HLT twice per file!", not already_executed);
447 B2ASSERT(
"HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
449 namespace py = boost::python;
450 std::vector<std::string> outputAddressesAsString;
451 size_t nList = py::len(outputAddresses);
452 for (
size_t iList = 0; iList < nList; ++iList) {
453 outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr(
"__str__")()));
461 already_executed =
true;
464 processor.process(startPath, restartFailedWorkers);
467 }
catch (std::exception& e) {
468 B2ERROR(
"Uncaught exception encountered: " << e.what());
472 B2ERROR(
"Uncaught exception encountered!");
478 BOOST_PYTHON_MODULE(hbasf2)
480 def(
"process", &process);
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
static DataStore & Instance()
Instance of singleton Store.
void setInitializeActive(bool active)
Setter for m_initializeActive.
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Exception thrown when execution is stopped by a signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses at which workers are unregistered.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void process(PathPtr spath, bool restartFailedWorkers)
Process the given path.
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at most 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void runWorkers(PathPtr path, unsigned int numProcesses)
Fork out as many workers as requested and, in each of them, run the given path using processCore.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
@ c_Error
Error: for things that went wrong and have to be fixed.
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
@ c_Continue
After the conditional path, resume execution after this module.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
static void setProcessID(int processID)
Set the process ID of this process.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current process's PID...
void reset(bool keepEntries=false)
Invalidate all payloads.
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
static DBStore & Instance()
Instance of a singleton DBStore.
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
void updateEvent()
Updates all intra-run dependent objects.
Abstract base class for different kinds of events.