Belle II Software  release-08-01-10
ZMQDistributor.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <daq/hbasf2/apps/ZMQDistributor.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 
11 using namespace Belle2;
12 
/// Register the command line options of the distributor on the given options description.
///
/// Each option is bound directly to a data member, so parsing the command line fills the member:
///  - "input" (string, required)            -> m_inputAddress
///  - "output" (string, required)           -> m_outputAddress
///  - "expressRecoMode" (bool switch)       -> m_expressRecoMode
///  - "maximalBufferSize" (unsigned int)    -> m_maximalBufferSize
///  - "stopWaitingTime" (unsigned int)      -> m_stopWaitingTime
/// The three non-required options use the current value of their member as the default value.
///
/// @param desc options description the options are added to
13 void ZMQDistributor::addOptions(po::options_description& desc)
14 {
// NOTE(review): listing line 15 is missing from this extract - presumably the call to the base class
// addOptions; confirm against the original file.
16  desc.add_options()
17  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
18  "where to read the events from")
19  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
20  "where to send the events to")
21  ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
22  "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
23  ("maximalBufferSize",
24  boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
25  "size of the input buffer")
26  ("stopWaitingTime",
27  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
28  "how long to wait after no events come anymore");
29 }
30 
32 {
36 }
37 
39 {
40  if (type == EMessageTypes::c_newRunMessage) {
41  m_input->clear();
42  m_output->clear();
43  } else if (type == EMessageTypes::c_lastEventMessage) {
45  resetTimer();
46  } else if (type == EMessageTypes::c_terminateMessage) {
47  m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
48  m_input->clear();
49  m_terminate = true;
50  }
51 }
52 
54 {
55  m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
56 
57  // We do not want to send out another stop message, so reset the counter
58  m_timeout = 0;
59  resetTimer();
60 }
61 
63 {
64  auto messages = m_input->handleIncomingData();
65 
66  for (auto&& message : messages) {
67  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
68  resetTimer();
69 
70  EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
71  if (m_expressRecoMode) {
72  messageType = EMessageTypes::c_eventMessage;
73  }
74 
75  auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
76 
77  // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
78  // be safe and poll the output if it is not ready so far
79  while (not m_output->isReady() and not terminated()) {
80  pollEvent(false);
81  }
82  if (terminated()) {
83  return;
84  }
85  m_output->handleEvent(std::move(outputMessage));
86  }
87 }
88 
/// Register the command line options of the input adapter on the given options description.
///
/// The option set is the same as the one registered by ZMQDistributor::addOptions in this file:
///  - "input" (string, required)            -> m_inputAddress
///  - "output" (string, required)           -> m_outputAddress
///  - "expressRecoMode" (bool switch)       -> m_expressRecoMode
///  - "maximalBufferSize" (unsigned int)    -> m_maximalBufferSize
///  - "stopWaitingTime" (unsigned int)      -> m_stopWaitingTime
/// Each option writes into the named member when parsed; the three non-required options take the
/// current value of their member as the default value.
///
/// @param desc options description the options are added to
89 void ZMQInputAdapter::addOptions(po::options_description& desc)
90 {
// NOTE(review): listing line 91 is missing from this extract - presumably the call to the base class
// addOptions; confirm against the original file.
92  desc.add_options()
93  ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
94  "where to read the events from")
95  ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
96  "where to send the events to")
97  ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
98  "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
99  ("maximalBufferSize",
100  boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
101  "size of the input buffer")
102  ("stopWaitingTime",
103  boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
104  "how long to wait after no events come anymore");
105 }
106 
108 {
112 
113  m_monitorHasPriority = true;
114 }
115 
117 {
118  if (type == EMessageTypes::c_newRunMessage) {
119  m_input->clear();
120  } else if (type == EMessageTypes::c_lastEventMessage) {
122  resetTimer();
123  } else if (type == EMessageTypes::c_terminateMessage) {
124  m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
125  m_input->clear();
126  m_terminate = true;
127  }
128 }
129 
131 {
132  m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
133 
134  // We do not want to send out another stop message, so reset the counter
135  m_timeout = 0;
136  resetTimer();
137 }
138 
140 {
141  auto messages = m_input->handleIncomingData();
142 
143  for (auto&& message : messages) {
144  // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
145  resetTimer();
146 
147  EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
148  if (m_expressRecoMode) {
149  messageType = EMessageTypes::c_eventMessage;
150  }
151 
152  auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
153 
154  // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
155  // be safe and poll the output if it is not ready so far
156  while (not m_output->isReady() and not terminated()) {
157  pollEvent(false);
158  }
159  if (terminated()) {
160  return;
161  }
162  m_output->handleEvent(std::move(outputMessage));
163  }
164 }
Output part of a confirmed connection.
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Handle stop, start and terminate messages as described above.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
bool m_expressRecoMode
Parameter: Choose how the input events are formatted.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
Output part of a load-balanced connection.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
Definition: ZMQApp.h:77
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:65
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:69
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:73
std::unique_ptr< ZMQRawInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:67
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:75
void pollEvent(bool pollOnInput)
Poll until a single event is retrieved.
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
void resetTimer()
Helper function to reset the start time and the remaining time.
EMessageTypes
Type the messages can have.
Abstract base class for different kinds of events.