8#include <framework/pcore/zmq/processModules/ZMQTxInputModule.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/pcore/zmq/utils/EventMetaDataSerialization.h>
11#include <framework/pcore/EvtMessage.h>
12#include <framework/core/Environment.h>
13#include <framework/core/RandomGenerator.h>
14#include <framework/datastore/StoreObjPtr.h>
35 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
36 "set the number of processes to at least 1.",
50 B2INFO(
"ZMQTxInputModule :: First Event here");
59 m_zmqClient.subscribe(EMessageTypes::c_confirmMessage);
60 m_zmqClient.subscribe(EMessageTypes::c_helloMessage);
61 m_zmqClient.subscribe(EMessageTypes::c_deleteWorkerMessage);
62 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
79 const auto multicastAnswer = [
this, &
terminate](
const auto & socket) {
81 const std::string& data = multicastMessage->getData();
83 if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
85 B2DEBUG(30,
"received c_helloMessage from " << data <<
"... replying");
95 const int workerID = std::atoi(data.c_str());
96 B2DEBUG(30,
"received worker delete message, workerID: " << workerID);
100 }
else if (multicastMessage->isMessage(EMessageTypes::c_terminateMessage)) {
101 B2DEBUG(30,
"Having received a stop message. I can not do much here, but just hope for the best.");
109 const auto socketAnswer = [
this](
const auto & socket) {
111 if (message->isMessage(EMessageTypes::c_readyMessage)) {
112 B2DEBUG(30,
"got worker ready message");
113 m_nextWorker.push_back(std::stoi(message->getIdentity()));
117 B2ERROR(
"Invalid message from worker");
125 B2INFO(
"ZMQTxInput : special first event processing");
134 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
143 unsigned int workerID = *it;
145 std::string workerIDString = std::to_string(workerID);
146 B2INFO(
"ZMQTxInput :: sending first event to worker : " <<
LogVar(
"worker", workerIDString));
152 if (*(it + 1) == workerID) {
158 B2INFO(
"ZMQTxInput : Special first event sent to all workers");
163 m_zmqClient.poll(timeout, multicastAnswer, socketAnswer);
170 B2ASSERT(
"Did not receive any ready messaged for quite some time!", not
m_nextWorker.empty());
174 B2DEBUG(30,
"Next worker is " << nextWorker);
178 if (eventMessage->size() > 0) {
181 B2DEBUG(30,
"Having send message to worker " << nextWorker);
188 B2DEBUG(30,
"finished event");
190 }
catch (zmq::error_t& ex) {
191 if (ex.num() != EINTR) {
192 B2ERROR(
"There was an error during the Tx input event: " << ex.what());
194 }
catch (exception& ex) {
204 B2DEBUG(30,
"ZMQTxInput:: EndRun detected. isEndOfRun = " <<
m_eventMetaData->isEndOfRun() <<
" RunNo = " <<
214 for (
unsigned int workerID :
m_workers) {
215 if (workerID == (
unsigned int)getpid())
continue;
216 std::string workerIDString = std::to_string(workerID);
220 B2INFO(
"ZMQTxInput : End Run sent to all workers");
233 B2WARNING(
"Worker process timeout, workerID: " << workerID);
249 for (
unsigned int workerID :
m_workers) {
250 std::string workerIDString = std::to_string(workerID);
255 const auto multicastAnswer = [
this](
const auto & socket) {
257 const std::string& data = multicastMessage->getData();
264 }
else if (multicastMessage->isMessage(EMessageTypes::c_deleteWorkerMessage) and
m_param_useEventBackup) {
265 const int workerID = std::atoi(data.c_str());
267 B2DEBUG(30,
"received worker delete message, workerID: " << workerID);
270 }
else if (multicastMessage->isMessage(EMessageTypes::c_helloMessage)) {
272 B2DEBUG(30,
"received c_helloMessage from " << data <<
"... replying with end message");
283 std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
unsigned int getZMQEventBufferSize() const
Number of events to keep in flight for every worker.
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
static Environment & Instance()
Static method to get a reference to the Environment instance.
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (all I/O must be done through the data store).
bool registerInDataStore(DataStore::EStoreFlags storeFlags=DataStore::c_WriteOut)
Register the object/array in the DataStore.
Type-safe access to single objects in the data store.
static std::unique_ptr< AMessage > fromSocket(const std::unique_ptr< zmq::socket_t > &socket)
Create a message of the given type by receiving a message from the socket.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Class to store variables with their name which were sent to the logging service.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.