Belle II Software development
ZMQCollector.h
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#pragma once
9
10#include <daq/hbasf2/apps/ZMQApp.h>
11#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
12#include <framework/pcore/zmq/connections/ZMQLoadBalancedConnection.h>
13#include <framework/pcore/zmq/connections/ZMQRawConnection.h>
14#include <daq/hbasf2/connections/ZMQROIConnection.h>
15
16#include <boost/program_options.hpp>
17
18namespace po = boost::program_options;
19
20namespace Belle2 {
32 class ZMQCollector : public ZMQStandardApp<ZMQConfirmedInput, ZMQLoadBalancedOutput> {
33 protected:
35 void initialize() final;
37 void addOptions(po::options_description& desc) final;
39 void handleExternalSignal(EMessageTypes type) final;
41 void handleInput() final;
43 void handleTimeout() final;
44
45 private:
47 std::string m_inputAddress;
49 std::string m_outputAddress;
51 bool m_lax = false;
53 unsigned int m_stopWaitingTime = 2;
54 };
55
62 class ZMQOutputAdapter : public ZMQStandardApp<ZMQLoadBalancedInput, ZMQRawOutput> {
63 protected:
65 void initialize() final;
67 void addOptions(po::options_description& desc) final;
69 void handleExternalSignal(EMessageTypes type) final;
71 void handleInput() final;
72
73 private:
75 std::string m_inputAddress;
77 std::string m_outputAddress;
78 };
79
87 class ZMQProxyCollector : public ZMQStandardApp<ZMQConfirmedInput, ZMQConfirmedOutput> {
88 protected:
90 void initialize() final;
92 void addOptions(po::options_description& desc) final;
94 void handleExternalSignal(EMessageTypes type) final;
96 void handleInput() final;
98 void handleTimeout() final;
99
100 private:
102 std::string m_inputAddress;
104 std::string m_outputAddress;
106 unsigned int m_stopWaitingTime = 2;
107 };
108
117 class ZMQFinalCollector : public ZMQStandardApp<ZMQConfirmedInput, ZMQRawOutput> {
118 protected:
120 void initialize() final;
122 void addOptions(po::options_description& desc) final;
124 void handleExternalSignal(EMessageTypes type) final;
126 void handleInput() final;
128 void handleTimeout() final;
129
130 private:
132 std::string m_inputAddress;
134 std::string m_outputAddress;
138 unsigned int m_stopWaitingTime = 2;
139 };
140
142 class ZMQFinalCollectorWithROI : public ZMQStandardApp<ZMQConfirmedInput, ZMQDataAndROIOutput> {
143 protected:
145 void initialize() final;
147 void addOptions(po::options_description& desc) final;
149 void handleExternalSignal(EMessageTypes type) final;
151 void handleInput() final;
153 void fillMonitoringJSON(std::stringstream& buffer) const final;
155 void handleTimeout() final;
156
157 private:
159 std::string m_inputAddress;
167 unsigned int m_stopWaitingTime = 2;
168 };
169
170}
Normal collector app: receive messages on the input reacting with a confirmation message and sends th...
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Special form of the ZMQFinalCollector for sending out the additional data message to a ROI receiver.
std::string m_inputAddress
Parameter: input address.
void fillMonitoringJSON(std::stringstream &buffer) const final
Special handling of the JSON function with additonal ROI.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
std::string m_dataOutputAddress
Parameter: output address for data (first part of message)
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_roiOutputAddress
Parameter: output address for ROIs (second part of message)
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (data message to first,...
Final collector app: receive messages on the input reacting with a confirmation message and sends the...
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
Special collector app for translating between ZMQ and raw connections: send ready messages (like a ty...
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Special collector app: receive messages on the input reacting with a confirmation message and sends t...
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Generic base class for all standalone ZMQ applications.
Definition ZMQApp.h:46
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.