Belle II Software  release-05-01-25
ZMQDistributor.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <daq/hbasf2/apps/ZMQApp.h>
13 #include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
14 #include <framework/pcore/zmq/connections/ZMQRawConnection.h>
15 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
16 
17 #include <boost/program_options.hpp>
18 
19 namespace po = boost::program_options;
20 
21 namespace Belle2 {
/// Standalone ZMQ application built on ZMQStandardApp with a ZMQRawInput
/// input connection and a ZMQLoadBalancedOutput output connection.
/// Data messages arriving on the input connection are passed on to the
/// output connection (see handleInput()).
35  class ZMQDistributor : public ZMQStandardApp<ZMQRawInput, ZMQLoadBalancedOutput> {
36  protected:
    /// Initialize the two connections using the command line arguments.
38  void initialize() final;
    /// Add the parameters of this application to the command line arguments.
40  void addOptions(po::options_description& desc) final;
    /// Handle stop, start and terminate messages.
42  void handleExternalSignal(EMessageTypes type) final;
    /// Pass the message from the input connection to the output connection (only data messages).
44  void handleInput() final;
    /// Timeout hook: a timeout is set while waiting for all messages after a stop;
    /// a stop message is then sent (exact condition is in the implementation file).
46  void handleTimeout() final;
47 
48  private:
    /// Parameter: input address.
50  std::string m_inputAddress;
    /// Parameter: output address.
52  std::string m_outputAddress;
    /// Parameter: buffer size for storing input messages.
    /// NOTE(review): unit of the default is not stated here (presumably bytes) - confirm.
54  unsigned int m_maximalBufferSize = 80'000'000;
    /// Parameter: do not wait for a ready worker if set to true, but dismiss the incoming event.
56  bool m_expressRecoMode = false;
    /// Parameter: how long to wait after no more events arrive.
    /// NOTE(review): unit of the default is not stated here (presumably seconds) - confirm.
58  unsigned int m_stopWaitingTime = 2;
59  };
60 
// NOTE(review): the class declaration line that opens this body (embedded
// listing line 61) is missing from this listing. The members below repeat the
// ZMQDistributor declarations one-to-one. Presumably this body belongs to
// Belle2::ZMQInputAdapter - TODO confirm against the original header.
62  protected:
    /// Initialization hook, declared final.
64  void initialize() final;
    /// Hook receiving the program options description, declared final.
66  void addOptions(po::options_description& desc) final;
    /// Hook receiving an external signal of type EMessageTypes, declared final.
68  void handleExternalSignal(EMessageTypes type) final;
    /// Input handling hook, declared final.
70  void handleInput() final;
    /// Timeout handling hook, declared final.
72  void handleTimeout() final;
73 
74  private:
    /// Input address (string).
76  std::string m_inputAddress;
    /// Output address (string).
78  std::string m_outputAddress;
    /// Maximal buffer size, default 80'000'000 (unit not visible here).
80  unsigned int m_maximalBufferSize = 80'000'000;
    /// Express reco mode flag, off by default.
82  bool m_expressRecoMode = false;
    /// Stop waiting time, default 2 (unit not visible here).
84  unsigned int m_stopWaitingTime = 2;
85  };
87 }
Belle2::ZMQDistributor::m_maximalBufferSize
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
Definition: ZMQDistributor.h:62
Belle2::ZMQConfirmedOutput
Output part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:116
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::ZMQDistributor::handleTimeout
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQDistributor.cc:55
Belle2::ZMQStandardApp
Generic base class for all standalone ZMQ applications.
Definition: ZMQApp.h:56
Belle2::ZMQRawInput
Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.
Definition: ZMQRawConnection.h:51
Belle2::ZMQDistributor::m_stopWaitingTime
unsigned int m_stopWaitingTime
Parameter: how long to wait once no more events arrive.
Definition: ZMQDistributor.h:66
Belle2::ZMQDistributor::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQDistributor.cc:33
Belle2::ZMQInputAdapter
Definition: ZMQDistributor.h:69
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQDistributor::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
Definition: ZMQDistributor.cc:40
Belle2::ZMQDistributor::m_expressRecoMode
bool m_expressRecoMode
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
Definition: ZMQDistributor.h:64
Belle2::ZMQDistributor::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (only data messages).
Definition: ZMQDistributor.cc:64
Belle2::ZMQDistributor::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQDistributor.h:60
Belle2::ZMQDistributor::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQDistributor.h:58
Belle2::ZMQDistributor::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQDistributor.cc:15