Belle II Software development
ZMQEventProcessor.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <framework/core/Module.h>
11#include <framework/core/EventProcessor.h>
12#include <framework/core/Path.h>
13#include <framework/pcore/ProcessMonitor.h>
14
15namespace Belle2 {
24 public:
25
28
30 virtual ~ZMQEventProcessor();
31
33 void process(const PathPtr& spath, long maxEvent);
34
36 void cleanup();
37
38 private:
40 void initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager);
41
43 void forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath,
44 const ModulePtrList& terminateGlobally);
45
47 void terminateAndCleanup(const ModulePtr& histogramManager);
48
50 void runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally, long maxEvent);
51
53 void runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent);
54
56 void runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent);
57
59 void runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally,
60 long maxEvent);
61
63 void processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent);
64
66 void processCore(const PathPtr& startPath, const ModulePtrList& modulePathList, long maxEvent = 0, bool isInputProcess = true,
67 bool isWorkerProcess = false, bool isOutputProcess = false);
68
70 bool processEvent(PathIterator moduleIter, bool skipMasterModule, bool Worker = false, bool output = false);
71
73 void processBeginRun(bool skipDB = false);
74
76 void processEndRun();
77
80
83
84 };
85
87}
Store event, run, and experiment numbers.
Definition: EventMetaData.h:33
provides the core event processing loop.
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
Class to monitor all started framework processes (input, workers, output), kill them if requested and...
This class provides the core event processing loop for parallel processing with ZMQ.
ProcessMonitor m_processMonitor
Instance of the process monitor.
void processEndRun()
Calls EndRun function.
void runMonitoring(const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Start the monitoring (without forking)
void processBeginRun(bool skipDB=false)
Calls BeginRun function.
void cleanup()
clean up IPC resources (should only be called in one process).
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain using parallel processing, starting with the first module in the give...
void terminateAndCleanup(const ModulePtr &histogramManager)
Last step in the process: run the termination and cleanup (kill all remaining processes)
void runWorker(unsigned int numProcesses, const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the N worker process.
void initialize(const ModulePtrList &moduleList, const ModulePtr &histogramManager)
First step in the process: init the module in the list.
bool processEvent(PathIterator moduleIter, bool skipMasterModule, bool Worker=false, bool output=false)
Calls Event function.
void processCore(const PathPtr &startPath, const ModulePtrList &modulePathList, long maxEvent=0, bool isInputProcess=true, bool isWorkerProcess=false, bool isOutputProcess=false)
Process modules in the path.
void runInput(const PathPtr &inputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the input process.
void runOutput(const PathPtr &outputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the output process.
void forkAndRun(long maxEvent, const PathPtr &inputPath, const PathPtr &mainPath, const PathPtr &outputPath, const ModulePtrList &terminateGlobally)
Second step in the process: fork out the processes we need to have and call the event loop.
virtual ~ZMQEventProcessor()
Make sure we remove all sockets cleanly.
EventMetaData m_previousEventMetaData
Stores previous eventMetaData.
void processPath(const PathPtr &localPath, const ModulePtrList &terminateGlobally, long maxEvent)
Basic function run in every process: process the event loop of the given path.
ZMQEventProcessor()
Init the socket cleaning at exit.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:35
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition: Module.h:43
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
Abstract base class for different kinds of events.