8#include <daq/hbasf2/apps/ZMQCollector.h>
9#include <framework/logging/Logger.h>
17 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
18 "where to read the events from")
19 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
20 "where to send the events to")
21 (
"lax", boost::program_options::bool_switch(&
m_lax)->default_value(
m_lax),
22 "dismiss events if no worker is ready (lax) or not")
25 "how long to wait after no events come anymore");
37 if (type == EMessageTypes::c_newRunMessage) {
40 }
else if (type == EMessageTypes::c_lastEventMessage) {
48 auto message =
m_input->handleIncomingData();
54 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
64 auto message =
m_input->overwriteStopMessage();
78 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
79 "where to read the events from")
80 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
81 "where to send the events to");
97 auto message =
m_input->handleIncomingData();
102 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
106 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
107 B2DEBUG(30, message->getDataMessage().size());
108 m_output->handleEvent(std::move(message->getDataMessage()));
116 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
117 "where to read the events from")
118 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
119 "where to send the events to")
122 "how long to wait after no events come anymore");
134 if (type == EMessageTypes::c_newRunMessage) {
136 }
else if (type == EMessageTypes::c_lastEventMessage) {
144 auto message =
m_input->handleIncomingData();
150 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
160 auto message =
m_input->overwriteStopMessage();
174 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
175 "where to read the events from")
176 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
177 "where to send the events to")
178 (
"addEventSize", boost::program_options::bool_switch(&
m_addEventSize)->default_value(
false),
179 "add the hlon of the event size at the beginning")
182 "how long to wait after no events come anymore");
196 if (type == EMessageTypes::c_newRunMessage) {
199 }
else if (type == EMessageTypes::c_lastEventMessage) {
207 auto message =
m_input->handleIncomingData();
216 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
220 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
221 m_output->handleEvent(std::move(message->getDataMessage()));
227 m_input->overwriteStopMessage();
238 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
239 "where to read the events from")
241 "where to send the events to")
243 "where to send the rois to")
244 (
"addEventSize", boost::program_options::bool_switch(&
m_addEventSize)->default_value(
false),
245 "add the hlon of the event size at the beginning")
248 "how long to wait after no events come anymore");
262 auto message =
m_input->handleIncomingData();
271 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
280 if (type == EMessageTypes::c_newRunMessage) {
283 }
else if (type == EMessageTypes::c_lastEventMessage) {
291 buffer <<
"{" << std::endl;
292 buffer <<
"\"monitor\": " <<
m_monitor->getMonitoringJSON() <<
"," << std::endl;
293 buffer <<
"\"input\": " <<
m_input->getMonitoringJSON() <<
"," << std::endl;
294 buffer <<
"\"output\": " <<
m_output->getMonitoringJSON() <<
"," << std::endl;
295 buffer <<
"\"roi\": " <<
m_output->getROIMonitoringJSON() << std::endl;
296 buffer <<
"}" << std::endl;
301 m_input->overwriteStopMessage();
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output part of a confirmed connection.
Helper connection hosting both a normal raw and a ROI output and sending to both at the same time.
std::string m_inputAddress
Parameter: input address.
void fillMonitoringJSON(std::stringstream &buffer) const final
Special handling of the JSON function with additional ROI.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
std::string m_dataOutputAddress
Parameter: output address for data (first part of message)
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_roiOutputAddress
Parameter: output address for ROIs (second part of message)
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (data message to first,...
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
Output part of a load-balanced connection.
static auto stripIdentity(std::unique_ptr< ZMQIdMessage > message)
Create a No-ID Message out of an ID message.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
std::unique_ptr< ZMQSimpleConnection > m_monitor
Pointer to the monitoring connection. Should be set in initialize.
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
std::unique_ptr< ZMQConfirmedInput > m_input
Pointer to the input connection. Should be set in initialize.
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
void resetTimer()
Helper function to reset the start time and the remaining time.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.