Belle II Software  release-08-01-10
ZMQRxWorkerModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/zmq/processModules/ZMQRxWorkerModule.h>
10 
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
13 #include <framework/core/Environment.h>
14 
15 using namespace std;
16 using namespace Belle2;
17 
// Register the module with the framework; REG_MODULE takes the module name without the 'Module' suffix.
18 REG_MODULE(ZMQRxWorker);
19 
// File-local event counter. In the code visible in this listing it is only referenced from a
// commented-out B2INFO line inside the event handler.
20 static int s_event_number = 0;
21 
22 ZMQRxWorkerModule::ZMQRxWorkerModule() : Module()
23 {
24  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
25  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
26  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
27  addParam("eventBufferSize", m_param_bufferSize, "Maximal number of events to store in the internal buffer");
28  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
29 
30  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
31 
32  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
33  "set the number of processes to at least 1.",
34  Environment::Instance().getNumberProcesses());
35 }
36 
// NOTE(review): the signature line that precedes this body is absent from this listing. The member
// index at the end of the page lists "void initialize() override", so this is presumably
// ZMQRxWorkerModule::initialize() - confirm against the repository.
38 {
// Register the random generator object in the data store with the c_DontWriteOut flag.
39  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
40 }
41 
43 {
44  try {
45  // B2INFO ( "ZMQRxWorker :: event = " << s_event_number++ );
46 
47  if (m_firstEvent) {
50 
51  // Wait some time until input process comes up
52  sleep(1);
53 
54  // Listen to stop messages
55  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
56 
57  // General hello
58  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
59  m_zmqClient.publish(std::move(multicastHelloMsg));
60  // Hello for input process. TODO: merge this
61  auto helloMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
62  m_zmqClient.publish(std::move(helloMessage));
63 
64  bool inputProcessIsGone = false;
65  // The following as actually not needed, as we already know at this stage that the input process is up.
66  // But in some cases, the input process is already down again (because it was so fast), so will never receive any event...
67  const auto socketHelloAnswer = [&inputProcessIsGone](const auto & socket) {
68  const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
70  inputProcessIsGone = true;
71  return false;
72  }
73  B2ASSERT("Received unexpected message from input.", message->isMessage(EMessageTypes::c_helloMessage));
74  return false;
75  };
76  // const auto pollResult = m_zmqClient.pollSocket(7200 * 1000, socketHelloAnswer);
77  const auto pollResult = m_zmqClient.pollSocket(Environment::Instance().getZMQMaximalWaitingTime(), socketHelloAnswer);
78  if (inputProcessIsGone or not pollResult) {
79  B2WARNING("It seems the input process is already gone.");
80  return;
81  }
82  // B2INFO ( "ZMQRxWorker :: hello message received, sending back ready message for buffers" << m_param_bufferSize );
83 
84  // send ready msg x buffer size
85  for (unsigned int bufferIndex = 0; bufferIndex < m_param_bufferSize; bufferIndex++) {
86  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
87  m_zmqClient.send(std::move(readyMessage));
88  }
89  m_firstEvent = false;
90  // B2INFO ( "ZMQRxWorker :: Connection established after sending reply" );
91  }
92 
93  const auto multicastAnswer = [](const auto & socket) {
94  const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
95  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
96  B2DEBUG(30, "Having received an graceful stop message. Will now go on.");
97  // By not storing anything in the data store, we will just stop event processing here...
98  return false;
99  }
100 
101  B2ERROR("Undefined message on multicast");
102  return true;
103  };
104 
105  const auto socketAnswer = [this](const auto & socket) {
106  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
107  if (message->isMessage(EMessageTypes::c_eventMessage)) {
108  B2DEBUG(30, "received event message... write it to data store");
109  // B2INFO ( "ZMQRxWorker : event received" );
110  m_streamer.read(std::move(message));
111  // if ( m_eventMetaData->getExperiment() == 42 && m_eventMetaData->getRun() == 8 )
112  if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaData->getExperiment(), m_eventMetaData->getRun()))
113  B2INFO("ZMQRxWorker : special event generated by HLTZMQ2Ds received.");
114  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
115  m_zmqClient.send(std::move(readyMessage));
116  return false;
117  } else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
118  B2DEBUG(30, "received end message from input");
119  return false;
120  }
121 
122  B2DEBUG(30, "received unexpected message from input");
123  return true;
124  };
125 
126  // const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
127  // B2INFO ( "ZMQRxWorker : polliing started" );
128  // const int pollReply = m_zmqClient.poll(7200 * 1000, multicastAnswer, socketAnswer);
129  const int pollReply = m_zmqClient.poll(Environment::Instance().getZMQMaximalWaitingTime(), multicastAnswer, socketAnswer);
130  B2ASSERT("The input process did not send any event in some time!", pollReply);
131 
132  // B2INFO ( "ZMQRxWorker : event received and moved to data store" );
133  // B2INFO ( " ---- exp = " << m_eventMetaData->getExperiment() << " run = " << m_eventMetaData->getRun() );
134 
135  B2DEBUG(30, "Finished with event");
136  // B2INFO ( "ZMQRxWorker :: Finished with the event" );
137  } catch (zmq::error_t& ex) {
138  if (ex.num() != EINTR) {
139  B2ERROR("There was an error during the Rx worker event: " << ex.what());
140  }
141  }
142 }
143 
// NOTE(review): the signature line (listing line 144) and the only body line (listing line 146) are
// absent from this listing. The member index at the end of the page lists "void terminate() override"
// ("Terminate the client and tell the monitor, we are done."), so this is presumably
// ZMQRxWorkerModule::terminate() - confirm against the repository.
145 {
147 }
@ c_DontWriteOut
Object/array should NOT be saved by output modules.
Definition: DataStore.h:71
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Base class for Modules.
Definition: Module.h:72
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
Definition: StreamHelper.cc:41
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
void initialize() override
Initialize the streamer.
ZMQClient m_zmqClient
Our ZMQ client.
void event() override
Receive an event and store it in the datastore. Tell the input process we are ready.
void terminate() override
Terminate the client and tell the monitor we are done.
unsigned int m_param_maximalWaitingTime
Maximal time to wait in polling.
unsigned int m_param_bufferSize
How many events do we want to have in the buffer.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
StoreObjPtr< RandomGenerator > m_randomgenerator
The random generator in the data store.
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
Definition: ZMQClient.h:113
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
Poll only the data socket.
Definition: ZMQClient.h:142
Abstract base class for different kinds of events.