8#include <framework/pcore/zmq/connections/ZMQConfirmedConnection.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
21 log(
"last_received_message",
"");
22 log(
"total_number_messages", 0l);
23 log(
"registered_workers", 0l);
24 log(
"hello_messages", 0l);
25 log(
"dead_workers", 0l);
26 log(
"all_stop_messages", 0l);
27 log(
"sent_stop_messages", 0l);
28 log(
"last_stop_sent",
"");
29 log(
"received_stop_messages", 0l);
30 log(
"all_terminate_messages", 0l);
31 log(
"sent_terminate_messages", 0l);
32 log(
"last_terminate_sent",
"");
33 log(
"received_terminate_messages", 0l);
34 log(
"received_messages_after_stop", 0l);
35 log(
"last_received_event_message",
"");
36 log(
"last_clear",
"");
37 log(
"stop_overwrites", 0l);
38 log(
"last_stop_overwrite",
"");
40 log(
"data_size", 0.0);
41 log(
"received_events", 0l);
42 log(
"event_rate", 0.0);
50 auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(
m_socket);
51 const auto fromIdentity = message->getIdentity();
53 logTime(
"last_received_message");
55 increment(
"total_number_messages_from[" + fromIdentity +
"]");
60 if (message->isMessage(EMessageTypes::c_helloMessage)) {
65 increment(
"hello_messages_from[" + fromIdentity +
"]");
67 }
else if (message->isMessage(EMessageTypes::c_deleteWorkerMessage)) {
70 B2DEBUG(30,
"Got message from " << message->getIdentity() <<
" to kill " << message->getMessagePartAsString<2>());
71 const std::string& killedIdentity = message->getMessagePartAsString<2>();
76 increment(
"dead_worker_messaged_from[" + fromIdentity +
"]");
79 B2ERROR(
"There is not a single worker registered anymore!");
105 B2ASSERT(
"Worker without proper registration!",
108 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
112 increment(
"total_received_stop_messages");
127 }
else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
131 increment(
"total_received_terminate_messages");
138 logTime(
"last_terminate_sent");
152 B2ERROR(
"Received an event after having received stop messages from every worker. This is not a good sign! I will dismiss this event and next events!");
154 increment(
"received_messages_after_stop");
155 auto t1 = std::chrono::system_clock::now();
157 if (intervalAfterAllStopMessages > std::chrono::seconds{150}) {
158 B2FATAL(
"Too many events after having received stop messages! This is abnormal. I will kill the process!");
164 const auto dataSize = message->getDataMessage().size();
166 average(
"data_size", dataSize);
167 average(
"data_size_from[" + fromIdentity +
"]", dataSize);
170 increment(
"received_events[" + fromIdentity +
"]");
173 timeit<200>(
"event_rate_from[" + fromIdentity +
"]");
175 logTime(
"last_received_event_message");
201 B2ERROR(
"Sending out a stop message although not all of the workers are finished already!");
203 logTime(
"last_stop_overwrite");
221 log(
"no_confirmation_message", 0l);
222 log(
"last_sent_event_message", 0l);
223 log(
"data_size", 0.0);
224 log(
"sent_events", 0l);
225 log(
"event_rate", 0.0);
226 log(
"timespan_waiting_for_confirmation", 0l);
240 auto current = std::chrono::system_clock::now();
245 }
else if (requireConfirmation) {
246 B2FATAL(
"Did not receive a confirmation message in time!");
248 B2WARNING(
"Did not receive a confirmation message in time!");
254 auto afterWaiting = std::chrono::system_clock::now();
261 const auto dataSize = message->getDataMessage().size();
263 average(
"data_size", dataSize);
267 logTime(
"last_sent_event_message");
277 auto acceptMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(
m_socket);
278 B2ASSERT(
"Should be an accept message", acceptMessage->isMessage(EMessageTypes::c_confirmMessage));
void handleEvent(std::unique_ptr< ZMQNoIdMessage > message, bool requireConfirmation=true, int maximalWaitTime=10000)
Send the message to the output (a message without a ID as there is only a single output).
long m_timespanWaitingForConfirmation
Internal monitoring how long we were waiting for confirmation messages.
void handleIncomingData()
Blocks until it can receive the (hopefully confirmation) message from the output.
unsigned int m_waitingForConfirmation
On how many confirmation messages are we still waiting?
ZMQConfirmedOutput(const std::string &outputAddress, const std::shared_ptr< ZMQParent > &parent)
Create a new confirmed output by connecting to the address.
Specialized connection over a ZMQ socket.
std::shared_ptr< ZMQParent > m_parent
The shared ZMQParent instance.
std::unique_ptr< zmq::socket_t > m_socket
The memory of the socket. Needs to be initialized in a derived class.
void logTime(const std::string &key)
Store the current time as a string under the given key.
void increment(const std::string &key)
Increment the value with the given key (only numerical values). If not present, set to 1.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
static unsigned int poll(const std::vector< zmq::socket_t * > &socketList, int timeout)
Poll function.
void timeit(const std::string &key)
Measure the rate of calls with the same key every AVERAGE_SIZE calls (and also display the last time ...
static void send(std::unique_ptr< zmq::socket_t > &socket, AZMQMessage message)
Send a given message over the given created socket. You need to move in the message for zero-copy.
void log(const std::string &key, const AClass &value)
Store a value under a certain key. Different types of values can be stored, namely long,...
void average(const std::string &key, double value)
Instead of storeing the double value directly under the given key, store the average of the last MAX_...
Abstract base class for different kinds of events.