8 #include <daq/hbasf2/apps/ZMQDistributor.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
17 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
18 "where to read the events from")
19 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
20 "where to send the events to")
22 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
25 "size of the input buffer")
28 "how long to wait after no events come anymore");
40 if (type == EMessageTypes::c_newRunMessage) {
43 }
else if (type == EMessageTypes::c_lastEventMessage) {
46 }
else if (type == EMessageTypes::c_terminateMessage) {
64 auto messages =
m_input->handleIncomingData();
66 for (
auto&& message : messages) {
72 messageType = EMessageTypes::c_eventMessage;
85 m_output->handleEvent(std::move(outputMessage));
93 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
94 "where to read the events from")
95 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
96 "where to send the events to")
98 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
101 "size of the input buffer")
104 "how long to wait after no events come anymore");
118 if (type == EMessageTypes::c_newRunMessage) {
120 }
else if (type == EMessageTypes::c_lastEventMessage) {
123 }
else if (type == EMessageTypes::c_terminateMessage) {
141 auto messages =
m_input->handleIncomingData();
143 for (
auto&& message : messages) {
149 messageType = EMessageTypes::c_eventMessage;
162 m_output->handleEvent(std::move(outputMessage));
Output part of a confirmed connection.
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
Output part of a load-balanced connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
std::unique_ptr< ZMQRawInput > m_input
Pointer to the input connection. Should be set in initialize.
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
void pollEvent(bool pollOnInput)
Poll until a single event is retreived.
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
void resetTimer()
Helper function to reset the start time and the remaining time.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.