 |
Belle II Software
release-05-02-19
|
10 #include <daq/hbasf2/apps/ZMQCollector.h>
11 #include <framework/logging/Logger.h>
19 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
20 "where to read the events from")
21 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
22 "where to send the events to")
23 (
"lax", boost::program_options::bool_switch(&
m_lax)->default_value(
m_lax),
24 "dismiss events if no worker is ready (lax) or not")
27 "how long to wait after no events come anymore");
39 if (type == EMessageTypes::c_newRunMessage) {
42 }
else if (type == EMessageTypes::c_lastEventMessage) {
50 auto message =
m_input->handleIncomingData();
56 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
66 auto message =
m_input->overwriteStopMessage();
80 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
81 "where to read the events from")
82 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
83 "where to send the events to");
99 auto message =
m_input->handleIncomingData();
104 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
108 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
109 B2INFO(message->getDataMessage().size());
110 m_output->handleEvent(std::move(message->getDataMessage()));
118 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
119 "where to read the events from")
120 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
121 "where to send the events to")
124 "how long to wait after no events come anymore");
136 if (type == EMessageTypes::c_newRunMessage) {
138 }
else if (type == EMessageTypes::c_lastEventMessage) {
146 auto message =
m_input->handleIncomingData();
152 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
162 auto message =
m_input->overwriteStopMessage();
176 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
177 "where to read the events from")
178 (
"output", boost::program_options::value<std::string>(&
m_outputAddress)->required(),
179 "where to send the events to")
180 (
"addEventSize", boost::program_options::bool_switch(&
m_addEventSize)->default_value(
false),
181 "add the hlon of the event size at the beginning")
184 "how long to wait after no events come anymore");
198 if (type == EMessageTypes::c_newRunMessage) {
201 }
else if (type == EMessageTypes::c_lastEventMessage) {
209 auto message =
m_input->handleIncomingData();
218 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
222 if (message->isMessage(EMessageTypes::c_rawDataMessage) or message->isMessage(EMessageTypes::c_eventMessage)) {
223 m_output->handleEvent(std::move(message->getDataMessage()));
229 m_input->overwriteStopMessage();
240 (
"input", boost::program_options::value<std::string>(&
m_inputAddress)->required(),
241 "where to read the events from")
243 "where to send the events to")
245 "where to send the rois to")
246 (
"addEventSize", boost::program_options::bool_switch(&
m_addEventSize)->default_value(
false),
247 "add the hlon of the event size at the beginning")
250 "how long to wait after no events come anymore");
264 auto message =
m_input->handleIncomingData();
273 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
282 if (type == EMessageTypes::c_newRunMessage) {
285 }
else if (type == EMessageTypes::c_lastEventMessage) {
293 buffer <<
"{" << std::endl;
294 buffer <<
"\"monitor\": " <<
m_monitor->getMonitoringJSON() <<
"," << std::endl;
295 buffer <<
"\"input\": " <<
m_input->getMonitoringJSON() <<
"," << std::endl;
296 buffer <<
"\"output\": " <<
m_output->getMonitoringJSON() <<
"," << std::endl;
297 buffer <<
"\"roi\": " <<
m_output->getROIMonitoringJSON() << std::endl;
298 buffer <<
"}" << std::endl;
303 m_input->overwriteStopMessage();
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
std::string m_inputAddress
Parameter: input address.
std::unique_ptr< ZMQLoadBalancedOutput > m_output
Pointer to the output connection. Should be set in initialize.
std::string m_inputAddress
Parameter: input address.
bool m_lax
Parameter: Do not wait for a ready worker if set to true, but dismiss the incoming event.
Output part of a confirmed connection.
EMessageTypes
Type the messages can have.
std::string m_roiOutputAddress
Parameter: output address for ROIs (second part of message)
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
bool m_monitorHasPriority
Flag to break out of the polling loop to check for monitoring messages. Except for the finalcollector...
void handleExternalSignal(EMessageTypes type) final
Send a stop message on stop or clear the counters on start from the monitoring connection.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
void initialize() final
Initialize the two connections using the command line arguments.
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
void initialize() final
Initialize the two connections using the command line arguments.
static auto stripIdentity(std::unique_ptr< ZMQIdMessage > message)
Create a No-ID Message out of an ID message.
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
bool m_terminate
Can be set by functions to terminate the main loop at the next possibility.
std::string m_outputAddress
Parameter: output address.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)
void initialize() final
Initialize the two connections using the command line arguments.
std::shared_ptr< ZMQParent > m_parent
Pointer to the ZMQParent to be used as base for all connections.
void resetTimer()
Helper function to reset the start time and the remaining time.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
std::string m_outputAddress
Parameter: output address.
virtual void addOptions(po::options_description &desc)
Override in a derived class to add the command line arguments. Do not forget to call this base functi...
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
void handleInput() final
Pass the message from the input connection to the output connection (only data messages)
std::string m_inputAddress
Parameter: input address.
std::unique_ptr< ZMQSimpleConnection > m_monitor
Pointer to the monitoring connection. Should be set in initialize.
Abstract base class for different kinds of events.
std::string m_outputAddress
Parameter: output address.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
void fillMonitoringJSON(std::stringstream &buffer) const final
Special handling of the JSON function with additonal ROI.
void handleTimeout() final
When a timeout is set (= we are waiting for all messages after a stop), send a stop message once we h...
std::string m_inputAddress
Parameter: input address.
void initialize() final
Initialize the two connections using the command line arguments.
unsigned int m_stopWaitingTime
Parameter: how long to wait after no events come anymore.
void initialize() final
Initialize the two connections using the command line arguments.
std::string m_dataOutputAddress
Parameter: output address for data (first part of message)
void handleExternalSignal(EMessageTypes type) final
Set the stop message counter on stop or clear the counters on start from the monitoring connection.
std::string m_inputAddress
Parameter: input address.
std::string m_outputAddress
Parameter: output address.
void handleInput() final
Pass the message from the input connection to the output connection (data message to first,...
Output part of a load-balanced connection.
Helper connection hosting both a normal raw and a ROI output and sending to both at the same time.
Output connection to speak to non-zmq sockets via a ZMQ_STREAM socket.
void addOptions(po::options_description &desc) final
Add the parameters to the cmd line arguments.
bool m_addEventSize
Parameter: add the event size at the beginning of the message.
virtual void initialize()
Override in a derived class to initialize the connections from the given command line arguments....
unsigned int m_timeout
If set to a value != 0, will call handleTimeout with this frequency (in seconds).
std::unique_ptr< ZMQConfirmedInput > m_input
Pointer to the input connection. Should be set in initialize.
void handleInput() final
Pass the message from the input connection to the output connection (if there is a message)