Belle II Software  release-08-01-10
ZMQDistributor.h
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework) *
 * Author: The Belle II Collaboration *
 * *
 * See git log for contributors and copyright holders. *
 * This file is licensed under LGPL-3.0, see LICENSE.md. *
 **************************************************************************/
#pragma once

#include <daq/hbasf2/apps/ZMQApp.h>
#include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
#include <framework/pcore/zmq/connections/ZMQRawConnection.h>
#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>

#include <boost/program_options.hpp>

namespace po = boost::program_options;

namespace Belle2 {
  class ZMQDistributor : public ZMQStandardApp<ZMQRawInput, ZMQLoadBalancedOutput> {
  protected:
    void initialize() final;
    void addOptions(po::options_description& desc) final;
    void handleExternalSignal(EMessageTypes type) final;
    void handleInput() final;
    void handleTimeout() final;

  private:
    std::string m_inputAddress;
    std::string m_outputAddress;
    unsigned int m_maximalBufferSize = 80'000'000;
    bool m_expressRecoMode = false;
    unsigned int m_stopWaitingTime = 2;
  };

  class ZMQInputAdapter : public ZMQStandardApp<ZMQRawInput, ZMQConfirmedOutput> {
  protected:
    void initialize() final;
    void addOptions(po::options_description& desc) final;
    void handleExternalSignal(EMessageTypes type) final;
    void handleInput() final;
    void handleTimeout() final;

  private:
    std::string m_inputAddress;
    std::string m_outputAddress;
    unsigned int m_maximalBufferSize = 80'000'000;
    bool m_expressRecoMode = false;
    unsigned int m_stopWaitingTime = 2;
  };
}
Standard distributor app: receive data via a raw connection (e.g.
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: if set to true, do not wait for a ready worker; dismiss the incoming event instead.
void addOptions(po::options_description &desc) final
Add the parameters to the command-line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait once events have stopped arriving.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
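The effect of m_expressRecoMode on event forwarding can be illustrated with a small standalone sketch. It is not the basf2 implementation: the class ReadyWorkerDispatcher and all of its members are hypothetical names, it uses only the C++17 standard library instead of ZMQ sockets, and it assumes that "waiting for a ready worker" means leaving the input unread until a worker has announced itself.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch; none of these names exist in basf2.
class ReadyWorkerDispatcher {
public:
  explicit ReadyWorkerDispatcher(bool expressRecoMode)
    : m_expressRecoMode(expressRecoMode) {}

  // A worker signals that it can process one more event.
  void workerReady(std::string workerId)
  {
    m_readyWorkers.push_back(std::move(workerId));
  }

  // Whether the caller should read the next event from the input.
  // Assumption: outside express-reco mode the input is left untouched
  // until at least one worker is ready.
  bool shouldReadInput() const
  {
    return m_expressRecoMode || !m_readyWorkers.empty();
  }

  // Hand one event to the longest-waiting ready worker.
  // Returns the chosen worker, or std::nullopt if the event was dismissed.
  std::optional<std::string> dispatch(const std::string& event)
  {
    assert(shouldReadInput());
    if (m_readyWorkers.empty()) {
      // Only reachable in express-reco mode: dismiss instead of waiting.
      ++m_dismissedEvents;
      return std::nullopt;
    }
    std::string worker = std::move(m_readyWorkers.front());
    m_readyWorkers.pop_front();
    m_sent.emplace_back(worker, event);
    return worker;
  }

  std::size_t dismissedEvents() const { return m_dismissedEvents; }
  const std::vector<std::pair<std::string, std::string>>& sent() const { return m_sent; }

private:
  bool m_expressRecoMode;
  std::deque<std::string> m_readyWorkers;                   // workers in arrival order
  std::vector<std::pair<std::string, std::string>> m_sent;  // (worker, event) pairs
  std::size_t m_dismissedEvents = 0;
};
```

A polling loop would call shouldReadInput() before touching the input and dispatch() for each event it reads. With expressRecoMode set to true, an event that arrives while no worker is ready is counted as dismissed rather than held back.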
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: Choose how the input events are formatted.
void addOptions(po::options_description &desc) final
Add the parameters to the command-line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait once events have stopped arriving.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
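The interplay of handleExternalSignal, handleInput and handleTimeout around m_stopWaitingTime can be sketched as a small timer. This is a hypothetical illustration, not code from basf2: the class StopWaitTimer and its methods are invented names, the waiting time is assumed to be in seconds, and it is assumed that every event arriving during the wait restarts the countdown.

```cpp
#include <cassert>
#include <chrono>
#include <optional>

// Hypothetical sketch; none of these names exist in basf2.
class StopWaitTimer {
public:
  using Clock = std::chrono::steady_clock;

  // Assumption: the waiting time is given in seconds.
  explicit StopWaitTimer(std::chrono::seconds stopWaitingTime)
    : m_stopWaitingTime(stopWaitingTime)
  {
    assert(stopWaitingTime >= std::chrono::seconds::zero());
  }

  // A stop signal arrived: start waiting for late events.
  void onStopSignal(Clock::time_point now)
  {
    m_deadline = now + m_stopWaitingTime;
  }

  // An event arrived: while waiting, push the deadline back.
  void onEvent(Clock::time_point now)
  {
    if (m_deadline) {
      m_deadline = now + m_stopWaitingTime;
    }
  }

  // Called from the polling loop. Returns true exactly once per stop,
  // at the point where the stop message should be forwarded.
  bool onTimeoutCheck(Clock::time_point now)
  {
    if (!m_deadline || now < *m_deadline) {
      return false;
    }
    m_deadline.reset();
    return true;
  }

  bool isWaiting() const { return m_deadline.has_value(); }

private:
  std::chrono::seconds m_stopWaitingTime;
  std::optional<Clock::time_point> m_deadline;  // set only while waiting after a stop
};
```

Passing the current time in as an argument, instead of calling Clock::now() inside the class, keeps the sketch deterministic. A real loop would pass Clock::now() on each iteration.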
Generic base class for all standalone ZMQ applications.
Definition: ZMQApp.h:46
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.