8 #include <daq/hbasf2/utils/HLTEventProcessor.h> 
   10 #include <boost/python.hpp> 
   11 #include <framework/utilities/RegisterPythonModule.h> 
   12 #include <framework/core/InputController.h> 
   13 #include <framework/pcore/ProcHandler.h> 
   15 #include <framework/database/DBStore.h> 
   16 #include <framework/core/RandomNumbers.h> 
   17 #include <framework/core/Environment.h> 
   18 #include <framework/core/ModuleManager.h> 
   20 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h> 
   21 #include <framework/pcore/zmq/messages/ZMQDefinitions.h> 
   22 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h> 
   26 #include <sys/prctl.h> 
   35 using namespace boost::python;
 
   39   static int g_signalReceived = 0;
 
   42   static int g_processNumber = 1;
 
   44   static void storeSignal(
int signalNumber)
 
   46     if (signalNumber == SIGINT) {
 
   51     if (g_signalReceived == 0) {
 
   52       g_signalReceived = signalNumber;
 
   59   for (
auto& socket : m_sockets) {
 
   64     if (not waitForConfirmation) {
 
   68       auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
 
   69       B2ASSERT(
"Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
 
   71       B2FATAL(
"Did not receive a confirmation message! waitForConfirmation is " << waitForConfirmation);
 
   78   m_sockets.reserve(outputAddresses.size());
 
   79   for (
const auto& address : outputAddresses) {
 
   80     m_sockets.push_back(m_parent.createSocket<ZMQ_DEALER>(address, 
false));
 
   86   using namespace std::chrono_literals;
 
   88   m_moduleList = path->buildModulePathList();
 
   91   B2ASSERT(
"You try to process an empty path!", not m_moduleList.empty());
 
   92   for (
const auto& module : m_moduleList) {
 
   95     if (hasParallelFlag and module->hasCondition()) {
 
   96       for (
const auto& conditionPath : module->getAllConditionPaths()) {
 
   98           hasParallelFlag = 
false;
 
  102     B2ASSERT(
"Module with name " << module->getName() << 
" does not have parallel flag!", hasParallelFlag);
 
  106   installMainSignalHandlers();
 
  107   processInitialize(m_moduleList);
 
  111     B2ERROR(
"There is no module that provides event and run numbers (EventMetaData). " 
  112             "You must add the specific HLT module as first module to the path.");
 
  117   if (numLogError != 0) {
 
  118     B2FATAL(numLogError << 
" ERROR(S) occurred! The processing of events will not be started.");
 
  123   runWorkers(path, numProcesses, appendProcessNumberToModuleName);
 
  125   installMainSignalHandlers(storeSignal);
 
  127   int numberOfRestartedWorkers = 0;
 
  131     if (g_signalReceived > 0) {
 
  132       B2WARNING(
"Received a signal to go down.");
 
  137     unsigned int presentWorkers;
 
  138     unsigned int neededWorkers;
 
  140     std::tie(presentWorkers, neededWorkers) = checkChildProcesses();
 
  141     if (neededWorkers > 0) {
 
  142       if (restartFailedWorkers) {
 
  143         runWorkers(path, neededWorkers);
 
  144         numberOfRestartedWorkers += neededWorkers;
 
  146         B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
 
  149     } 
else if (presentWorkers == 0) {
 
  150       B2DEBUG(10, 
"All workers have cleanly exited. Will now also exit");
 
  154     if (numberOfRestartedWorkers > numProcesses) {
 
  155       B2ERROR(
"I needed to restart on total " << numberOfRestartedWorkers << 
", which I think is abnormal. " 
  156               "Will terminate the process now!");
 
  160     std::this_thread::sleep_for(10ms);
 
  163   if (appendProcessNumberToModuleName) {
 
  164     for (
const int& pid : m_processList) {
 
  165       B2INFO(g_processNumber << 
": Send SIGINT to " << pid);
 
  168     for (
const int& pid : m_processList) {
 
  172         if (kill(pid, 0) != 0) {
 
  175         B2DEBUG(10, g_processNumber << 
": Checking process termination, count = " << count);
 
  176         std::this_thread::sleep_for(1000ms);
 
  177         if (count % 5 == 1) kill(pid, SIGINT);
 
  183   checkChildProcesses();
 
  186   std::this_thread::sleep_for(500ms);
 
  188   for (
const int& pid : m_processList) {
 
  189     if (kill(pid, SIGKILL) >= 0) {
 
  190       B2WARNING(
"Needed to hard kill process " << pid);
 
  192       B2DEBUG(100, 
"no process " << pid << 
" found, already gone?");
 
  194     sendTerminatedMessage(pid, 
false);
 
  196   m_processList.clear();
 
  198   B2DEBUG(10, 
"Done here");
 
  202   if (g_signalReceived == SIGINT) {
 
  203     installSignalHandler(SIGINT, SIG_DFL);
 
  210   for (
unsigned int i = 0; i < numProcesses; i++) {
 
  213       B2DEBUG(10, 
"Starting a new worker process");
 
  218       installMainSignalHandlers(storeSignal);
 
  220         if (appendProcessNumberToModuleName) {
 
  221           for (
const auto& module : m_moduleList) {
 
  222             module->setName(std::to_string(g_processNumber) + std::string(
"_") + module->getName());
 
  223             B2INFO(
"New worker name is " << module->getName());
 
  229         gROOT->GetListOfFiles()->Delete();
 
  234         if (m_eventMetaDataPtr)
 
  235           B2ERROR(
"Exception occured in exp/run/evt: " 
  236                   << m_eventMetaDataPtr->getExperiment() << 
" / " 
  237                   << m_eventMetaDataPtr->getRun() << 
" / " 
  238                   << m_eventMetaDataPtr->getEvent());
 
  242       B2DEBUG(10, 
"Ending a worker process here.");
 
  251   bool terminationRequested = 
false;
 
  252   bool firstRound = 
true;
 
  258   m_previousEventMetaData.setEndOfData();
 
  260   while (not terminationRequested) {
 
  261     B2DEBUG(100, 
"Processing new event");
 
  264     m_processStatisticsPtr->startGlobal();
 
  268     terminationRequested = processEvent(moduleIter, firstRound);
 
  281   B2DEBUG(10, 
"Calling terminate");
 
  282   m_eventMetaDataPtr.create();
 
  283   processTerminate(m_moduleList);
 
  288   while (not moduleIter.
isDone()) {
 
  290     B2DEBUG(10, 
"Starting event of " << module->getName());
 
  293     if (module != m_master) {
 
  299       if (not m_previousEventMetaData.isEndOfData() and m_eventMetaDataPtr and
 
  300           (*m_eventMetaDataPtr != m_previousEventMetaData)) {
 
  301         B2FATAL(
"Two modules setting EventMetaData were discovered: " << m_master->getName() << 
" and " 
  302                 << module->getName());
 
  305       if (not firstRound) {
 
  314     if (g_signalReceived != 0) {
 
  315       if (g_signalReceived != SIGINT) {
 
  318         B2DEBUG(10, 
"Received a SIGINT in the worker process...");
 
  323     B2ASSERT(
"The event meta data must always be valid at this stage!", m_eventMetaDataPtr and m_eventMetaDataPtr.isValid());
 
  325     if (m_eventMetaDataPtr->isEndOfData()) {
 
  330     if (module == m_master and not firstRound) {
 
  331       if (m_eventMetaDataPtr->isEndOfRun()) {
 
  332         B2DEBUG(10, 
"Calling endRun()");
 
  334         m_processStatisticsPtr->suspendGlobal();
 
  337         m_processStatisticsPtr->resumeGlobal();
 
  340         m_previousEventMetaData = *m_eventMetaDataPtr;
 
  344       } 
else if (m_previousEventMetaData.isEndOfData() or m_previousEventMetaData.isEndOfRun()) {
 
  347         if (m_eventMetaDataPtr->getRun() != 0) {
 
  348           m_processStatisticsPtr->suspendGlobal();
 
  350           m_processStatisticsPtr->resumeGlobal();
 
  356       const bool runChanged = ((m_eventMetaDataPtr->getExperiment() != m_previousEventMetaData.getExperiment()) or
 
  357                                (m_eventMetaDataPtr->getRun() != m_previousEventMetaData.getRun()));
 
  358       const bool runChangedWithoutNotice = runChanged and not m_previousEventMetaData.isEndOfData()
 
  359                                            and not m_previousEventMetaData.isEndOfRun();
 
  360       if (runChangedWithoutNotice) {
 
  361         m_processStatisticsPtr->suspendGlobal();
 
  367         m_processStatisticsPtr->resumeGlobal();
 
  377       m_previousEventMetaData = *m_eventMetaDataPtr;
 
  381     if (module->evalCondition()) {
 
  382       PathPtr condPath = module->getConditionPath();
 
  398   unsigned int needToRestart = 0;
 
  401   for (
auto iter = m_processList.begin(); iter != m_processList.end();) {
 
  402     const auto& pid = *iter;
 
  406     const int result = waitpid(pid, &status, WNOHANG);
 
  408       if (errno == EINTR) {
 
  413         B2FATAL(
"waitpid() failed.");
 
  415     } 
else if (result == 0) {
 
  421     B2ASSERT(
"Do not understand the result of waitpid()", result == pid);
 
  424     const auto exitCode = WEXITSTATUS(status);
 
  428       B2WARNING(
"A worker process has died unexpected!");
 
  431       sendTerminatedMessage(pid, 
true);
 
  435     iter = m_processList.erase(iter);
 
  438   return {m_processList.size(), needToRestart};
 
  443   for (
auto& socket : m_sockets) {
 
  458     m_processList.push_back(pid);
 
  460   } 
else if (pid < 0) {
 
  461     B2FATAL(
"fork() failed: " << strerror(errno));
 
  465     PyOS_AfterFork_Child();
 
  469     prctl(PR_SET_PDEATHSIG, SIGHUP);
 
  476 void processNumbered(
PathPtr startPath, 
const boost::python::list& outputAddresses, 
bool restartFailedWorkers = 
false,
 
  477                      bool appendProcessNumberToModuleName = 
true)
 
  479   static bool already_executed = 
false;
 
  480   B2ASSERT(
"Can not run process() on HLT twice per file!", not already_executed);
 
  483   B2ASSERT(
"HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
 
  485   namespace py = boost::python;
 
  486   std::vector<std::string> outputAddressesAsString;
 
  487   size_t nList = py::len(outputAddresses);
 
  488   for (
size_t iList = 0; iList < nList; ++iList) {
 
  489     outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr(
"__str__")()));
 
  497     already_executed = 
true;
 
  500     processor.process(startPath, restartFailedWorkers, appendProcessNumberToModuleName);
 
  503   } 
catch (std::exception& e) {
 
  504     B2ERROR(
"Uncaught exception encountered: " << e.what()); 
 
  508     B2ERROR(
"Uncaught exception encountered!"); 
 
  514 void process(
PathPtr startPath, 
const boost::python::list& outputAddresses, 
bool restartFailedWorkers = 
false)
 
  516   processNumbered(startPath, outputAddresses, restartFailedWorkers, 
false);
 
  519 BOOST_PYTHON_MODULE(hbasf2)
 
  521   def(
"processNumbered", &processNumbered);
 
  522   def(
"process", &process);
 
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
static DataStore & Instance()
Instance of singleton Store.
void setInitializeActive(bool active)
Setter for m_initializeActive.
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Exception thrown when execution is stopped by a signal.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at most 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as many workers as requested and, in each of them, run the given path using processCore.
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
@ c_Error
Error: for things that went wrong and have to be fixed.
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
@ c_Continue
After the conditional path, resume execution after this module.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
static void setProcessID(int processID)
Set the process ID of this process.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current processes PID...
void reset(bool keepEntries=false)
Invalidate all payloads.
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
static DBStore & Instance()
Instance of a singleton DBStore.
#define REGISTER_PYTHON_MODULE(moduleName)
Register a python module to make available when loading the library.
void updateEvent()
Updates all intra-run dependent objects.
Abstract base class for different kinds of events.