8 #include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11 #include <framework/pcore/EvtMessage.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomGenerator.h>
14 #include <framework/datastore/StoreObjPtr.h>
24 ZMQTxInputModule::ZMQTxInputModule() :
Module()
35 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36 "set the number of processes to at least 1.",
77 const auto multicastAnswer = [
this, &
terminate](
const auto & socket) {
78 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
79 const std::string& data = multicastMessage->getData();
81 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
83 B2DEBUG(10,
"received c_helloMessage from " << data <<
"... replying");
93 const int workerID = std::atoi(data.c_str());
94 B2DEBUG(10,
"received worker delete message, workerID: " << workerID);
99 }
else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
100 B2DEBUG(10,
"Having received a stop message. I can not do much here, but just hope for the best.");
108 const auto socketAnswer = [
this](
const auto & socket) {
109 const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
110 if (message->isMessage(EMessageTypes::c_readyMessage)) {
111 B2DEBUG(10,
"got worker ready message");
112 m_nextWorker.push_back(std::stoi(message->getIdentity()));
116 B2ERROR(
"Invalid message from worker");
127 B2ASSERT(
"Did not receive any ready messaged for quite some time!", not
m_nextWorker.empty());
131 B2DEBUG(10,
"Next worker is " << nextWorker);
135 if (eventMessage->size() > 0) {
138 B2DEBUG(10,
"Having send message to worker " << nextWorker);
145 B2DEBUG(10,
"finished event");
147 }
catch (zmq::error_t& ex) {
148 if (ex.num() != EINTR) {
149 B2ERROR(
"There was an error during the Tx input event: " << ex.what());
151 }
catch (exception& ex) {
167 B2WARNING(
"Worker process timeout, workerID: " << workerID);
183 for (
unsigned int workerID :
m_workers) {
184 std::string workerIDString = std::to_string(workerID);
189 const auto multicastAnswer = [
this](
const auto & socket) {
190 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
191 const std::string& data = multicastMessage->getData();
198 }
else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and
m_param_useEventBackup) {
199 const int workerID = std::atoi(data.c_str());
201 B2DEBUG(10,
"received worker delete message, workerID: " << workerID);
204 }
else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
206 B2DEBUG(10,
"received c_helloMessage from " << data <<
"... replying with end message");
217 std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
static Environment & Instance()
Static method to get a reference to the Environment instance.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
unsigned int size() const
Check the size.
void storeEvent(std::unique_ptr< EvtMessage > evtMsg, const StoreObjPtr< EventMetaData > &evtMetaData, const unsigned int workerId)
Add a new event backup with the given information. Takes ownership of the evt message.
int checkForTimeout(const Duration &timeout) const
Check the items for a timeout. Returns -1 if no timeout happened, or the worker id if one did.
void removeEvent(const EventMetaData &evtMetaData)
Remove all backups with the given event meta data (on confirmation)
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
bool isOnline() const
Check if the client was initialized and not terminated.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
void send(AZMQMessage message) const
Send a message over the data socket.
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
void sendWorkerBackupEvents(unsigned int worker, const AZMQClient &socket)
Send all backups of a given worker directly to the multicast and delete them.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method that polls only the multicast socket.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.