8#include <daq/hbasf2/utils/HLTEventProcessor.h>
10#include <boost/python.hpp>
11#include <framework/utilities/RegisterPythonModule.h>
12#include <framework/core/InputController.h>
13#include <framework/pcore/ProcHandler.h>
15#include <framework/database/DBStore.h>
16#include <framework/core/RandomNumbers.h>
17#include <framework/core/Environment.h>
18#include <framework/core/ModuleManager.h>
20#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21#include <framework/pcore/zmq/messages/ZMQDefinitions.h>
22#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
35using namespace boost::python;
39 static int g_signalReceived = 0;
42 static int g_processNumber = 1;
44 static void storeSignal(
int signalNumber)
46 if (signalNumber == SIGINT) {
51 if (g_signalReceived == 0) {
52 g_signalReceived = signalNumber;
64 if (not waitForConfirmation) {
68 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69 B2ASSERT(
"Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
71 B2FATAL(
"Did not receive a confirmation message! waitForConfirmation is " << waitForConfirmation);
78 m_sockets.reserve(outputAddresses.size());
79 for (
const auto& address : outputAddresses) {
86 using namespace std::chrono_literals;
91 B2ASSERT(
"You try to process an empty path!", not
m_moduleList.empty());
95 if (hasParallelFlag and module->hasCondition()) {
96 for (
const auto& conditionPath : module->getAllConditionPaths()) {
98 hasParallelFlag =
false;
102 B2ASSERT(
"Module with name " << module->getName() <<
" does not have parallel flag!", hasParallelFlag);
111 B2ERROR(
"There is no module that provides event and run numbers (EventMetaData). "
112 "You must add the specific HLT module as first module to the path.");
117 if (numLogError != 0) {
118 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
123 runWorkers(path, numProcesses, appendProcessNumberToModuleName);
127 int numberOfRestartedWorkers = 0;
131 if (g_signalReceived > 0) {
132 B2WARNING(
"Received a signal to go down.");
137 unsigned int presentWorkers;
138 unsigned int neededWorkers;
141 if (neededWorkers > 0) {
142 if (restartFailedWorkers) {
144 numberOfRestartedWorkers += neededWorkers;
146 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
149 }
else if (presentWorkers == 0) {
150 B2DEBUG(10,
"All workers have cleanly exited. Will now also exit");
154 if (numberOfRestartedWorkers > numProcesses) {
155 B2ERROR(
"I needed to restart on total " << numberOfRestartedWorkers <<
", which I think is abnormal. "
156 "Will terminate the process now!");
160 std::this_thread::sleep_for(10ms);
163 if (appendProcessNumberToModuleName) {
165 B2INFO(g_processNumber <<
": Send SIGINT to " << pid);
172 if (kill(pid, 0) != 0) {
175 B2DEBUG(10, g_processNumber <<
": Checking process termination, count = " << count);
176 std::this_thread::sleep_for(1000ms);
177 if (count % 5 == 1) kill(pid, SIGINT);
186 std::this_thread::sleep_for(500ms);
189 if (kill(pid, SIGKILL) >= 0) {
190 B2WARNING(
"Needed to hard kill process " << pid);
192 B2DEBUG(100,
"no process " << pid <<
" found, already gone?");
198 B2DEBUG(10,
"Done here");
202 if (g_signalReceived == SIGINT) {
210 for (
unsigned int i = 0; i < numProcesses; i++) {
213 B2DEBUG(10,
"Starting a new worker process");
220 if (appendProcessNumberToModuleName) {
222 module->setName(std::to_string(g_processNumber) + std::string(
"_") + module->getName());
223 B2INFO(
"New worker name is " << module->getName());
229 gROOT->GetListOfFiles()->Delete();
235 B2ERROR(
"Exception occured in exp/run/evt: "
242 B2DEBUG(10,
"Ending a worker process here.");
251 bool terminationRequested =
false;
252 bool firstRound =
true;
260 while (not terminationRequested) {
261 B2DEBUG(100,
"Processing new event");
268 terminationRequested =
processEvent(moduleIter, firstRound);
281 B2DEBUG(10,
"Calling terminate");
288 while (not moduleIter.
isDone()) {
290 B2DEBUG(10,
"Starting event of " << module->getName());
301 B2FATAL(
"Two modules setting EventMetaData were discovered: " <<
m_master->
getName() <<
" and "
302 << module->getName());
305 if (not firstRound) {
314 if (g_signalReceived != 0) {
315 if (g_signalReceived != SIGINT) {
318 B2DEBUG(10,
"Received a SIGINT in the worker process...");
330 if (module ==
m_master and not firstRound) {
332 B2DEBUG(10,
"Calling endRun()");
360 if (runChangedWithoutNotice) {
381 if (module->evalCondition()) {
382 PathPtr condPath = module->getConditionPath();
384 if (module->getAfterConditionPath() == Module::EAfterConditionPath::c_Continue) {
398 unsigned int needToRestart = 0;
402 const auto& pid = *iter;
406 const int result = waitpid(pid, &status, WNOHANG);
408 if (errno == EINTR) {
413 B2FATAL(
"waitpid() failed.");
415 }
else if (result == 0) {
421 B2ASSERT(
"Do not understand the result of waitpid()", result == pid);
424 const auto exitCode = WEXITSTATUS(status);
428 B2WARNING(
"A worker process has died unexpected!");
460 }
else if (pid < 0) {
461 B2FATAL(
"fork() failed: " << strerror(errno));
465 PyOS_AfterFork_Child();
469 prctl(PR_SET_PDEATHSIG, SIGHUP);
476void processNumbered(
PathPtr startPath,
const boost::python::list& outputAddresses,
bool restartFailedWorkers =
false,
477 bool appendProcessNumberToModuleName =
true)
479 static bool already_executed =
false;
480 B2ASSERT(
"Can not run process() on HLT twice per file!", not already_executed);
483 B2ASSERT(
"HLT processing must happen in multiprocessing mode!", environment.getNumberProcesses() > 0);
485 namespace py = boost::python;
486 std::vector<std::string> outputAddressesAsString;
487 size_t nList = py::len(outputAddresses);
488 for (
size_t iList = 0; iList < nList; ++iList) {
489 outputAddressesAsString.emplace_back(py::extract<std::string>(outputAddresses[iList].attr(
"__str__")()));
497 already_executed =
true;
500 processor.process(startPath, restartFailedWorkers, appendProcessNumberToModuleName);
503 }
catch (std::exception& e) {
504 B2ERROR(
"Uncaught exception encountered: " << e.what());
508 B2ERROR(
"Uncaught exception encountered!");
514void process(
PathPtr startPath,
const boost::python::list& outputAddresses,
bool restartFailedWorkers =
false)
516 processNumbered(startPath, outputAddresses, restartFailedWorkers,
false);
519BOOST_PYTHON_MODULE(hbasf2)
521 def(
"processNumbered", &processNumbered);
522 def(
"process", &process);
@ c_Event
Different object in each event, all objects/arrays are invalidated after event() function has been ca...
static DataStore & Instance()
Instance of singleton Store.
void setInitializeActive(bool active)
Setter for m_initializeActive.
void reset(EDurability durability)
Frees memory occupied by data store items and removes all objects from the map.
void invalidateData(EDurability durability)
Clears all registered StoreEntry objects of a specified durability, invalidating all objects.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
Exception thrown when execution is stopped by a signal.
void processEndRun()
Calls the end run methods of all modules.
void processInitialize(const ModulePtrList &modulePathList, bool setEventInfo=true)
Initializes the modules.
bool m_inRun
Are we currently in a run? If yes, processEndRun() needs to do something.
void processBeginRun(bool skipDB=false)
Calls the begin run methods of all modules.
static void installSignalHandler(int sig, void(*fn)(int))
Install a signal handler 'fn' for given signal.
StoreObjPtr< ProcessStatistics > m_processStatisticsPtr
Also used in a number of places.
void callEvent(Module *module)
Calls event() on one single module, setting up logging and statistics as needed.
void processTerminate(const ModulePtrList &modulePathList)
Terminates the modules.
static void writeToStdErr(const char msg[])
async-safe method to write something to STDERR.
const Module * m_master
The master module that determines the experiment/run/event number.
StoreObjPtr< EventMetaData > m_eventMetaDataPtr
EventMetaData is used by processEvent()/processCore().
ModulePtrList m_moduleList
List of all modules in order initialized.
EventMetaData m_previousEventMetaData
Stores state of EventMetaData before it was last changed.
static void installMainSignalHandlers(void(*fn)(int)=nullptr)
Install signal handler for INT, TERM and QUIT signals.
EventProcessor to be used on the HLT with all specialities of the HLT processing:
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
The created sockets for unregistering workers. TODO: use connections.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at most 10s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
ZMQParent m_parent
An instance of a ZMQParent to create sockets for unregistering workers.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as many workers as requested and, in each of them, run the given path using processCore.
std::vector< int > m_processList
The current list of running processes (with their PIDs)
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
@ c_Error
Error: for things that went wrong and have to be fixed.
void resetMessageCounter()
Resets the message counter and error log by setting all message counts to 0.
int getMessageCounter(LogConfig::ELogLevel logLevel) const
Returns the number of logging calls per log level.
static LogSystem & Instance()
Static method to get a reference to the LogSystem instance.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Event
Counting time/calls in event()
const std::string & getName() const
Returns the name of the module.
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Iterator over a Path (returning Module pointers).
bool isDone() const
Are we finished iterating?
Module * get() const
dereference.
static void setProcessID(int processID)
Set the process ID of this process.
static void useEventDependent()
Set Event dependent Random Generator as current one.
static void initializeEvent(bool force=false)
Initialize event information.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
static std::string createIdentity(unsigned int pid=0)
Create a unique ZMQ identity in the form <hostname>_<pid> (if pid is 0, use the current process's PID...
void reset()
Expert function: Reset the parent without context closing. ATTENTION: which will not clean up properl...
void reset(bool keepEntries=false)
Invalidate all payloads.
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
static DBStore & Instance()
Instance of a singleton DBStore.
#define REGISTER_PYTHON_MODULE(moduleName)
Register a Python module to make it available when loading the library.
std::unique_ptr< zmq::socket_t > createSocket(const std::string &socketAddress, bool bind)
Create a socket of the given type with the given address and bind or not bind it.
void updateEvent()
Updates all intra-run dependent objects.
Abstract base class for different kinds of events.