Belle II Software  release-08-01-10
ZMQEventProcessor.h
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#pragma once

#include <framework/core/Module.h>
#include <framework/core/EventProcessor.h>
#include <framework/core/Path.h>
#include <framework/pcore/ProcessMonitor.h>

namespace Belle2 {
  class ZMQEventProcessor : public EventProcessor {
  public:
    ZMQEventProcessor();

    virtual ~ZMQEventProcessor();

    void process(const PathPtr& spath, long maxEvent);

    void cleanup();

  private:
    void initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager);

    void forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath,
                    const ModulePtrList& terminateGlobally);

    void terminateAndCleanup(const ModulePtr& histogramManager);

    void runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally, long maxEvent);

    void runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent);

    void runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent);

    void runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally,
                   long maxEvent);

    void processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent);

    void processCore(const PathPtr& startPath, const ModulePtrList& modulePathList, long maxEvent = 0, bool isInputProcess = true,
                     bool isWorkerProcess = false, bool isOutputProcess = false);

    bool processEvent(PathIterator moduleIter, bool skipMasterModule, bool Worker = false, bool output = false);

    void processBeginRun(bool skipDB = false);

    void processEndRun();

    ProcessMonitor m_processMonitor;

    EventMetaData m_previousEventMetaData;
  };

}
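
The header pairs processBeginRun() and processEndRun() with a stored copy of the previous event's metadata (m_previousEventMetaData). One plausible reading is that the event loop compares each event's experiment and run numbers with the previous ones and fires the run hooks when they change. The sketch below illustrates only that idea under this assumption; RunId, RunTracker, and onEvent are hypothetical names that do not come from basf2.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for the experiment/run part of EventMetaData.
struct RunId {
  int experiment;
  int run;
  bool operator==(const RunId& other) const
  {
    return experiment == other.experiment && run == other.run;
  }
};

// Hypothetical helper: remembers the previous run and records which hooks would fire.
class RunTracker {
public:
  // Call once per event; logs "endRun" and/or "beginRun" when the run changes.
  void onEvent(const RunId& current)
  {
    assert(current.run >= 0);  // this sketch assumes non-negative run numbers
    if (m_previous && *m_previous == current) {
      return;  // same run as the previous event: no hook fires
    }
    if (m_previous) {
      m_log.push_back("endRun");  // close the run we were in
    }
    m_log.push_back("beginRun");  // open the new run
    m_previous = current;
  }

  const std::vector<std::string>& log() const { return m_log; }

private:
  std::optional<RunId> m_previous;  // empty until the first event arrives
  std::vector<std::string> m_log;
};
```

Keeping the previous run in a std::optional makes the first event a natural special case: it opens a run without closing one.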
EventMetaData
    Store event, run, and experiment numbers.
    Definition: EventMetaData.h:33

EventProcessor
    Provides the core event processing loop.

PathIterator
    Iterator over a Path (returning Module pointers).
    Definition: PathIterator.h:26

ProcessMonitor
    Class to monitor all started framework processes (input, workers, output), kill them if requested and...

ZMQEventProcessor
    This class provides the core event processing loop for parallel processing with ZMQ.

ProcessMonitor m_processMonitor
    Instance of the process monitor.

void processEndRun()
    Calls the EndRun function.

void runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally, long maxEvent)
    Start the monitoring (without forking).

void processBeginRun(bool skipDB = false)
    Calls the BeginRun function.

void cleanup()
    Clean up IPC resources (should only be called in one process).

void process(const PathPtr& spath, long maxEvent)
    Processes the full module chain using parallel processing, starting with the first module in the give...

void terminateAndCleanup(const ModulePtr& histogramManager)
    Last step in the process: run the termination and cleanup (kill all remaining processes).

void runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally, long maxEvent)
    Fork out the N worker processes.

void initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager)
    First step in the process: initialize the modules in the list.

bool processEvent(PathIterator moduleIter, bool skipMasterModule, bool Worker = false, bool output = false)
    Calls the Event function.

void processCore(const PathPtr& startPath, const ModulePtrList& modulePathList, long maxEvent = 0, bool isInputProcess = true, bool isWorkerProcess = false, bool isOutputProcess = false)
    Process modules in the path.

void runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent)
    Fork out the input process.

void runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent)
    Fork out the output process.

void forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath, const ModulePtrList& terminateGlobally)
    Second step in the process: fork out the processes we need to have and call the event loop.

virtual ~ZMQEventProcessor()
    Make sure we remove all sockets cleanly.

EventMetaData m_previousEventMetaData
    Stores the previous EventMetaData.

void processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent)
    Basic function run in every process: process the event loop of the given path.

ZMQEventProcessor()
    Init the socket cleaning at exit.
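
The entries above describe a parent process that forks out one input process, N worker processes, and one output process, and then runs the monitoring itself without forking. A minimal sketch of that forking pattern with the POSIX calls fork() and waitpid() follows. It leaves out ZMQ and the module paths entirely, and forkRole and forkAndMonitor are hypothetical names chosen for illustration, not the basf2 implementation.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cassert>
#include <functional>
#include <vector>

// Hypothetical helper: run `role` in a forked child and return the child's PID to the parent.
pid_t forkRole(const std::function<int()>& role)
{
  const pid_t pid = fork();
  if (pid == 0) {
    _exit(role());  // child: run its role, then exit without returning to the caller
  }
  return pid;  // parent: the child's PID, or -1 if fork() failed
}

// Hypothetical stand-in for the "fork and run" step: one input process,
// numWorkers worker processes, and one output process; the parent only monitors.
// Returns the number of children that exited with status 0.
int forkAndMonitor(unsigned int numWorkers)
{
  std::vector<pid_t> children;
  const auto role = [] { return 0; };  // placeholder for a per-process event loop

  children.push_back(forkRole(role));  // input process
  for (unsigned int i = 0; i < numWorkers; ++i) {
    children.push_back(forkRole(role));  // worker process i
  }
  children.push_back(forkRole(role));  // output process

  // Monitoring runs in the parent, without forking: wait for every child.
  int cleanExits = 0;
  for (const pid_t pid : children) {
    int status = 0;
    if (pid > 0 && waitpid(pid, &status, 0) == pid && WIFEXITED(status) && WEXITSTATUS(status) == 0) {
      ++cleanExits;
    }
  }
  assert(cleanExits <= static_cast<int>(children.size()));  // sanity check on the count
  return cleanExits;
}
```

With two workers, forkAndMonitor(2) starts four children in total. A real monitor would also react to crashed or hanging children, which this sketch does not attempt.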
std::shared_ptr<Path> PathPtr
    Defines a pointer to a Path object as a std::shared_ptr.
    Definition: Path.h:28

std::shared_ptr<Module> ModulePtr
    Defines a pointer to a Module object as a std::shared_ptr.
    Definition: Module.h:40

std::list<ModulePtr> ModulePtrList
    Defines a std::list of shared module pointers.
    Definition: Module.h:584
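
Because these aliases are plain standard-library types, the usual shared-ownership idioms apply: the same module can sit in a path and in a separate list (such as a terminateGlobally argument) at the same time. The sketch below shows this with minimal stand-in Module and Path structs. Those structs and the joinNames helper are assumptions made for this example; the real basf2 classes are far richer.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <string>

// Minimal stand-ins for the real basf2 classes (assumptions for this sketch only).
struct Module {
  std::string name;
};
struct Path {
  std::list<std::shared_ptr<Module>> modules;
};

// Aliases with the same shape as the ones listed above.
using PathPtr = std::shared_ptr<Path>;
using ModulePtr = std::shared_ptr<Module>;
using ModulePtrList = std::list<ModulePtr>;

// Hypothetical helper: join the names of all modules in a list, in order.
std::string joinNames(const ModulePtrList& moduleList)
{
  std::string result;
  for (const ModulePtr& module : moduleList) {
    assert(module != nullptr);  // the sketch assumes no empty pointers in the list
    if (!result.empty()) {
      result += ",";
    }
    result += module->name;
  }
  return result;
}
```

Passing the lists by const reference, as the header does, avoids copying the list and touching the reference counts on every call.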
Abstract base class for different kinds of events.