Belle II Software  release-05-02-19
ZMQDistributor.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2019 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <daq/hbasf2/apps/ZMQDistributor.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 
13 using namespace Belle2;
14 
15 void ZMQDistributor::addOptions(po::options_description& desc)
16 {
18  desc.add_options()
19  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
20  "where to read the events from")
21  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
22  "where to send the events to")
23  ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
24  "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
25  ("maximalBufferSize",
26  boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
27  "size of the input buffer")
28  ("stopWaitingTime",
29  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
30  "how long to wait after no events come anymore");
31 }
32 
34 {
38 }
39 
41 {
42  if (type == EMessageTypes::c_newRunMessage) {
43  m_input->clear();
44  m_output->clear();
45  } else if (type == EMessageTypes::c_lastEventMessage) {
47  resetTimer();
48  } else if (type == EMessageTypes::c_terminateMessage) {
49  m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
50  m_input->clear();
51  m_terminate = true;
52  }
53 }
54 
56 {
57  m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
58 
59  // We do not want to send out another stop message, so reset the counter
60  m_timeout = 0;
61  resetTimer();
62 }
63 
65 {
66  auto messages = m_input->handleIncomingData();
67 
68  for (auto && message : messages) {
69  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
70  resetTimer();
71 
72  EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
73  if (m_expressRecoMode) {
74  messageType = EMessageTypes::c_eventMessage;
75  }
76 
77  auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
78 
79  // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
80  // be safe and poll the output if it is not ready so far
81  while (not m_output->isReady() and not terminated()) {
82  pollEvent(false);
83  }
84  if (terminated()) {
85  return;
86  }
87  m_output->handleEvent(std::move(outputMessage));
88  }
89 }
90 
91 void ZMQInputAdapter::addOptions(po::options_description& desc)
92 {
94  desc.add_options()
95  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
96  "where to read the events from")
97  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
98  "where to send the events to")
99  ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
100  "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
101  ("maximalBufferSize",
102  boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
103  "size of the input buffer")
104  ("stopWaitingTime",
105  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
106  "how long to wait after no events come anymore");
107 }
108 
110 {
114 
115  m_monitorHasPriority = true;
116 }
117 
119 {
120  if (type == EMessageTypes::c_newRunMessage) {
121  m_input->clear();
122  } else if (type == EMessageTypes::c_lastEventMessage) {
124  resetTimer();
125  } else if (type == EMessageTypes::c_terminateMessage) {
126  m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
127  m_input->clear();
128  m_terminate = true;
129  }
130 }
131 
133 {
134  m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
135 
136  // We do not want to send out another stop message, so reset the counter
137  m_timeout = 0;
138  resetTimer();
139 }
140 
142 {
143  auto messages = m_input->handleIncomingData();
144 
145  for (auto && message : messages) {
146  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
147  resetTimer();
148 
149  EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
150  if (m_expressRecoMode) {
151  messageType = EMessageTypes::c_eventMessage;
152  }
153 
154  auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
155 
156  // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
157  // be safe and poll the output if it is not ready so far
158  while (not m_output->isReady() and not terminated()) {
159  pollEvent(false);
160  }
161  if (terminated()) {
162  return;
163  }
164  m_output->handleEvent(std::move(outputMessage));
165  }
166 }
Belle2::ZMQInputAdapter::handleTimeout
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQDistributor.cc:132
Belle2::ZMQDistributor::m_maximalBufferSize
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
Definition: ZMQDistributor.h:62
Belle2::ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >::m_output
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:79
Belle2::ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >::pollEvent
void pollEvent(bool pollOnInput)
Poll until a single event is retrieved.
Definition: ZMQApp.details.h:78
Belle2::ZMQConfirmedOutput
Output part of a confirmed connection.
Definition: ZMQConfirmedConnection.h:116
Belle2::EMessageTypes
EMessageTypes
Type the messages can have.
Definition: ZMQDefinitions.h:26
Belle2::ZMQStandardApp< ZMQRawInput, ZMQConfirmedOutput >::m_monitorHasPriority
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
Definition: ZMQApp.h:87
Belle2::ZMQInputAdapter::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQDistributor.h:84
Belle2::ZMQDistributor::handleTimeout
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
Definition: ZMQDistributor.cc:55
Belle2::ZMQInputAdapter::m_expressRecoMode
bool m_expressRecoMode
Parameter: Choose how the input events are formatted.
Definition: ZMQDistributor.h:90
Belle2::ZMQRawInput
Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.
Definition: ZMQRawConnection.h:51
Belle2::ZMQInputAdapter::m_stopWaitingTime
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQDistributor.h:92
Belle2::ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >::m_terminate
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:83
Belle2::ZMQDistributor::m_stopWaitingTime
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
Definition: ZMQDistributor.h:66
Belle2::ZMQDistributor::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQDistributor.cc:33
Belle2::ZMQInputAdapter::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQDistributor.h:86
Belle2::ZMQInputAdapter::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
Definition: ZMQDistributor.cc:118
Belle2::ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >::m_parent
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:75
Belle2::ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >::resetTimer
void resetTimer()
Helper function to reset the start time and the remaining time.
Definition: ZMQApp.details.h:193
Belle2::ZMQStandardApp::addOptions
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
Definition: ZMQApp.details.h:134
Belle2::ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >::terminated
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
Definition: ZMQApp.details.h:163
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQMessageFactory::createMessage
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Definition: ZMQMessageFactory.h:37
Belle2::ZMQDistributor::handleExternalSignal
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
Definition: ZMQDistributor.cc:40
Belle2::ZMQInputAdapter::initialize
void initialize() final
Initialize the two connections using the command line arguments.
Definition: ZMQDistributor.cc:109
Belle2::ZMQLoadBalancedOutput
Output part of a load-balanced connection.
Definition: ZMQLoadBalancedConnection.h:81
Belle2::ZMQInputAdapter::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQDistributor.cc:91
Belle2::ZMQDistributor::m_expressRecoMode
bool m_expressRecoMode
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
Definition: ZMQDistributor.h:64
Belle2::ZMQDistributor::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
Definition: ZMQDistributor.cc:64
Belle2::ZMQDistributor::m_outputAddress
std::string m_outputAddress
Parameter: output address.
Definition: ZMQDistributor.h:60
Belle2::ZMQInputAdapter::handleInput
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Definition: ZMQDistributor.cc:141
Belle2::ZMQDistributor::m_inputAddress
std::string m_inputAddress
Parameter: input address.
Definition: ZMQDistributor.h:58
Belle2::ZMQStandardApp::initialize
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
Definition: ZMQApp.details.h:127
Belle2::ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >::m_timeout
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:85
Belle2::ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >::m_input
std::unique_ptr< ZMQRawInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:77
Belle2::ZMQInputAdapter::m_maximalBufferSize
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
Definition: ZMQDistributor.h:88
Belle2::ZMQDistributor::addOptions
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
Definition: ZMQDistributor.cc:15