Belle II Software  release-06-00-14
ZMQRxWorkerModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 
9 #include <framework/pcore/zmq/processModules/ZMQRxWorkerModule.h>
10 
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
13 #include <framework/core/Environment.h>
14 
15 using namespace std;
16 using namespace Belle2;
17 
18 REG_MODULE(ZMQRxWorker)
19 
21 {
22  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
23  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
24  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
25  addParam("eventBufferSize", m_param_bufferSize, "Maximal number of events to store in the internal buffer");
26  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
27 
28  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
29 
30  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
31  "set the number of processes to at least 1.",
32  Environment::Instance().getNumberProcesses());
33 }
34 
35 void ZMQRxWorkerModule::initialize()
36 {
37  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
38 }
39 
40 void ZMQRxWorkerModule::event()
41 {
42  try {
43  if (m_firstEvent) {
44  m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
45  m_zmqClient.initialize<ZMQ_DEALER>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName, false);
46 
47  // Listen to stop messages
48  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
49 
50  // General hello
51  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
52  m_zmqClient.publish(std::move(multicastHelloMsg));
53  // Hello for input process. TODO: merge this
54  auto helloMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
55  m_zmqClient.publish(std::move(helloMessage));
56 
57  bool inputProcessIsGone = false;
58  // The following as actually not needed, as we already know at this stage that the input process is up.
59  // But in some cases, the input process is already down again (because it was so fast), so will never receive any event...
60  const auto socketHelloAnswer = [&inputProcessIsGone](const auto & socket) {
61  const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
62  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
63  inputProcessIsGone = true;
64  return false;
65  }
66  B2ASSERT("Received unexpected message from input.", message->isMessage(EMessageTypes::c_helloMessage));
67  return false;
68  };
69  const auto pollResult = m_zmqClient.pollSocket(60 * 1000, socketHelloAnswer);
70  if (inputProcessIsGone or not pollResult) {
71  B2WARNING("It seems the input process is already gone.");
72  return;
73  }
74 
75  // send ready msg x buffer size
76  for (unsigned int bufferIndex = 0; bufferIndex < m_param_bufferSize; bufferIndex++) {
77  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
78  m_zmqClient.send(std::move(readyMessage));
79  }
80  m_firstEvent = false;
81  }
82 
83  const auto multicastAnswer = [](const auto & socket) {
84  const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
85  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
86  B2DEBUG(10, "Having received an graceful stop message. Will now go on.");
87  // By not storing anything in the data store, we will just stop event processing here...
88  return false;
89  }
90 
91  B2ERROR("Undefined message on multicast");
92  return true;
93  };
94 
95  const auto socketAnswer = [this](const auto & socket) {
96  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
97  if (message->isMessage(EMessageTypes::c_eventMessage)) {
98  B2DEBUG(10, "received event message... write it to data store");
99  m_streamer.read(std::move(message));
100  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
101  m_zmqClient.send(std::move(readyMessage));
102  return false;
103  } else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
104  B2DEBUG(10, "received end message from input");
105  return false;
106  }
107 
108  B2DEBUG(10, "received unexpected message from input");
109  return true;
110  };
111 
112  const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
113  B2ASSERT("The input process did not send any event in some time!", pollReply);
114 
115  B2DEBUG(10, "Finished with event");
116  } catch (zmq::error_t& ex) {
117  if (ex.num() != EINTR) {
118  B2ERROR("There was an error during the Rx worker event: " << ex.what());
119  }
120  }
121 }
122 
123 void ZMQRxWorkerModule::terminate()
124 {
125  m_zmqClient.terminate();
126 }
// Doxygen cross-reference notes for symbols used in this file:
//   Base class for Modules.
//   Definition: Module.h:72
//   Module connecting the input path with the worker path on the worker side.
//   #define REG_MODULE(moduleName)
//   Register the given module (without 'Module' suffix) with the framework.
//   Definition: Module.h:650
//   Abstract base class for different kinds of events.