Belle II Software development
ZMQDistributor Class Reference

Standard distributor app: receive data via a raw connection (e.g. More...

#include <ZMQDistributor.h>

Inheritance diagram for ZMQDistributor:
ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >

Public Member Functions

void initFromConsole (const std::string &description, int argc, char *argv[])
 Should be called before the main() function to initialize the connections using the paremeters given on command line.
 
void main ()
 Start the main loop polling on the output and monitoring connections and eventually also on the input if the output is ready.
 

Protected Member Functions

void initialize () final
 Initialize the two connections using the command line arguments.
 
void addOptions (po::options_description &desc) final
 Add the parameters to the cmd line arguments.
 
void handleExternalSignal (EMessageTypes type) final
 Handle stop, start and terminate messages as described above.
 
void handleInput () final
 Pass the message from the input connection to the output connection (only data messages)
 
void handleTimeout () final
 When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we have not seen any more events.
 
virtual void handleOutput ()
 Will get called for every message on the output connection. Can be overridden in a derived class. Calls handleIncomingData by default.
 
virtual void fillMonitoringJSON (std::stringstream &buffer) const
 Using the connections, fill up a buffer with the content to be monitored.
 
bool terminated () const
 Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
 
void resetTimer ()
 Helper function to reset the start time and the remaining time.
 
void pollEvent (bool pollOnInput)
 Poll until a single event is retreived.
 

Protected Attributes

std::shared_ptr< ZMQParentm_parent
 Pointer to the ZMQParent to be used as base for all connections.
 
std::unique_ptr< ZMQRawInputm_input
 Pointer to the input connection. Should be set in initialize.
 
std::unique_ptr< ZMQLoadBalancedOutputm_output
 Pointer to the output connection. Should be set in initialize.
 
std::unique_ptr< ZMQSimpleConnectionm_monitor
 Pointer to the monitoring connection. Should be set in initialize.
 
bool m_terminate
 Can be set by functions to terminate the main loop at the next possibility.
 
unsigned int m_timeout
 If set to a value != 0, will call handleTimeout with this frequency (in seconds).
 
bool m_monitorHasPriority
 Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector you probably do not want this.
 

Private Member Functions

bool checkTimer ()
 Helper function to check, if the timeout should happen.
 
void updateTimer ()
 Helper function to update the remaining time.
 
void handleMonitoring ()
 Handle an incoming message on the monitoring socket by either calling handleExternalSignal() or by passing on the monitor JSONs of the connections.
 

Private Attributes

std::string m_inputAddress
 Parameter: input address.
 
std::string m_outputAddress
 Parameter: output address.
 
unsigned int m_maximalBufferSize = 80'000'000
 Parameter: buffer size for storing input messages.
 
bool m_expressRecoMode = false
 Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
 
unsigned int m_stopWaitingTime = 2
 Parameter: how long to wait after no events come anymore.
 
HLTMainLoop m_mainLoop
 Internal signal handler.
 
std::string m_monitoringAddress
 Storage for the monitoring address for the cmd arguments.
 
int m_remainingTime
 Counter for the remaining time until a timeout happens.
 
std::chrono::system_clock::time_point m_start
 Start time for the timeout.
 

Detailed Description

Standard distributor app: receive data via a raw connection (e.g.

from event builder) and send them out to any ready worker (via a load-balanced connection). If expressRecoMode is not set, wait until a worker is ready (otherwise dismiss events). As there are not signal messages on input, does not react on anything from input. When receiving a stop on monitoring, it starts a counter. Once there has not been any event for N second, it sends a stop to all clients. When receiving a start on monitoring, it clears the output. When receiving a terminate on monitoring, it sends out a terminate message to all clients and terminated itself.

Definition at line 33 of file ZMQDistributor.h.

Member Function Documentation

◆ addOptions()

void addOptions ( po::options_description &  desc)
finalprotectedvirtual

Add the parameters to the cmd line arguments.

Reimplemented from ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >.

Definition at line 13 of file ZMQDistributor.cc.

14{
16 desc.add_options()
17 ("input", boost::program_options::value<std::string>(&m_inputAddress)->required(),
18 "where to read the events from")
19 ("output", boost::program_options::value<std::string>(&m_outputAddress)->required(),
20 "where to send the events to")
21 ("expressRecoMode", boost::program_options::bool_switch(&m_expressRecoMode)->default_value(m_expressRecoMode),
22 "express reco mode: dismiss events if no worker is ready and send out event messages (instead of raw messages)")
23 ("maximalBufferSize",
24 boost::program_options::value<unsigned int>(&m_maximalBufferSize)->default_value(m_maximalBufferSize),
25 "size of the input buffer")
26 ("stopWaitingTime",
27 boost::program_options::value<unsigned int>(&m_stopWaitingTime)->default_value(m_stopWaitingTime),
28 "how long to wait after no events come anymore");
29}
unsigned int m_maximalBufferSize
Parameter: buffer size for storing input messages.
std::string m_inputAddress
Parameter: input address.
bool m_expressRecoMode
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
std::string m_outputAddress
Parameter: output address.
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...

◆ checkTimer()

bool checkTimer
privateinherited

Helper function to check, if the timeout should happen.

Definition at line 111 of file ZMQApp.details.h.

196 {
197 updateTimer();
198 return m_remainingTime == 0;
199 }
int m_remainingTime
Counter for the remaining time until a timeout happens.
Definition: ZMQApp.h:106
void updateTimer()
Helper function to update the remaining time.

◆ fillMonitoringJSON()

void fillMonitoringJSON ( std::stringstream &  buffer) const
protectedvirtualinherited

Using the connections, fill up a buffer with the content to be monitored.

Definition at line 92 of file ZMQApp.details.h.

257 {
258 buffer << "{" << std::endl;
259 buffer << "\"monitor\": " << m_monitor->getMonitoringJSON() << "," << std::endl;
260 buffer << "\"input\": " << m_input->getMonitoringJSON() << "," << std::endl;
261 buffer << "\"output\": " << m_output->getMonitoringJSON() << std::endl;
262 buffer << "}" << std::endl;
263 }
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
Definition: ZMQApp.h:69
std::unique_ptr< ZMQSimpleConnection > m_monitor
Pointer to the monitoring connection. Should be set in initialize.
Definition: ZMQApp.h:71
std::unique_ptr< ZMQRawInput > m_input
Pointer to the input connection. Should be set in initialize.
Definition: ZMQApp.h:67

◆ handleExternalSignal()

void handleExternalSignal ( EMessageTypes  type)
finalprotectedvirtual

Handle stop, start and terminate messages as described above.

Reimplemented from ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >.

Definition at line 38 of file ZMQDistributor.cc.

39{
40 if (type == EMessageTypes::c_newRunMessage) {
41 m_input->clear();
42 m_output->clear();
43 } else if (type == EMessageTypes::c_lastEventMessage) {
45 resetTimer();
46 } else if (type == EMessageTypes::c_terminateMessage) {
47 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_terminateMessage));
48 m_input->clear();
49 m_terminate = true;
50 }
51}
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
Definition: ZMQApp.h:73
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
Definition: ZMQApp.h:75
void resetTimer()
Helper function to reset the start time and the remaining time.

◆ handleInput()

void handleInput ( )
finalprotectedvirtual

Pass the message from the input connection to the output connection (only data messages)

Reimplemented from ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >.

Definition at line 62 of file ZMQDistributor.cc.

63{
64 auto messages = m_input->handleIncomingData();
65
66 for (auto&& message : messages) {
67 // So there has been a message, make sure to reset the timer for waiting (if no timer is set this will just return)
68 resetTimer();
69
70 EMessageTypes messageType = EMessageTypes::c_rawDataMessage;
72 messageType = EMessageTypes::c_eventMessage;
73 }
74
75 auto outputMessage = ZMQMessageFactory::createMessage(messageType, std::move(message));
76
77 // We know that the output is ready for the first message, but we do not know anything about any other messages, so lets
78 // be safe and poll the output if it is not ready so far
79 while (not m_output->isReady() and not terminated()) {
80 pollEvent(false);
81 }
82 if (terminated()) {
83 return;
84 }
85 m_output->handleEvent(std::move(outputMessage));
86 }
87}
void pollEvent(bool pollOnInput)
Poll until a single event is retreived.
bool terminated() const
Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.
EMessageTypes
Type the messages can have.

◆ handleMonitoring()

void handleMonitoring
privateinherited

Handle an incoming message on the monitoring socket by either calling handleExternalSignal() or by passing on the monitor JSONs of the connections.

Definition at line 115 of file ZMQApp.details.h.

233 {
234 auto monitoringMessage = m_monitor->handleIncomingData();
235
236 if (monitoringMessage->isMessage(EMessageTypes::c_newRunMessage)) {
237 handleExternalSignal(EMessageTypes::c_newRunMessage);
238 return;
239 } else if (monitoringMessage->isMessage(EMessageTypes::c_lastEventMessage)) {
240 handleExternalSignal(EMessageTypes::c_lastEventMessage);
241 return;
242 } else if (monitoringMessage->isMessage(EMessageTypes::c_terminateMessage)) {
243 handleExternalSignal(EMessageTypes::c_terminateMessage);
244 return;
245 }
246
247 std::stringstream buffer;
248 fillMonitoringJSON(buffer);
249
250 auto message = ZMQMessageFactory::createMessage(monitoringMessage->getIdentity(),
251 EMessageTypes::c_confirmMessage, buffer.str());
252 m_monitor->handleEvent(std::move(message));
253 }
virtual void fillMonitoringJSON(std::stringstream &buffer) const
Using the connections, fill up a buffer with the content to be monitored.
virtual void handleExternalSignal(EMessageTypes)
Will get called for every signal message on the monitoring connection. Can be overridden in a derived...

◆ handleOutput()

void handleOutput
protectedvirtualinherited

Will get called for every message on the output connection. Can be overridden in a derived class. Calls handleIncomingData by default.

Definition at line 88 of file ZMQApp.details.h.

179 {
180 m_output->handleIncomingData();
181 }

◆ handleTimeout()

void handleTimeout ( )
finalprotectedvirtual

When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we have not seen any more events.

Reimplemented from ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >.

Definition at line 53 of file ZMQDistributor.cc.

54{
55 m_output->handleEvent(ZMQMessageFactory::createMessage(EMessageTypes::c_lastEventMessage));
56
57 // We do not want to send out another stop message, so reset the counter
58 m_timeout = 0;
59 resetTimer();
60}

◆ initFromConsole()

void initFromConsole ( const std::string &  description,
int  argc,
char *  argv[] 
)
inherited

Should be called before the main() function to initialize the connections using the paremeters given on command line.

Custom implementations should implement the addOptions function to pass the command line arguments correctly. Calls the initialize function (which should also be overridden).

Definition at line 54 of file ZMQApp.details.h.

22 {
23
24 po::options_description desc(description);
25 std::string connection_file;
26 int debugLevel(0);
27 desc.add_options()
28 ("connection-file", boost::program_options::value<std::string>(&connection_file),
29 "if given print the connection information for input/output and monitoring socket to the given filename "
30 "in json format")
31 ("debug", boost::program_options::value<int>(&debugLevel), "Enable debug logging");
32 addOptions(desc);
33
34 po::positional_options_description p;
35
36 po::variables_map vm;
37 try {
38 po::store(po::command_line_parser(argc, argv).options(desc).positional(p).run(), vm);
39 } catch (std::exception& e) {
40 B2FATAL(e.what());
41 }
42
43 if (vm.count("help")) {
44 std::cout << desc << std::endl;
45 exit(1);
46 }
47
48 try {
49 po::notify(vm);
50 } catch (std::exception& e) {
51 B2FATAL(e.what());
52 }
53
54 if (debugLevel > 0) {
55 auto& logging = LogSystem::Instance();
56 logging.getLogConfig()->setLogLevel(LogConfig::c_Debug);
57 logging.getLogConfig()->setDebugLevel(debugLevel);
58 B2DEBUG(1, "Enabled debug logging");
59 }
60
61 initialize();
62
63 if (not connection_file.empty()) {
64 B2DEBUG(1, "Write connection file" << LogVar("connection_file", connection_file));
65 nlohmann::json json;
66 try {
67 json["input"] = m_input->getEndPoint();
68 } catch (zmq::error_t& e) {
69 B2WARNING(e.what());
70 }
71 try {
72 json["output"] = m_output->getEndPoint();
73 } catch (zmq::error_t& e) {
74 B2WARNING(e.what());
75 }
76 std::ofstream connections(connection_file, std::ofstream::trunc);
77 if (!connections) {
78 B2FATAL("Cannot write connection file" << LogVar("connection_file", connection_file));
79 }
80 connections << std::setw(4) << json << std::endl;
81 }
82 }
Class to store variables with their name which were sent to the logging service.
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....

◆ initialize()

void initialize ( )
finalprotectedvirtual

Initialize the two connections using the command line arguments.

Reimplemented from ZMQStandardApp< ZMQRawInput, ZMQLoadBalancedOutput >.

Definition at line 31 of file ZMQDistributor.cc.

32{
36}
Output part of a load-balanced connection.
Input connection allowing to speak with non-zmq sockets via a ZMQ_STREAM socket.
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
Definition: ZMQApp.h:65

◆ main()

void main
inherited

Start the main loop polling on the output and monitoring connections and eventually also on the input if the output is ready.

Calls the functions handleExternalSignal, handleInput, handleOutput and handleTimeout as described in the documentation of this class.

Definition at line 61 of file ZMQApp.details.h.

86 {
87 resetTimer();
88
89 while (not terminated()) {
90 if (not m_output->isReady()) {
91 // if the output is not ready, we can not sent anything. So lets just poll on output
92 // and monitoring until it becomes ready
93 m_monitor->log("output_state", "not_ready");
94 pollEvent(false);
95 } else {
96 // if it is ready, we can also include the input socket as long as output stays ready
97 m_monitor->log("output_state", "ready");
98 pollEvent(true);
99 }
100 }
101 }

◆ pollEvent()

void pollEvent ( bool  pollOnInput)
protectedinherited

Poll until a single event is retreived.

Definition at line 98 of file ZMQApp.details.h.

105 {
106 auto reactToOutput = [this]() {
107 // Get all messages from output socket
108 while (ZMQConnection::hasMessage(m_output.get()) and not terminated()) {
109 handleOutput();
110 if (m_monitorHasPriority) {
111 break;
112 }
113 }
114 };
115
116 auto reactToMonitor = [this]() {
117 // Get all messages from monitoring socket
118 while (ZMQConnection::hasMessage(m_monitor.get()) and not terminated()) {
120 }
121 };
122
123 auto reactToInput = [this]() {
124 // Get all messages from input as long output is ready
125 while (ZMQConnection::hasMessage(m_input.get()) and m_output->isReady() and not terminated()) {
126 handleInput();
127 if (m_monitorHasPriority) {
128 break;
129 }
130 }
131 };
132
133 m_monitor->logTime("waiting_since");
134
135 if (pollOnInput) {
136 ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}, {m_input.get(), reactToInput}},
138 } else {
139 ZMQConnection::poll({{m_output.get(), reactToOutput}, {m_monitor.get(), reactToMonitor}}, m_remainingTime);
140 }
141
142 if (checkTimer() and not terminated()) {
143 B2ASSERT("There is no timeout set, but we still call the timeout() function? A bug!", m_timeout != 0);
144 m_monitor->increment("timeouts");
146 resetTimer();
147 } else {
148 updateTimer();
149 }
150 }
void handleMonitoring()
Handle an incoming message on the monitoring socket by either calling handleExternalSignal() or by pa...
virtual void handleInput()
Will get called for every message on the input connection. Can be overridden in a derived class....
virtual void handleOutput()
Will get called for every message on the output connection. Can be overridden in a derived class....
bool checkTimer()
Helper function to check, if the timeout should happen.
virtual void handleTimeout()
Will get called on a timeout. Can be overridden in a derived class. Empty by default.

◆ resetTimer()

void resetTimer
protectedinherited

Helper function to reset the start time and the remaining time.

Definition at line 96 of file ZMQApp.details.h.

220 {
221 // if there is no timeout, we should never set the remaining time
222 if (m_timeout == 0) {
223 m_remainingTime = -1;
224 return;
225 }
226
227 m_start = std::chrono::system_clock::now();
228 m_remainingTime = m_timeout * 1000;
229 }
std::chrono::system_clock::time_point m_start
Start time for the timeout.
Definition: ZMQApp.h:108

◆ terminated()

bool terminated
protectedinherited

Check if the main loop will be exited on next occasion. Can be set via the "m_terminate" flag.

Definition at line 94 of file ZMQApp.details.h.

190 {
191 return not m_mainLoop.isRunning() or m_terminate;
192 }
HLTMainLoop m_mainLoop
Internal signal handler.
Definition: ZMQApp.h:102

◆ updateTimer()

void updateTimer
privateinherited

Helper function to update the remaining time.

Definition at line 113 of file ZMQApp.details.h.

203 {
204 // if there is no timeout, we should never update the remaining time
205 if (m_timeout == 0) {
206 m_remainingTime = -1;
207 return;
208 }
209
210 auto currentTime = std::chrono::system_clock::now();
211 auto timeDifference = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - m_start);
212 m_remainingTime = m_timeout * 1000 - timeDifference.count();
213 if (m_remainingTime < 0) {
214 m_remainingTime = 0;
215 }
216 }

Member Data Documentation

◆ m_expressRecoMode

bool m_expressRecoMode = false
private

Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.

Definition at line 54 of file ZMQDistributor.h.

◆ m_input

std::unique_ptr<ZMQRawInput > m_input
protectedinherited

Pointer to the input connection. Should be set in initialize.

Definition at line 67 of file ZMQApp.h.

◆ m_inputAddress

std::string m_inputAddress
private

Parameter: input address.

Definition at line 48 of file ZMQDistributor.h.

◆ m_mainLoop

HLTMainLoop m_mainLoop
privateinherited

Internal signal handler.

Definition at line 102 of file ZMQApp.h.

◆ m_maximalBufferSize

unsigned int m_maximalBufferSize = 80'000'000
private

Parameter: buffer size for storing input messages.

Definition at line 52 of file ZMQDistributor.h.

◆ m_monitor

std::unique_ptr<ZMQSimpleConnection> m_monitor
protectedinherited

Pointer to the monitoring connection. Should be set in initialize.

Definition at line 71 of file ZMQApp.h.

◆ m_monitorHasPriority

bool m_monitorHasPriority
protectedinherited

Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector you probably do not want this.

Definition at line 77 of file ZMQApp.h.

◆ m_monitoringAddress

std::string m_monitoringAddress
privateinherited

Storage for the monitoring address for the cmd arguments.

Definition at line 104 of file ZMQApp.h.

◆ m_output

std::unique_ptr<ZMQLoadBalancedOutput > m_output
protectedinherited

Pointer to the output connection. Should be set in initialize.

Definition at line 69 of file ZMQApp.h.

◆ m_outputAddress

std::string m_outputAddress
private

Parameter: output address.

Definition at line 50 of file ZMQDistributor.h.

◆ m_parent

std::shared_ptr<ZMQParent> m_parent
protectedinherited

Pointer to the ZMQParent to be used as base for all connections.

Definition at line 65 of file ZMQApp.h.

◆ m_remainingTime

int m_remainingTime
privateinherited

Counter for the remaining time until a timeout happens.

Definition at line 106 of file ZMQApp.h.

◆ m_start

std::chrono::system_clock::time_point m_start
privateinherited

Start time for the timeout.

Definition at line 108 of file ZMQApp.h.

◆ m_stopWaitingTime

unsigned int m_stopWaitingTime = 2
private

Parameter: how long to wait after no events come anymore.

Definition at line 56 of file ZMQDistributor.h.

◆ m_terminate

bool m_terminate
protectedinherited

Can be set by functions to terminate the main loop at the next possibility.

Definition at line 73 of file ZMQApp.h.

◆ m_timeout

unsigned int m_timeout
protectedinherited

If set to a value != 0, will call handleTimeout with this frequency (in seconds).

Definition at line 75 of file ZMQApp.h.


The documentation for this class was generated from the following files: