9 #include <framework/pcore/ProcHelper.h>
10 #include <framework/pcore/GlobalProcHandler.h>
11 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
12 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/PathUtils.h>
16 #include <framework/pcore/ZMQEventProcessor.h>
17 #include <framework/pcore/DataStoreStreamer.h>
18 #include <framework/pcore/RbTuple.h>
20 #include <framework/core/Environment.h>
21 #include <framework/logging/LogSystem.h>
// Signal number stored by storeSignal(); stays 0 until a signal is recorded, and
// later signals do not overwrite the first one. Read by the monitoring loop
// (m_processMonitor.checkSignals) and by terminateAndCleanup().
// NOTE(review): this is written from storeSignal(), which is passed to
// installMainSignalHandlers(); the only object type the C/C++ standards guarantee
// to be safe to write from a signal handler is `volatile std::sig_atomic_t`.
41 static int g_signalReceived = 0;
// Signal handler installed in forkAndRun() and again before the terminate phase of
// process(): if a ZMQEventProcessor is registered in g_eventProcessorForSignalHandling,
// call its cleanup(), then reset this signal to its default disposition (SIG_DFL).
// NOTE(review): any statements after the signal() call are not shown in this
// listing; the function name suggests the signal is re-raised afterwards — confirm.
46 static void cleanupAndRaiseSignal(
int signalNumber)
48 if (g_eventProcessorForSignalHandling) {
49 g_eventProcessorForSignalHandling->
cleanup();
// Restore the default action for this signal number.
52 signal(signalNumber, SIG_DFL);
// Signal handler installed by the monitoring code: on SIGINT it prints a
// "Stopping basf2 gracefully" notice to stderr, and it records the first received
// signal number in g_signalReceived for later evaluation.
56 static void storeSignal(
int signalNumber)
58 if (signalNumber == SIGINT) {
59 EventProcessor::writeToStdErr(
"\nStopping basf2 gracefully...\n");
// Keep only the first signal; later ones do not overwrite it.
63 if (g_signalReceived == 0) {
64 g_signalReceived = signalNumber;
// ZMQ socket address copied from the Environment in the ZMQEventProcessor constructor;
// deleteSocketFiles() (registered with std::atexit) derives the socket file paths from it.
69 std::string g_socketAddress =
"";
// Remove the socket files that belong to g_socketAddress.
// Registered with std::atexit() in the ZMQEventProcessor constructor.
71 void deleteSocketFiles()
// Only the monitor or init process should delete the files; the body of this
// guard is not shown in this listing (presumably an early return).
73 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
// The address is expected to look like "<transport>://<path>"; everything after
// "://" is used as the base file name.
78 const auto seperatorPos = g_socketAddress.find(
"://");
// Bail out (body not shown) if there is no "://" or nothing follows it.
80 if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
84 const std::string filename(g_socketAddress.substr(seperatorPos + 3));
// For every address type in socketAddressList (declared, like the stat buffer, on a
// line not shown here), build the per-type socket path and remove it if stat()
// succeeds on it.
// NOTE(review): the return value of remove() is ignored, so a failed deletion is silent.
87 for (
const auto socketAdressType : socketAddressList) {
88 const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
89 if (stat(socketAddress.c_str(), &buffer) == 0) {
90 remove(socketAddress.c_str());
// Construct the ZMQ event processor. The constructor:
// - asserts that no other instance is currently registered;
// - registers this instance for signal handling;
// - remembers the ZMQ socket address;
// - schedules deleteSocketFiles() to run at program exit.
96 ZMQEventProcessor::ZMQEventProcessor()
98 B2ASSERT(
"You are having two instances of the ZMQEventProcessor running! This is not possible",
99 not g_eventProcessorForSignalHandling);
100 g_eventProcessorForSignalHandling =
this;
// Captured here so that deleteSocketFiles() can locate the socket files at exit.
103 g_socketAddress = Environment::Instance().getZMQSocketAddress();
// NOTE(review): the destructor resets g_eventProcessorForSignalHandling, so instances
// can be created one after another; each construction registers deleteSocketFiles
// with std::atexit again.
104 std::atexit(deleteSocketFiles);
// Unregister this instance from signal handling.
// NOTE(review): other statements of the destructor body, if any, are not shown in
// this listing.
107 ZMQEventProcessor::~ZMQEventProcessor()
110 g_eventProcessorForSignalHandling =
nullptr;
// Entry point for parallel (ZMQ-based) processing of `path`, up to `maxEvent` events.
// It splits the path into input/main/output parts and initializes the modules. It then
// starts the input, output and monitoring processes (forkAndRun) and finally runs
// terminateAndCleanup(). If the main path is missing or empty, it falls back to the
// serial EventProcessor::process().
113 void ZMQEventProcessor::process(
const PathPtr& path,
long maxEvent)
// Handling of an empty path is not shown in this listing (presumably an early return).
121 if (path->isEmpty()) {
125 const int numProcesses = Environment::Instance().getNumberProcesses();
// Zero processes means serial mode, which must not reach this class.
126 if (numProcesses == 0) {
127 B2FATAL(
"ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
131 PathPtr inputPath, mainPath, outputPath;
132 std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
133 const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
// Without a main path there is nothing to distribute, so run serially instead.
135 if (not mainPath or mainPath->isEmpty()) {
136 B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
137 EventProcessor::process(path, maxEvent);
142 const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
144 B2DEBUG(10,
"Initialisation phase");
146 initialize(moduleList, histogramManager);
148 B2DEBUG(10,
"Main phase");
// Passed down to processPath(), which prepends these modules (if not already present)
// to each process's local module list before calling processTerminate().
150 const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
151 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
153 B2DEBUG(10,
"Terminate phase");
// During termination a signal triggers cleanup() of the registered processor.
154 installMainSignalHandlers(cleanupAndRaiseSignal);
156 terminateAndCleanup(histogramManager);
// NOTE(review): the signature of the function these lines belong to is not shown in
// this listing. Given the names used (moduleList, histogramManager), it is presumably
// the initialize(moduleList, histogramManager) called from process() — confirm.
// The histogram manager, if present, is initialized before processInitialize() runs
// on the module list.
161 if (histogramManager) {
162 histogramManager->initialize();
165 processInitialize(moduleList,
true);
// The condition guarding this error is not shown in this listing.
169 B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
// Abort before event processing if any error has been logged so far.
173 int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
174 if (numLogError != 0) {
175 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
// Empty ROOT's list of open files without deleting the file objects ("nodelete").
186 gROOT->GetListOfFiles()->Clear(
"nodelete");
// Final phase in the calling process. If a histogram manager is used, the histogram
// files are added via RbTupleManager::hadd(). If a signal was stored during
// processing, it is reported, the default handler is restored and the signal is
// raised again.
189 void ZMQEventProcessor::terminateAndCleanup(
const ModulePtr& histogramManager)
193 if (histogramManager) {
194 B2INFO(
"HistoManager:: adding histogram files");
195 RbTupleManager::Instance().hadd();
199 if (g_signalReceived) {
// SIGINT is reported as a result, any other signal as an error
// (the `else` between the two branches is not shown in this listing).
200 if (g_signalReceived == SIGINT) {
201 B2RESULT(
"Processing aborted via signal " << g_signalReceived <<
202 ", terminating. Output files have been closed safely and should be readable.");
204 B2ERROR(
"Processing aborted via signal " << g_signalReceived <<
205 ", terminating. Output files have been closed safely and should be readable.");
// Restore the default disposition and re-deliver the stored signal to this process.
208 installSignalHandler(g_signalReceived, SIG_DFL);
209 raise(g_signalReceived);
// Start the input process (only if the input path is non-empty) and run the input
// path in it. The early-exit bodies of the guards are not shown in this listing.
213 void ZMQEventProcessor::runInput(
const PathPtr& inputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
215 if (not inputPath or inputPath->isEmpty()) {
// startInputProcess() presumably returns false in the parent process and true in
// the new input process — confirm. The parent invalidates its event-level DataStore
// content.
219 if (not GlobalProcHandler::startInputProcess()) {
221 DataStore::Instance().invalidateData(DataStore::c_Event);
// Input process: the main signal handlers are set to SIG_IGN here.
226 installMainSignalHandlers(SIG_IGN);
228 m_processMonitor.reset();
229 DataStoreStreamer::removeSideEffects();
231 processPath(inputPath, terminateGlobally, maxEvent);
232 B2DEBUG(10,
"Finished an input process");
// Start the output process (only if the output path is non-empty) and run the output
// path in it. Afterwards the output process streams the DataStore into an event
// message and publishes it as a statistics message via the pub/sub sockets.
236 void ZMQEventProcessor::runOutput(
const PathPtr& outputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
238 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
239 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
240 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
// The early-exit bodies of the next two guards are not shown in this listing.
242 if (not outputPath or outputPath->isEmpty()) {
246 if (not GlobalProcHandler::startOutputProcess()) {
// Output process: the main signal handlers are set to SIG_IGN here.
251 installMainSignalHandlers(SIG_IGN);
253 m_processMonitor.reset();
// The first module of the output path becomes m_master. This relies on the
// empty-path guard above having exited.
256 m_master = outputPath->getModules().begin()->get();
258 processPath(outputPath, terminateGlobally, maxEvent);
// `zmqClient` and `streamer` are declared on lines not shown in this listing.
266 zmqClient.
initialize(pubSocketAddress, subSocketAddress);
269 const auto& evtMessage = streamer.
stream();
270 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
271 zmqClient.
publish(std::move(message));
273 B2DEBUG(10,
"Finished an output process");
// Start `numProcesses` worker processes and run the main path in each of them.
// NOTE(review): the rest of the parameter list (terminateGlobally, maxEvent, both
// used below) is on a line not shown in this listing.
276 void ZMQEventProcessor::runWorker(
unsigned int numProcesses,
const PathPtr& inputPath,
const PathPtr& mainPath,
// Nothing to start (early-exit body not shown).
279 if (numProcesses == 0) {
// Presumably the parent process: it waits for a running worker
// (the unit of the timeout argument 60 is defined by the process monitor).
283 if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
285 m_processMonitor.waitForRunningWorker(60);
// Worker process: the main signal handlers are set to SIG_IGN here.
290 installMainSignalHandlers(SIG_IGN);
// When an input path exists, the first main-path module becomes m_master
// (the full body of this branch is not shown).
292 if (inputPath and not inputPath->isEmpty()) {
294 m_master = mainPath->getModules().begin()->get();
297 m_processMonitor.reset();
298 DataStoreStreamer::removeSideEffects();
300 processPath(mainPath, terminateGlobally, maxEvent);
301 B2DEBUG(10,
"Finished a worker process");
// Run `localPath` in the current process, up to the event count returned by
// getMaximumEventNumber(maxEvent). Then terminate the local modules, together with
// any terminateGlobally modules that are not already in the local module list.
305 void ZMQEventProcessor::processPath(
const PathPtr& localPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
307 ModulePtrList localModules = localPath->buildModulePathList();
308 maxEvent = getMaximumEventNumber(maxEvent);
// The last argument is whether the current process is the input process.
310 processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input));
312 B2DEBUG(10,
"terminate process...");
313 PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
314 processTerminate(localModules);
// NOTE(review): the signature of the function these lines belong to is not shown in
// this listing. It is presumably runMonitoring(inputPath, mainPath, terminateGlobally,
// maxEvent), called at the end of forkAndRun() — confirm.
// Only the monitoring process is expected to continue past this guard
// (early-exit body not shown).
321 if (not GlobalProcHandler::startMonitoringProcess()) {
325 const auto& environment = Environment::Instance();
327 B2DEBUG(10,
"Will now start process monitor...");
328 const int numProcesses = environment.getNumberProcesses();
329 m_processMonitor.initialize(numProcesses);
// Wait for the input process, then the output process. What happens when the
// monitor has already ended is not shown in this listing.
332 m_processMonitor.waitForRunningInput(60);
333 if (m_processMonitor.hasEnded()) {
337 m_processMonitor.waitForRunningOutput(60);
338 if (m_processMonitor.hasEnded()) {
// From here on signals are only recorded (storeSignal) and passed to
// m_processMonitor.checkSignals() in the main loop.
342 installMainSignalHandlers(storeSignal);
// Start as many workers as the process monitor reports are needed.
345 runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
347 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
348 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
350 B2DEBUG(10,
"Will now start main loop...");
// Main monitoring loop body (the loop statement itself and its exits are not shown).
// It checks the multicast, the child processes and the stored signal.
353 m_processMonitor.checkMulticast();
355 m_processMonitor.checkChildProcesses();
357 m_processMonitor.checkSignals(g_signalReceived);
360 if (m_processMonitor.hasEnded()) {
// When workers are missing, the reaction depends on the configuration:
// - restart them if requested;
// - otherwise report an error if failing was requested;
// - otherwise warn once no worker is left.
// The follow-up statements of these branches are not shown.
365 const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
366 if (neededWorkers > 0) {
367 if (restartFailedWorkers) {
368 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
369 }
else if (failOnFailedWorkers) {
370 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
372 }
else if (not m_processMonitor.hasWorkers()) {
373 B2WARNING(
"All workers have died and you did not request to restart them. Going down now.");
379 B2DEBUG(10,
"Finished the monitoring process");
// Set up the global process handler and subscribe the process monitor to the pub/sub
// and control sockets. Then start the input, output and monitoring processes; workers
// are started via runWorker() from the monitoring code.
// NOTE(review): the remaining parameter (terminateGlobally) and the declaration of
// controlSocketAddress are on lines not shown in this listing.
382 void ZMQEventProcessor::forkAndRun(
long maxEvent,
const PathPtr& inputPath,
const PathPtr& mainPath,
const PathPtr& outputPath,
385 const int numProcesses = Environment::Instance().getNumberProcesses();
386 GlobalProcHandler::initialize(numProcesses);
388 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
390 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
391 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
// While the processes are being started, a signal triggers cleanup() of the
// registered processor.
396 installMainSignalHandlers(cleanupAndRaiseSignal);
397 m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
399 B2DEBUG(10,
"Starting input process...");
400 runInput(inputPath, terminateGlobally, maxEvent);
401 B2DEBUG(10,
"Starting output process...");
402 runOutput(outputPath, terminateGlobally, maxEvent);
404 B2DEBUG(10,
"Starting monitoring process...");
405 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
// Ask the process monitor to kill the processes and then terminate it.
// Intended for the monitor or init process only; it uses the same process-type check
// as deleteSocketFiles(). Other process types log a debug message. The statement that
// follows it (presumably a return) is not shown in this listing.
408 void ZMQEventProcessor::cleanup()
410 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
411 B2DEBUG(10,
"Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
// The meaning/unit of the argument 5 is defined by the process monitor.
414 m_processMonitor.killProcesses(5);
415 m_processMonitor.terminate();
Helper class for data store serialization.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
A helper class for communicating over ZMQ. Includes a multicast and (if needed) also a data socket.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
void publish(AZMQMessage message) const
Publish the message to the multicast.
This class provides the core event processing loop for parallel processing with ZMQ.
void cleanup()
clean up IPC resources (should only be called in one process).
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
@ c_sub
Multicast subscribe socket.
@ c_control
Multicast control socket.
Abstract base class for different kinds of events.