Belle II Software development
HLTEventProcessor.h
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#pragma once

#include <framework/core/Module.h>
#include <framework/core/EventProcessor.h>
#include <framework/core/Path.h>
#include <framework/pcore/zmq/utils/ZMQParent.h>

#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace Belle2 {
  class HLTEventProcessor : public EventProcessor {
  public:
    HLTEventProcessor(const std::vector<std::string>& outputAddresses);

    void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName = false);

  private:
    ZMQParent m_parent;
    std::vector<int> m_processList;
    std::vector<std::unique_ptr<zmq::socket_t>> m_sockets;

    void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation);
    void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName = false);
    void processCore(PathPtr path);

    bool processEvent(PathIterator moduleIter, bool firstRound);

    std::pair<unsigned int, unsigned int> checkChildProcesses();

    void release();

    bool forkOut();
  };
}
Belle2::EventProcessor
Provides the core event processing loop.
Belle2::HLTEventProcessor
EventProcessor to be used on the HLT, with all the specialities of HLT processing.
std::vector< std::unique_ptr< zmq::socket_t > > m_sockets
The created sockets for unregistering workers. TODO: use connections.
bool forkOut()
Helper function to fork out. Sets the Python state correctly and adds the process to the internal sta...
void sendTerminatedMessage(unsigned int pid, bool waitForConfirmation)
Send an unregister message to all sockets if the given PID died. Wait at most 10 s for the confirmation...
void release()
Release the parent resource, which is needed after forking to not close it twice.
ZMQParent m_parent
An instance of a ZMQParent to create sockets for unregistering workers.
void runWorkers(PathPtr path, unsigned int numProcesses, bool appendProcessNumberToModuleName=false)
Fork out as many workers as requested and, in each of them, run the given path using processCore.
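The fork-per-worker pattern described for runWorkers can be illustrated with a minimal POSIX sketch. This is not the basf2 implementation: `forkWorkers` and its `work` callback are hypothetical names introduced here, and the sketch leaves out the Python state handling and ZMQ bookkeeping that the real class performs.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cassert>
#include <functional>
#include <vector>

// Hypothetical helper (not part of basf2): fork numProcesses children, run
// `work` in each child, and return the child PIDs to the parent.
std::vector<pid_t> forkWorkers(unsigned int numProcesses, const std::function<int()>& work)
{
  assert(work && "work must be callable");
  std::vector<pid_t> pids;
  for (unsigned int i = 0; i < numProcesses; ++i) {
    const pid_t pid = fork();
    if (pid == 0) {
      // Child: do the work and leave immediately, without running the
      // parent's exit handlers or returning into the parent's code.
      _exit(work());
    }
    if (pid > 0) {
      pids.push_back(pid);  // Parent: remember the worker's PID.
    }
    // pid < 0 means fork failed; this sketch simply skips that worker.
  }
  return pids;
}
```

The returned vector plays the role that m_processList appears to play in the class: a record of the children the parent still has to watch.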
std::vector< int > m_processList
The current list of running processes (with their PIDs)
std::pair< unsigned int, unsigned int > checkChildProcesses()
Check if one of the started processes has died.
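One common way to check for dead children without blocking is `waitpid` with `WNOHANG`. The sketch below assumes that approach. The function name `reapFinishedChildren` is hypothetical, and the meaning given to the two returned counters (failed, cleanly exited) is an assumption for this example only, since the listing does not say what the returned pair holds.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <utility>
#include <vector>

// Hypothetical sketch (not the basf2 code): poll every tracked PID without
// blocking, drop the ones that have finished, and return
// {failed, cleanlyExited}. The meaning of the counters is assumed.
std::pair<unsigned int, unsigned int> reapFinishedChildren(std::vector<pid_t>& pids)
{
  unsigned int failed = 0;
  unsigned int cleanlyExited = 0;
  std::vector<pid_t> stillRunning;

  for (const pid_t pid : pids) {
    int status = 0;
    const pid_t result = waitpid(pid, &status, WNOHANG);
    if (result == 0) {
      stillRunning.push_back(pid);  // child has not changed state yet
    } else if (result == pid && WIFEXITED(status) && WEXITSTATUS(status) == 0) {
      ++cleanlyExited;
    } else {
      ++failed;  // non-zero exit code, killed by a signal, or waitpid error
    }
  }

  pids.swap(stillRunning);
  return {failed, cleanlyExited};
}
```

A supervising loop could call this periodically and decide, based on the first counter, whether a replacement worker should be started.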
bool processEvent(PathIterator moduleIter, bool firstRound)
Process a single event by iterating through the module path once.
void process(PathPtr spath, bool restartFailedWorkers, bool appendProcessNumberToModuleName=false)
Process the given path.
void processCore(PathPtr path)
Process the path by repeatedly calling processEvent until termination is requested.
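The relation between processCore and processEvent can be pictured as a simple loop. The sketch below is a stand-in under stated assumptions: `runEventLoop` and both callbacks are hypothetical, and treating a `true` return value from the event callback as "stop processing" is an assumption, since the listing does not state what the bool returned by processEvent means.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for the event loop (not the basf2 code): call
// processEvent until it asks to stop or until termination is requested.
// Returns the number of processEvent calls that were made.
unsigned int runEventLoop(const std::function<bool(bool firstRound)>& processEvent,
                          const std::function<bool()>& terminationRequested)
{
  assert(processEvent && terminationRequested);
  unsigned int calls = 0;
  bool firstRound = true;  // only the very first call sees firstRound == true
  while (!terminationRequested()) {
    const bool stop = processEvent(firstRound);  // assumed: true means "stop"
    ++calls;
    firstRound = false;
    if (stop) {
      break;
    }
  }
  return calls;
}
```

Passing the termination check as a callback keeps the sketch independent of how the request arrives, for example via a signal handler setting a flag.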
Iterator over a Path (returning Module pointers).
Definition: PathIterator.h:26
A helper class for creating ZMQ sockets keeping track of the ZMQ context and terminating it if needed...
Definition: ZMQParent.h:39
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:35