Belle II Software  release-05-01-25
ZMQCollector.h
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #pragma once
11 
12 #include <daq/hbasf2/apps/ZMQApp.h>
13 #include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
14 #include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
15 #include <framework/pcore/zmq/connections/ZMQRawConnection.h>
16 #include <daq/hbasf2/connections/ZMQROIConnection.h>
17 
18 #include <boost/program_options.hpp>
19 
20 namespace po = boost::program_options;
21 
22 namespace Belle2 {
34  class ZMQCollector : public ZMQStandardApp<ZMQConfirmedInput, ZMQLoadBalancedOutput> {
35  protected:
37  void initialize() final;
39  void addOptions(po::options_description& desc) final;
41  void handleExternalSignal(EMessageTypes type) final;
43  void handleInput() final;
45  void handleTimeout() final;
46 
47  private:
49  std::string m_inputAddress;
51  std::string m_outputAddress;
53  bool m_lax = false;
55  unsigned int m_stopWaitingTime = 2;
56  };
57 
65  protected:
67  void initialize() final;
69  void addOptions(po::options_description& desc) final;
71  void handleExternalSignal(EMessageTypes type) final;
73  void handleInput() final;
74 
75  private:
77  std::string m_inputAddress;
79  std::string m_outputAddress;
80  };
81 
89  class ZMQProxyCollector : public ZMQStandardApp<ZMQConfirmedInput, ZMQConfirmedOutput> {
90  protected:
92  void initialize() final;
94  void addOptions(po::options_description& desc) final;
96  void handleExternalSignal(EMessageTypes type) final;
98  void handleInput() final;
100  void handleTimeout() final;
101 
102  private:
104  std::string m_inputAddress;
106  std::string m_outputAddress;
108  unsigned int m_stopWaitingTime = 2;
109  };
110 
120  protected:
122  void initialize() final;
124  void addOptions(po::options_description& desc) final;
126  void handleExternalSignal(EMessageTypes type) final;
128  void handleInput() final;
130  void handleTimeout() final;
131 
132  private:
134  std::string m_inputAddress;
136  std::string m_outputAddress;
138  bool m_addEventSize;
140  unsigned int m_stopWaitingTime = 2;
141  };
142 
145  protected:
147  void initialize() final;
149  void addOptions(po::options_description& desc) final;
151  void handleExternalSignal(EMessageTypes type) final;
153  void handleInput() final;
155  void fillMonitoringJSON(std::stringstream& buffer) const final;
157  void handleTimeout() final;
158 
159  private:
161  std::string m_inputAddress;
163  std::string m_dataOutputAddress;
165  std::string m_roiOutputAddress;
167  bool m_addEventSize;
169  unsigned int m_stopWaitingTime = 2;
170  };
172 }
Belle2::ZMQCollector::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQCollector.cc:37
Belle2::ZMQCollector::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQCollector.h:57
Belle2::ZMQCollector::m_lax
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
Definition: ZMQCollector.h:61
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::ZMQStandardApp
Generic base class for all standalone ZMQ applications.
Definition: ZMQApp.h:56
Belle2::ZMQFinalCollector
Final collector app: receive messages on the input reacting with a confirmation message and sends the...
Definition: ZMQCollector.h:127
Belle2::ZMQConfirmedInput
Input part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:65
Belle2::ZMQCollector::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQCollector.h:59
Belle2::ZMQCollector::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQCollector.cc:30
Belle2::ZMQOutputAdapter
Special collector app for translating between ZMQ and raw connections: send ready messages (like a ty...
Definition: ZMQCollector.h:72
Belle2::ZMQCollector::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQCollector.cc:15
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQCollector::handleTimeout
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQCollector.cc:64
Belle2::ZMQProxyCollector
Special collector app: receive messages on the input reacting with a confirmation message and sends t...
Definition: ZMQCollector.h:97
Belle2::ZMQCollector::m_stopWaitingTime
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQCollector.h:63
Belle2::ZMQLoadBalancedInput
Input part of a load-balanced connection.
Definition: ZMQLoadBalancedConnection.h:50
Belle2::ZMQDataAndROIOutput
Helper connection hosting both a normal raw and a ROI output and sending to both at the same time.
Definition: ZMQROIConnection.h:41
Belle2::ZMQRawOutput
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
Definition: ZMQRawConnection.h:97
Belle2::ZMQFinalCollectorWithROI
Special form of the ZMQFinalCollector for sending out the additional data message to a ROI receiver.
Definition: ZMQCollector.h:152
Belle2::ZMQStandardApp< ZMQConfirmedInput, ZMQLoadBalancedOutput >::fillMonitoringJSON
virtual void fillMonitoringJSON(std::stringstream &buffer) const
Using the connections, fill up a buffer with the content to be monitored.
Definition: ZMQApp.details.h:230
Belle2::ZMQCollector::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQCollector.cc:48