8#include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11#include <framework/pcore/EvtMessage.h>
12#include <framework/core/Environment.h>
13#include <framework/core/RandomGenerator.h>
14#include <framework/datastore/StoreObjPtr.h>
35 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36 "set the number of processes to at least 1.",
50 B2INFO(
"ZMQTxInputModule :: First Event here");
79 const auto multicastAnswer = [
this, &
terminate](
const auto & socket) {
80 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
81 const std::string& data = multicastMessage->getData();
83 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
85 B2DEBUG(30,
"received c_helloMessage from " << data <<
"... replying");
95 const int workerID = std::atoi(data.c_str());
96 B2DEBUG(30,
"received worker delete message, workerID: " << workerID);
100 }
else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
101 B2DEBUG(30,
"Having received a stop message. I can not do much here, but just hope for the best.");
109 const auto socketAnswer = [
this](
const auto & socket) {
110 const auto message = ZMQMessageFactory::fromSocket<ZMQIdMessage>(socket);
111 if (message->isMessage(EMessageTypes::c_readyMessage)) {
112 B2DEBUG(30,
"got worker ready message");
113 m_nextWorker.push_back(std::stoi(message->getIdentity()));
117 B2ERROR(
"Invalid message from worker");
125 B2INFO(
"ZMQTxInput : special first event processing");
143 unsigned int workerID = *it;
145 std::string workerIDString = std::to_string(workerID);
146 B2INFO(
"ZMQTxInput :: sending first event to worker : " <<
LogVar(
"worker", workerIDString));
152 if (*(it + 1) == workerID) {
158 B2INFO(
"ZMQTxInput : Special first event sent to all workers");
170 B2ASSERT(
"Did not receive any ready messaged for quite some time!", not
m_nextWorker.empty());
174 B2DEBUG(30,
"Next worker is " << nextWorker);
178 if (eventMessage->size() > 0) {
181 B2DEBUG(30,
"Having send message to worker " << nextWorker);
188 B2DEBUG(30,
"finished event");
190 }
catch (zmq::error_t& ex) {
191 if (ex.num() != EINTR) {
192 B2ERROR(
"There was an error during the Tx input event: " << ex.what());
194 }
catch (exception& ex) {
204 B2DEBUG(30,
"ZMQTxInput:: EndRun detected. isEndOfRun = " <<
m_eventMetaData->isEndOfRun() <<
" RunNo = " <<
214 for (
unsigned int workerID :
m_workers) {
215 if (workerID == (
unsigned int)getpid())
continue;
216 std::string workerIDString = std::to_string(workerID);
220 B2INFO(
"ZMQTxInput : End Run sent to all workers");
233 B2WARNING(
"Worker process timeout, workerID: " << workerID);
249 for (
unsigned int workerID :
m_workers) {
250 std::string workerIDString = std::to_string(workerID);
255 const auto multicastAnswer = [
this](
const auto & socket) {
256 const auto multicastMessage = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
257 const std::string& data = multicastMessage->getData();
264 }
else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and
m_param_useEventBackup) {
265 const int workerID = std::atoi(data.c_str());
267 B2DEBUG(30,
"received worker delete message, workerID: " << workerID);
270 }
else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
272 B2DEBUG(30,
"received c_helloMessage from " << data <<
"... replying with end message");
283 std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
unsigned int getZMQEventBufferSize() const
Number of events to keep in flight for every worker.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
unsigned int getZMQMaximalWaitingTime() const
Maximal waiting time of any ZMQ module for any communication in ms.
static Environment & Instance()
Static method to get a reference to the Environment instance.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
unsigned int size() const
Check the size.
void storeEvent(std::unique_ptr< EvtMessage > evtMsg, const StoreObjPtr< EventMetaData > &evtMetaData, const unsigned int workerId)
Add a new event backup with the given information. Takes ownership of the evt message.
int checkForTimeout(const Duration &timeout) const
Check the items for timeout. Returns -1 if no timeout happened, or the worker id if one did.
void removeEvent(const EventMetaData &evtMetaData)
Remove all backups with the given event meta data (on confirmation)
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
bool isOnline() const
Check if the client was initialized and not terminated.
void publish(AZMQMessage message) const
Publish the message to the multicast.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
void send(AZMQMessage message) const
Send a message over the data socket.
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Class to store variables with their name which were sent to the logging service.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
void sendWorkerBackupEvents(unsigned int worker, const AZMQClient &socket)
Send all backups of a given worker directly to the multicast and delete them.
int pollMulticast(unsigned int timeout, AMulticastAnswer multicastAnswer) const
Poll method for only the multicast socket.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.