 |
Belle II Software
release-05-02-19
|
10 #include <daq/hbasf2/apps/ZMQDistributor.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
19 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
20 "where to read the events from")
21 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
22 "where to send the events to")
24 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
27 "size of the input buffer")
30 "how long to wait after no events come anymore");
42 if (type == EMessageTypes::c_newRunMessage) {
45 }
else if (type == EMessageTypes::c_lastEventMessage) {
48 }
else if (type == EMessageTypes::c_terminateMessage) {
66 auto messages =
m_input->handleIncomingData();
68 for (
auto && message : messages) {
74 messageType = EMessageTypes::c_eventMessage;
87 m_output->handleEvent(std::move(outputMessage));
95 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
96 "where to read the events from")
97 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
98 "where to send the events to")
100 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
101 (
"maximalBufferSize",
103 "size of the input buffer")
106 "how long to wait after no events come anymore");
120 if (type == EMessageTypes::c_newRunMessage) {
122 }
else if (type == EMessageTypes::c_lastEventMessage) {
125 }
else if (type == EMessageTypes::c_terminateMessage) {
143 auto messages =
m_input->handleIncomingData();
145 for (
auto && message : messages) {
151 messageType = EMessageTypes::c_eventMessage;
164 m_output->handleEvent(std::move(outputMessage));
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
void pollEvent(bool pollOnInput)
Poll until a single event is retrieved.
Output part of a confirmed connection.
EMessageTypes
Type the messages can have.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void initialize() final
Initialize the two connections using the command line arguments.
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
void resetTimer()
Helper function to reset the start time and the remaining time.
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
Abstract base class for different kinds of events.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
Output part of a load-balanced connection.
bool m_expressRecoMode
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages).
std::string m_outputAddress
Parameter: output address.
std::string m_inputAddress
Parameter: input address.
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
std::unique_ptr< ZMQRawInput > m_input
Pointer to the input connection. Should be set in initialize.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.