9 #include <framework/pcore/zmq/processModules/ZMQRxWorkerModule.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
13 #include <framework/core/Environment.h>
20 static int s_event_number = 0;
22 ZMQRxWorkerModule::ZMQRxWorkerModule() :
Module()
32 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
33 "set the number of processes to at least 1.",
64 bool inputProcessIsGone =
false;
67 const auto socketHelloAnswer = [&inputProcessIsGone](
const auto & socket) {
68 const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
70 inputProcessIsGone =
true;
73 B2ASSERT(
"Received unexpected message from input.", message->isMessage(EMessageTypes::c_helloMessage));
78 if (inputProcessIsGone or not pollResult) {
79 B2WARNING(
"It seems the input process is already gone.");
93 const auto multicastAnswer = [](
const auto & socket) {
94 const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
95 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
96 B2DEBUG(30,
"Having received an graceful stop message. Will now go on.");
101 B2ERROR(
"Undefined message on multicast");
105 const auto socketAnswer = [
this](
const auto & socket) {
106 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
107 if (message->isMessage(EMessageTypes::c_eventMessage)) {
108 B2DEBUG(30,
"received event message... write it to data store");
113 B2INFO(
"ZMQRxWorker : special event generated by HLTZMQ2Ds received.");
117 }
else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
118 B2DEBUG(30,
"received end message from input");
122 B2DEBUG(30,
"received unexpected message from input");
130 B2ASSERT(
"The input process did not send any event in some time!", pollReply);
135 B2DEBUG(30,
"Finished with event");
137 }
catch (zmq::error_t& ex) {
138 if (ex.num() != EINTR) {
139 B2ERROR(
"There was an error during the Rx worker event: " << ex.what());
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
static Environment & Instance()
Static method to get a reference to the Environment instance.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
void send(AZMQMessage message) const
Send a message over the data socket.
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
void initialize() override
Initialize the streamer.
ZMQClient m_zmqClient
Our ZMQ client.
void event() override
Receive an event and store it in the datastore. Tell the input process we are ready.
void terminate() override
Terminate the client and tell the monitor, we are done.
unsigned int m_param_maximalWaitingTime
Maximal time to wait in polling.
unsigned int m_param_bufferSize
How many events do we want to have in the buffer.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
StoreObjPtr< RandomGenerator > m_randomgenerator
The random generator in the data store.
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
void addParam(const std::string &name, T ¶mVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
Poll method to only the data socket.
Abstract base class for different kinds of events.