8#include <daq/hbasf2/apps/ZMQDistributor.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
17 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
18 "where to read the events from")
19 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
20 "where to send the events to")
22 "express reco mode: send out event messages (instead of raw messages)")
23 (
"lax", boost::program_options::bool_switch(&
m_lax)->default_value(
m_lax),
24 "lax mode: dismiss events if no worker is ready")
27 "size of the input buffer")
30 "how long to wait after no events come anymore");
42 if (type == EMessageTypes::c_newRunMessage) {
45 }
else if (type == EMessageTypes::c_lastEventMessage) {
48 }
else if (type == EMessageTypes::c_terminateMessage) {
66 auto messages =
m_input->handleIncomingData();
68 for (
auto&& message : messages) {
74 messageType = EMessageTypes::c_eventMessage;
87 m_output->handleEvent(std::move(outputMessage));
95 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
96 "where to read the events from")
97 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
98 "where to send the events to")
100 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
101 (
"maximalBufferSize",
103 "size of the input buffer")
106 "how long to wait after no events come anymore");
120 if (type == EMessageTypes::c_newRunMessage) {
122 }
else if (type == EMessageTypes::c_lastEventMessage) {
125 }
else if (type == EMessageTypes::c_terminateMessage) {
143 auto messages =
m_input->handleIncomingData();
145 for (
auto&& message : messages) {
151 messageType = EMessageTypes::c_eventMessage;
164 m_output->handleEvent(std::move(outputMessage));
Output part of a confirmed connection.
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: send out event messages instead of raw messages.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we have reached the timeout.
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages).
Output part of a load-balanced connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
bool m_monitorHasPriority
std::shared_ptr< ZMQParent > m_parent
std::unique_ptr< ZMQLoadBalancedOutput > m_output
std::unique_ptr< ZMQRawInput > m_input
void pollEvent(bool pollOnInput)
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments.
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base function.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.