Belle II Software  release-05-02-19
ZMQEventProcessor.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Anselm Baur, Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <framework/core/Module.h>
13 #include <framework/core/EventProcessor.h>
14 #include <framework/core/Path.h>
15 #include <framework/pcore/ProcessMonitor.h>
16 
17 namespace Belle2 {
25  class ZMQEventProcessor : public EventProcessor {
26  public:
27 
30 
32  virtual ~ZMQEventProcessor();
33 
35  void process(const PathPtr& spath, long maxEvent);
36 
38  void cleanup();
39 
40  private:
42  void initialize(const ModulePtrList& moduleList, const ModulePtr& histogramManager);
43 
45  void forkAndRun(long maxEvent, const PathPtr& inputPath, const PathPtr& mainPath, const PathPtr& outputPath,
46  const ModulePtrList& terminateGlobally);
47 
49  void terminateAndCleanup(const ModulePtr& histogramManager);
50 
52  void runMonitoring(const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally, long maxEvent);
53 
55  void runInput(const PathPtr& inputPath, const ModulePtrList& terminateGlobally, long maxEvent);
56 
58  void runOutput(const PathPtr& outputPath, const ModulePtrList& terminateGlobally, long maxEvent);
59 
61  void runWorker(unsigned int numProcesses, const PathPtr& inputPath, const PathPtr& mainPath, const ModulePtrList& terminateGlobally,
62  long maxEvent);
63 
65  void processPath(const PathPtr& localPath, const ModulePtrList& terminateGlobally, long maxEvent);
66 
69  };
71 }
Belle2::ProcessMonitor
Class to monitor all started framework processes (input, workers, output), kill them if requested and...
Definition: ProcessMonitor.h:38
Belle2::ZMQEventProcessor::m_processMonitor
ProcessMonitor m_processMonitor
Instance of the process monitor.
Definition: ZMQEventProcessor.h:76
Belle2::ZMQEventProcessor::runInput
void runInput(const PathPtr &inputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the input process.
Definition: ZMQEventProcessor.cc:215
Belle2::ZMQEventProcessor::forkAndRun
void forkAndRun(long maxEvent, const PathPtr &inputPath, const PathPtr &mainPath, const PathPtr &outputPath, const ModulePtrList &terminateGlobally)
Second step in the process: fork out the processes we need to have and call the event loop.
Definition: ZMQEventProcessor.cc:384
Belle2::ZMQEventProcessor::runMonitoring
void runMonitoring(const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Start the monitoring (without forking)
Definition: ZMQEventProcessor.cc:320
Belle2
Namespace containing the Belle II software framework classes (e.g. ZMQEventProcessor, ProcessMonitor).
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQEventProcessor::process
void process(const PathPtr &spath, long maxEvent)
Processes the full module chain using parallel processing, starting with the first module in the give...
Definition: ZMQEventProcessor.cc:115
Belle2::ModulePtrList
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:586
Belle2::PathPtr
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:30
Belle2::ZMQEventProcessor::cleanup
void cleanup()
clean up IPC resources (should only be called in one process).
Definition: ZMQEventProcessor.cc:410
Belle2::ZMQEventProcessor::initialize
void initialize(const ModulePtrList &moduleList, const ModulePtr &histogramManager)
First step in the process: initialize the modules in the list.
Definition: ZMQEventProcessor.cc:161
Belle2::ModulePtr
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:42
Belle2::ZMQEventProcessor::processPath
void processPath(const PathPtr &localPath, const ModulePtrList &terminateGlobally, long maxEvent)
Basic function run in every process: process the event loop of the given path.
Definition: ZMQEventProcessor.cc:307
Belle2::ZMQEventProcessor::runWorker
void runWorker(unsigned int numProcesses, const PathPtr &inputPath, const PathPtr &mainPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the N worker processes.
Definition: ZMQEventProcessor.cc:278
Belle2::ZMQEventProcessor::~ZMQEventProcessor
virtual ~ZMQEventProcessor()
Make sure we remove all sockets cleanly.
Definition: ZMQEventProcessor.cc:109
Belle2::ZMQEventProcessor::terminateAndCleanup
void terminateAndCleanup(const ModulePtr &histogramManager)
Last step in the process: run the termination and cleanup (kill all remaining processes)
Definition: ZMQEventProcessor.cc:191
Belle2::ZMQEventProcessor::ZMQEventProcessor
ZMQEventProcessor()
Init the socket cleaning at exit.
Definition: ZMQEventProcessor.cc:98
Belle2::ZMQEventProcessor::runOutput
void runOutput(const PathPtr &outputPath, const ModulePtrList &terminateGlobally, long maxEvent)
Fork out the output process.
Definition: ZMQEventProcessor.cc:238