11 #include <framework/pcore/ProcHelper.h>
12 #include <framework/pcore/GlobalProcHandler.h>
13 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
14 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
15 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
16 #include <framework/pcore/PathUtils.h>
18 #include <framework/pcore/ZMQEventProcessor.h>
19 #include <framework/pcore/DataStoreStreamer.h>
20 #include <framework/pcore/RbTuple.h>
22 #include <framework/core/Environment.h>
23 #include <framework/logging/LogSystem.h>
// Number of the first signal recorded by storeSignal(); stays 0 while nothing was stored.
// It is read by terminateAndCleanup() and handed to the process monitor's checkSignals().
// NOTE(review): storeSignal() is passed to installMainSignalHandlers(), so this variable
// is presumably written from a signal handler. A plain int gives no guarantee for that;
// consider volatile std::sig_atomic_t - TODO confirm how the handler is installed.
42 static int g_signalReceived = 0;
// Handler passed to installMainSignalHandlers() in process() and forkAndRun().
// If an event processor is registered in g_eventProcessorForSignalHandling its cleanup()
// is run, then the default disposition (SIG_DFL) is restored for the signal.
// signalNumber: the signal this handler was invoked for.
// NOTE(review): the name suggests that the signal is raised again afterwards, but that
// statement is not visible in this excerpt - confirm against the full file.
// NOTE(review): cleanup() kills child processes and terminates the process monitor, which
// is presumably not async-signal-safe - confirm that this is acceptable here.
48 static void cleanupAndRaiseSignal(
int signalNumber)
// Only act if a processor instance has been registered by the constructor.
50 if (g_eventProcessorForSignalHandling) {
51 g_eventProcessorForSignalHandling->
cleanup();
// Uninstall this handler by switching back to the default action.
54 signal(signalNumber, SIG_DFL);
// Handler passed to installMainSignalHandlers() by the monitoring code.
// It does not act on the signal itself: it prints a notice to stderr for SIGINT and
// records the signal number in g_signalReceived for later evaluation.
// signalNumber: the signal this handler was invoked for.
58 static void storeSignal(
int signalNumber)
// Tell the user that the interrupt was noticed.
60 if (signalNumber == SIGINT) {
61 EventProcessor::writeToStdErr(
"\nStopping basf2 gracefully...\n");
// Keep the first signal only: an already stored signal is never overwritten.
65 if (g_signalReceived == 0) {
66 g_signalReceived = signalNumber;
// ZMQ socket address, filled in by the ZMQEventProcessor constructor from the Environment.
// deleteSocketFiles() takes no arguments (it is registered via std::atexit), so it reads
// the address from this global.
71 std::string g_socketAddress =
"";
// Exit hook (registered with std::atexit in the ZMQEventProcessor constructor) that removes
// the socket files derived from g_socketAddress.
// The address is searched for "://"; everything behind that separator is used as the base
// file name from which the per-type socket paths are built.
// NOTE(review): several lines of this function (the guarded statements and the declarations
// of buffer and socketAddressList) are not visible in this excerpt.
73 void deleteSocketFiles()
// True when this process is neither the monitor nor the init process.
// NOTE(review): the guarded statement is presumably an early return - confirm.
75 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
80 const auto seperatorPos = g_socketAddress.find(
"://");
// No separator found, or nothing follows it: there is no file name to work with.
82 if (seperatorPos == std::string::npos or seperatorPos + 3 >= g_socketAddress.size()) {
// Cut off everything up to and including "://" (3 characters).
86 const std::string
filename(g_socketAddress.substr(seperatorPos + 3));
// For every address type: build the socket path and remove the file if stat() succeeds on it.
89 for (
const auto socketAdressType : socketAddressList) {
90 const std::string socketAddress(ZMQAddressUtils::getSocketAddress(filename, socketAdressType));
91 if (stat(socketAddress.c_str(), &buffer) == 0) {
92 remove(socketAddress.c_str());
// Constructor: registers this instance as the processor used by the signal handlers,
// stores the ZMQ socket address from the Environment in g_socketAddress and registers
// deleteSocketFiles() to run at program exit.
98 ZMQEventProcessor::ZMQEventProcessor()
// Only one instance may exist at a time: the global pointer must still be unset.
100 B2ASSERT(
"You are having two instances of the ZMQEventProcessor running! This is not possible",
101 not g_eventProcessorForSignalHandling);
102 g_eventProcessorForSignalHandling =
this;
// Remember the address now, because the exit hook cannot receive arguments.
105 g_socketAddress = Environment::Instance().getZMQSocketAddress();
106 std::atexit(deleteSocketFiles);
// Destructor: clears the global pointer so that the signal handlers no longer refer to
// this instance (and a new instance may be created without tripping the constructor assert).
// NOTE(review): further statements may exist in the full file; only the reset is visible here.
109 ZMQEventProcessor::~ZMQEventProcessor()
112 g_eventProcessorForSignalHandling =
nullptr;
// Entry point for multi-process event processing.
// path:     the module path to process.
// maxEvent: event limit, forwarded unchanged to forkAndRun() or EventProcessor::process().
//
// Visible steps: split the path into input/main/output parts, initialise, fork and run
// the processes, then terminate and clean up.
// NOTE(review): the statements guarded by the early checks (presumably returns) are not
// visible in this excerpt.
115 void ZMQEventProcessor::process(
const PathPtr& path,
long maxEvent)
// Check for an empty path first.
123 if (path->isEmpty()) {
// This processor must not be used with zero processes; that case is reported as fatal.
127 const int numProcesses = Environment::Instance().getNumberProcesses();
128 if (numProcesses == 0) {
129 B2FATAL(
"ZMQEventProcessor::process() called for serial processing! Most likely a bug in Framework.");
// Split the path into its three parts and look up the histogram manager module among them.
133 PathPtr inputPath, mainPath, outputPath;
134 std::tie(inputPath, mainPath, outputPath) = PathUtils::splitPath(path);
135 const ModulePtr& histogramManager = PathUtils::getHistogramManager(inputPath, mainPath, outputPath);
// Without a main path there is nothing to parallelise: warn and process the whole path
// with the base class implementation instead.
137 if (not mainPath or mainPath->isEmpty()) {
138 B2WARNING(
"Cannot run any modules in parallel (no c_ParallelProcessingCertified flag), falling back to single-core mode.");
139 EventProcessor::process(path, maxEvent);
// Prepare the three paths; the returned module list is the one that gets initialised.
144 const ModulePtrList& moduleList = PathUtils::preparePaths(inputPath, mainPath, outputPath);
146 B2DEBUG(10,
"Initialisation phase");
148 initialize(moduleList, histogramManager);
150 B2DEBUG(10,
"Main phase");
// These modules are passed down to every run*() call via forkAndRun().
152 const ModulePtrList& terminateGlobally = PathUtils::getTerminateGloballyModules(moduleList);
153 forkAndRun(maxEvent, inputPath, mainPath, outputPath, terminateGlobally);
155 B2DEBUG(10,
"Terminate phase");
// During termination a signal triggers cleanupAndRaiseSignal().
156 installMainSignalHandlers(cleanupAndRaiseSignal);
158 terminateAndCleanup(histogramManager);
// NOTE(review): the signature of this function is not part of this excerpt. Judging from
// the call initialize(moduleList, histogramManager) in process() and the names used below,
// this is presumably the body of ZMQEventProcessor::initialize - confirm.
// The histogram manager (if there is one) is initialised first, ahead of the module list.
163 if (histogramManager) {
164 histogramManager->initialize();
// Run processInitialize() on the prepared module list.
167 processInitialize(moduleList,
true);
// Error issued when nothing provides event and run numbers.
// NOTE(review): the condition guarding this message is not visible in this excerpt.
171 B2ERROR(
"There is no module that provides event and run numbers. You must either add the EventInfoSetter module to your path, or, if using an input module, read EventMetaData objects from file.");
// Refuse to start event processing if the log system has counted any error message.
175 int numLogError = LogSystem::Instance().getMessageCounter(LogConfig::c_Error);
176 if (numLogError != 0) {
177 B2FATAL(numLogError <<
" ERROR(S) occurred! The processing of events will not be started.");
// Empty ROOT's global list of files, passing the "nodelete" option.
// NOTE(review): presumably done so that later forked processes do not act on files opened
// during initialisation - confirm the intent.
188 gROOT->GetListOfFiles()->Clear(
"nodelete");
// Called by process() after forkAndRun().
// histogramManager: may be null; when set, RbTupleManager::hadd() is invoked.
// If a signal was stored in g_signalReceived it is reported, its handler is reset to
// SIG_DFL and the signal is raised again.
191 void ZMQEventProcessor::terminateAndCleanup(
const ModulePtr& histogramManager)
// Only when a histogram manager is in use: let RbTupleManager add up the histogram files.
195 if (histogramManager) {
196 B2INFO(
"HistoManager:: adding histogram files");
197 RbTupleManager::Instance().hadd();
// A non-zero value means storeSignal() recorded a signal during processing.
201 if (g_signalReceived) {
// The same text is emitted in both cases: as a result message for SIGINT and as an error
// otherwise (the else between the two messages is not visible in this excerpt).
202 if (g_signalReceived == SIGINT) {
203 B2RESULT(
"Processing aborted via signal " << g_signalReceived <<
204 ", terminating. Output files have been closed safely and should be readable.");
206 B2ERROR(
"Processing aborted via signal " << g_signalReceived <<
207 ", terminating. Output files have been closed safely and should be readable.");
// Restore the default handler and re-raise the stored signal.
210 installSignalHandler(g_signalReceived, SIG_DFL);
211 raise(g_signalReceived);
// Starts the input process through GlobalProcHandler and runs the input path with processPath().
// inputPath:         input part of the path; checked for null/empty first.
// terminateGlobally: forwarded to processPath().
// maxEvent:          forwarded to processPath().
// NOTE(review): the statements guarded by the two early checks (presumably returns) are
// not fully visible in this excerpt.
215 void ZMQEventProcessor::runInput(
const PathPtr& inputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
217 if (not inputPath or inputPath->isEmpty()) {
// startInputProcess() returned false: the event data in the DataStore is invalidated.
// NOTE(review): presumably false means "this is not the input process" - confirm.
221 if (not GlobalProcHandler::startInputProcess()) {
223 DataStore::Instance().invalidateData(DataStore::c_Event);
// Install SIG_IGN for the main signals.
228 installMainSignalHandlers(SIG_IGN);
// Reset the process monitor and remove the DataStoreStreamer side effects before processing.
230 m_processMonitor.reset();
231 DataStoreStreamer::removeSideEffects();
233 processPath(inputPath, terminateGlobally, maxEvent);
234 B2DEBUG(10,
"Finished an input process");
// Starts the output process through GlobalProcHandler, runs the output path with
// processPath() and afterwards publishes a c_statisticMessage over ZMQ.
// outputPath:        output part of the path; checked for null/empty first.
// terminateGlobally: forwarded to processPath().
// maxEvent:          forwarded to processPath().
// NOTE(review): the statements guarded by the early checks and the declarations of
// zmqClient and streamer are not visible in this excerpt.
238 void ZMQEventProcessor::runOutput(
const PathPtr& outputPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
// Derive the pub and sub socket addresses from the configured ZMQ socket address.
240 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
241 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
242 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
244 if (not outputPath or outputPath->isEmpty()) {
248 if (not GlobalProcHandler::startOutputProcess()) {
// Install SIG_IGN for the main signals.
253 installMainSignalHandlers(SIG_IGN);
255 m_processMonitor.reset();
// The first module of the output path becomes m_master.
258 m_master = outputPath->getModules().begin()->get();
260 processPath(outputPath, terminateGlobally, maxEvent);
// After processing: connect the client, stream the data and publish it as a statistic message.
268 zmqClient.
initialize(pubSocketAddress, subSocketAddress);
271 const auto& evtMessage = streamer.
stream();
272 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_statisticMessage, evtMessage);
273 zmqClient.
publish(std::move(message));
275 B2DEBUG(10,
"Finished an output process");
// Starts worker processes through GlobalProcHandler and runs the main path with processPath().
// numProcesses: number of workers to start; checked against 0 first.
// inputPath:    only inspected to decide whether m_master is set from the main path.
// mainPath:     the path processed by the workers.
// NOTE(review): the end of the parameter list is not visible in this excerpt; the body
// also uses terminateGlobally and maxEvent, which are presumably the remaining parameters.
278 void ZMQEventProcessor::runWorker(
unsigned int numProcesses,
const PathPtr& inputPath,
const PathPtr& mainPath,
281 if (numProcesses == 0) {
// startWorkerProcesses() returned false: wait for a running worker.
// NOTE(review): the argument 60 is presumably a timeout in seconds - TODO confirm.
285 if (not GlobalProcHandler::startWorkerProcesses(numProcesses)) {
287 m_processMonitor.waitForRunningWorker(60);
// Install SIG_IGN for the main signals.
292 installMainSignalHandlers(SIG_IGN);
// With a non-empty input path, the first module of the main path becomes m_master.
294 if (inputPath and not inputPath->isEmpty()) {
296 m_master = mainPath->getModules().begin()->get();
// Reset the process monitor and remove the DataStoreStreamer side effects before processing.
299 m_processMonitor.reset();
300 DataStoreStreamer::removeSideEffects();
302 processPath(mainPath, terminateGlobally, maxEvent);
303 B2DEBUG(10,
"Finished a worker process");
// Runs one path: processCore() over the modules of localPath, followed by processTerminate().
// Called from runInput(), runOutput() and runWorker().
// localPath:         the path to run.
// terminateGlobally: modules added to the local list (if not yet present) before terminating.
// maxEvent:          event limit; passed through getMaximumEventNumber() before use.
307 void ZMQEventProcessor::processPath(
const PathPtr& localPath,
const ModulePtrList& terminateGlobally,
long maxEvent)
309 ModulePtrList localModules = localPath->buildModulePathList();
310 maxEvent = getMaximumEventNumber(maxEvent);
// The last argument tells processCore() whether this is the input process.
312 processCore(localPath, localModules, maxEvent, GlobalProcHandler::isProcess(ProcType::c_Input));
314 B2DEBUG(10,
"terminate process...");
// Make sure the terminateGlobally modules are part of the list that gets terminated.
315 PathUtils::prependModulesIfNotPresent(&localModules, terminateGlobally);
316 processTerminate(localModules);
// NOTE(review): the signature of this function is not part of this excerpt. Judging from
// the call runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent) in forkAndRun()
// this is presumably the body of ZMQEventProcessor::runMonitoring - confirm.
// The loop header of the main loop and the statements guarded by the hasEnded() checks
// are not visible either.
323 if (not GlobalProcHandler::startMonitoringProcess()) {
327 const auto& environment = Environment::Instance();
329 B2DEBUG(10,
"Will now start process monitor...");
// Initialise the process monitor with the configured number of processes.
330 const int numProcesses = environment.getNumberProcesses();
331 m_processMonitor.initialize(numProcesses);
// Wait for the input, then the output process; after each wait the monitor is checked for
// having ended. NOTE(review): the argument 60 is presumably a timeout in seconds - TODO confirm.
334 m_processMonitor.waitForRunningInput(60);
335 if (m_processMonitor.hasEnded()) {
339 m_processMonitor.waitForRunningOutput(60);
340 if (m_processMonitor.hasEnded()) {
// From here on signals are only recorded by storeSignal() and evaluated via checkSignals() below.
344 installMainSignalHandlers(storeSignal);
// Start as many workers as the monitor reports as needed.
347 runWorker(m_processMonitor.needMoreWorkers(), inputPath, mainPath, terminateGlobally, maxEvent);
// Configuration for the handling of failed workers, read once up front.
349 const auto& restartFailedWorkers = environment.getZMQRestartFailedWorkers();
350 const auto& failOnFailedWorkers = environment.getZMQFailOnFailedWorkers();
352 B2DEBUG(10,
"Will now start main loop...");
// Checks performed by the monitor: multicast, child processes and the stored signal.
355 m_processMonitor.checkMulticast();
357 m_processMonitor.checkChildProcesses();
359 m_processMonitor.checkSignals(g_signalReceived);
362 if (m_processMonitor.hasEnded()) {
// Workers are missing: restart them if configured; otherwise log an error if failing on
// failed workers is requested; otherwise warn once no worker is left at all.
367 const unsigned int neededWorkers = m_processMonitor.needMoreWorkers();
368 if (neededWorkers > 0) {
369 if (restartFailedWorkers) {
370 runWorker(neededWorkers, inputPath, mainPath, terminateGlobally, maxEvent);
371 }
else if (failOnFailedWorkers) {
372 B2ERROR(
"A worker failed. Will try to end the process smoothly now.");
374 }
else if (not m_processMonitor.hasWorkers()) {
375 B2WARNING(
"All workers have died and you did not request to restart them. Going down now.");
381 B2DEBUG(10,
"Finished the monitoring process");
// Sets up the process handling and the process monitor, then calls runInput(), runOutput()
// and runMonitoring() in that order.
// maxEvent:   event limit, forwarded to the run*() calls.
// inputPath:  forwarded to runInput() and runMonitoring().
// mainPath:   forwarded to runMonitoring().
// outputPath: forwarded to runOutput().
// NOTE(review): the end of the parameter list is not visible in this excerpt; the body
// also uses terminateGlobally, which is presumably the remaining parameter. The
// declaration of controlSocketAddress is not visible either.
384 void ZMQEventProcessor::forkAndRun(
long maxEvent,
const PathPtr& inputPath,
const PathPtr& mainPath,
const PathPtr& outputPath,
// Initialise the global process handler with the configured number of processes.
387 const int numProcesses = Environment::Instance().getNumberProcesses();
388 GlobalProcHandler::initialize(numProcesses);
// Derive the pub and sub socket addresses from the configured ZMQ socket address.
390 const auto& socketAddress = Environment::Instance().getZMQSocketAddress();
392 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_pub));
393 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress,
ZMQAddressType::c_sub));
// A signal triggers cleanupAndRaiseSignal() from here on; then subscribe the process monitor.
398 installMainSignalHandlers(cleanupAndRaiseSignal);
399 m_processMonitor.subscribe(pubSocketAddress, subSocketAddress, controlSocketAddress);
401 B2DEBUG(10,
"Starting input process...");
402 runInput(inputPath, terminateGlobally, maxEvent);
403 B2DEBUG(10,
"Starting output process...");
404 runOutput(outputPath, terminateGlobally, maxEvent);
406 B2DEBUG(10,
"Starting monitoring process...");
407 runMonitoring(inputPath, mainPath, terminateGlobally, maxEvent);
// Kills the processes known to the process monitor and terminates the monitor.
// Invoked from cleanupAndRaiseSignal() through g_eventProcessorForSignalHandling.
410 void ZMQEventProcessor::cleanup()
// True when this process is neither the monitor nor the init process; the debug message
// states that cleanup is not run in that case.
// NOTE(review): the early return that presumably follows is not visible in this excerpt.
412 if (not GlobalProcHandler::isProcess(ProcType::c_Monitor) and not GlobalProcHandler::isProcess(ProcType::c_Init)) {
413 B2DEBUG(10,
"Not running cleanup, as I am in process type " << GlobalProcHandler::getProcessName());
// NOTE(review): the argument 5 is presumably a timeout in seconds - TODO confirm.
416 m_processMonitor.killProcesses(5);
417 m_processMonitor.terminate();