Belle II Software  release-05-02-19
HLTEventProcessor.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <framework/core/Module.h>
13 #include <framework/core/EventProcessor.h>
14 #include <framework/core/Path.h>
15 #include <framework/pcore/zmq/utils/ZMQParent.h>
16 
17 namespace Belle2 {
37  class HLTEventProcessor : public EventProcessor {
38  public:
40  HLTEventProcessor(const std::vector<std::string>& outputAddresses);
41 
51  void process(PathPtr spath, bool restartFailedWorkers);
52 
53  private:
55  ZMQParent m_parent;
57  std::vector<int> m_processList;
59  std::vector<std::unique_ptr<zmq::socket_t>> m_sockets;
60 
62  void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation);
64  void runWorkers(PathPtr path, unsigned int numProcesses);
70  void processCore(PathPtr path);
71 
86  bool processEvent(PathIterator moduleIter, bool firstRound);
87 
96  std::pair<unsigned int, unsigned int> checkChildProcesses();
97 
99  void release();
100 
102  bool forkOut();
103  };
105 }
Belle2::HLTEventProcessor::forkOut
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
Definition: HLTEventProcessor.cc:417
Belle2::HLTEventProcessor::processEvent
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
Definition: HLTEventProcessor.cc:259
Belle2::HLTEventProcessor::m_parent
ZMQParent m_parent
An instance of a ZMQParent to create sockets for unregistering workers.
Definition: HLTEventProcessor.h:63
Belle2::HLTEventProcessor::m_sockets
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
The created sockets for unregistering workers. TODO: use connections.
Definition: HLTEventProcessor.h:67
Belle2::HLTEventProcessor::HLTEventProcessor
HLTEventProcessor(const std::vector< std::string > &outputAddresses)
Create a new event processor and store the ZMQ addresses where to unregister workers.
Definition: HLTEventProcessor.cc:75
Belle2::HLTEventProcessor::release
void release()
Release the parent resource, which is needed after forking to not close it twice.
Definition: HLTEventProcessor.cc:409
Belle2::HLTEventProcessor::process
void process(PathPtr spath, bool restartFailedWorkers)
Process the given path.
Definition: HLTEventProcessor.cc:83
Belle2::HLTEventProcessor::runWorkers
void runWorkers(PathPtr path, unsigned int numProcesses)
Fork out as much workers as requested and in each run the given path using processCore.
Definition: HLTEventProcessor.cc:187
Belle2::HLTEventProcessor::sendTerminatedMessage
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at max 10s for the confirmation...
Definition: HLTEventProcessor.cc:56
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::PathPtr
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:30
Belle2::HLTEventProcessor::m_processList
std::vector< int > m_processList
The current list of running processes (with their PIDs)
Definition: HLTEventProcessor.h:65
Belle2::HLTEventProcessor::processCore
void processCore(PathPtr path)
Process the path by basically calling processEvent until a termination is requested.
Definition: HLTEventProcessor.cc:222
Belle2::HLTEventProcessor::checkChildProcesses
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
Definition: HLTEventProcessor.cc:364