Belle II Software  release-05-02-19
ZMQRxWorkerModule.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 
11 #include <framework/pcore/zmq/processModules/ZMQRxWorkerModule.h>
12 
13 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
14 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
15 #include <framework/core/Environment.h>
16 
17 using namespace std;
18 using namespace Belle2;
19 
20 REG_MODULE(ZMQRxWorker)
21 
23 {
24  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
25  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
26  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
27  addParam("eventBufferSize", m_param_bufferSize, "Maximal number of events to store in the internal buffer");
28  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
29 
30  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
31 
32  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
33  "set the number of processes to at least 1.",
34  Environment::Instance().getNumberProcesses());
35 }
36 
37 void ZMQRxWorkerModule::initialize()
38 {
39  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
40 }
41 
42 void ZMQRxWorkerModule::event()
43 {
44  try {
45  if (m_firstEvent) {
46  m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
47  m_zmqClient.initialize<ZMQ_DEALER>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName, false);
48 
49  // Listen to stop messages
50  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
51 
52  // General hello
53  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
54  m_zmqClient.publish(std::move(multicastHelloMsg));
55  // Hello for input process. TODO: merge this
56  auto helloMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
57  m_zmqClient.publish(std::move(helloMessage));
58 
59  bool inputProcessIsGone = false;
60  // The following as actually not needed, as we already know at this stage that the input process is up.
61  // But in some cases, the input process is already down again (because it was so fast), so will never receive any event...
62  const auto socketHelloAnswer = [&inputProcessIsGone](const auto & socket) {
63  const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
64  if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
65  inputProcessIsGone = true;
66  return false;
67  }
68  B2ASSERT("Received unexpected message from input.", message->isMessage(EMessageTypes::c_helloMessage));
69  return false;
70  };
71  const auto pollResult = m_zmqClient.pollSocket(60 * 1000, socketHelloAnswer);
72  if (inputProcessIsGone or not pollResult) {
73  B2WARNING("It seems the input process is already gone.");
74  return;
75  }
76 
77  // send ready msg x buffer size
78  for (unsigned int bufferIndex = 0; bufferIndex < m_param_bufferSize; bufferIndex++) {
79  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
80  m_zmqClient.send(std::move(readyMessage));
81  }
82  m_firstEvent = false;
83  }
84 
85  const auto multicastAnswer = [](const auto & socket) {
86  const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
87  if (message->isMessage(EMessageTypes::c_terminateMessage)) {
88  B2DEBUG(10, "Having received an graceful stop message. Will now go on.");
89  // By not storing anything in the data store, we will just stop event processing here...
90  return false;
91  }
92 
93  B2ERROR("Undefined message on multicast");
94  return true;
95  };
96 
97  const auto socketAnswer = [this](const auto & socket) {
98  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
99  if (message->isMessage(EMessageTypes::c_eventMessage)) {
100  B2DEBUG(10, "received event message... write it to data store");
101  m_streamer.read(std::move(message));
102  auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
103  m_zmqClient.send(std::move(readyMessage));
104  return false;
105  } else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
106  B2DEBUG(10, "received end message from input");
107  return false;
108  }
109 
110  B2DEBUG(10, "received unexpected message from input");
111  return true;
112  };
113 
114  const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
115  B2ASSERT("The input process did not send any event in some time!", pollReply);
116 
117  B2DEBUG(10, "Finished with event");
118  } catch (zmq::error_t& ex) {
119  if (ex.num() != EINTR) {
120  B2ERROR("There was an error during the Rx worker event: " << ex.what());
121  }
122  }
123 }
124 
125 void ZMQRxWorkerModule::terminate()
126 {
127  m_zmqClient.terminate();
128 }
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQRxWorkerModule
Module connecting the input path with the worker path on the worker side.
Definition: ZMQRxWorkerModule.h:36