8 #include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11 #include <framework/pcore/EvtMessage.h>
12 #include <framework/core/Environment.h>
13 #include <framework/core/RandomGenerator.h>
14 #include <framework/datastore/StoreObjPtr.h>
24 ZMQTxInputModule::ZMQTxInputModule() :
Module()
35 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36 "set the number of processes to at least 1.",
50 B2INFO(
"ZMQTxInputModule :: First Event here");
82 const auto multicastAnswer = [
this, &
terminate](
const auto & socket) {
83 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
84 const std::string& data = multicastMessage->getData();
86 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
88 B2DEBUG(30,
"received c_helloMessage from " << data <<
"... replying");
99 const int workerID = std::atoi(data.c_str());
100 B2DEBUG(30,
"received worker delete message, workerID: " << workerID);
104 }
else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
105 B2DEBUG(30,
"Having received a stop message. I can not do much here, but just hope for the best.");
113 const auto socketAnswer = [
this](
const auto & socket) {
114 const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
115 if (message->isMessage(EMessageTypes::c_readyMessage)) {
116 B2DEBUG(30,
"got worker ready message");
117 m_nextWorker.push_back(std::stoi(message->getIdentity()));
121 B2ERROR(
"Invalid message from worker");
130 B2INFO(
"ZMQTxInput : special first event processing");
150 unsigned int workerID = *it;
152 std::string workerIDString = std::to_string(workerID);
153 B2INFO(
"ZMQTxInput :: sending first event to worker : " <<
LogVar(
"worker", workerIDString));
159 if (*(it + 1) == workerID) {
166 B2INFO(
"ZMQTxInput : Special first event sent to all workers");
179 B2ASSERT(
"Did not receive any ready messaged for quite some time!", not
m_nextWorker.empty());
184 B2DEBUG(30,
"Next worker is " << nextWorker);
188 if (eventMessage->size() > 0) {
191 B2DEBUG(30,
"Having send message to worker " << nextWorker);
198 B2DEBUG(30,
"finished event");
200 }
catch (zmq::error_t& ex) {
201 if (ex.num() != EINTR) {
202 B2ERROR(
"There was an error during the Tx input event: " << ex.what());
204 }
catch (exception& ex) {
214 B2DEBUG(30,
"ZMQTxInput:: EndRun detected. isEndOfRun = " <<
m_eventMetaData->isEndOfRun() <<
" RunNo = " <<
224 for (
unsigned int workerID :
m_workers) {
225 if (workerID == (
unsigned int)getpid())
continue;
226 std::string workerIDString = std::to_string(workerID);
231 B2INFO(
"ZMQTxInput : End Run sent to all workers");
244 B2WARNING(
"Worker process timeout, workerID: " << workerID);
260 for (
unsigned int workerID :
m_workers) {
261 std::string workerIDString = std::to_string(workerID);
266 const auto multicastAnswer = [
this](
const auto & socket) {
267 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
268 const std::string& data = multicastMessage->getData();
275 }
else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and
m_param_useEventBackup) {
276 const int workerID = std::atoi(data.c_str());
278 B2DEBUG(30,
"received worker delete message, workerID: " << workerID);
281 }
else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
283 B2DEBUG(30,
"received c_helloMessage from " << data <<
"... replying with end message");
294 std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
unsigned int getZMQEventBufferSize() const
Number of events to keep in flight for every worker.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
unsigned int getZMQMaximalWaitingTime() const
Maximal waiting time of any ZMQ module for any communication in ms.
static Environment & Instance()
Static method to get a reference to the Environment instance.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
unsigned int size() const
Check the size.
void storeEvent(std::unique_ptr< EvtMessage > evtMsg, const StoreObjPtr< EventMetaData > &evtMetaData, const unsigned int workerId)
Add a new event backup with the given information. Takes ownership of the evt message.
int checkForTimeout(const Duration &timeout) const
Check the items for timeout. Returns -1 if no timeout happened, or the worker id if one did.
void removeEvent(const EventMetaData &evtMetaData)
Remove all backups with the given event meta data (on confirmation)
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
bool isOnline() const
Check if the client was initialized and not terminated.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
void send(AZMQMessage message) const
Send a message over the data socket.
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Class to store variables with their name which were sent to the logging service.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
void sendWorkerBackupEvents(unsigned int worker, const AZMQClient &socket)
Send all backups of a given worker directly to the multicast and delete them.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method for only the multicast socket.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.