Belle II Software  release-05-02-19
ZMQRxOutputModule.cc
1 #include <framework/pcore/ProcHandler.h>
2 #include <framework/pcore/zmq/processModules/ZMQRxOutputModule.h>
3 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
4 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
5 #include <framework/core/Environment.h>
6 
7 using namespace std;
8 using namespace Belle2;
9 
10 REG_MODULE(ZMQRxOutput)
11 
13 {
14  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
15  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
16  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
17  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
18  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
19 
20  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
21  "set the number of processes to at least 1.",
22  Environment::Instance().getNumberProcesses());
23 }
24 
25 void ZMQRxOutputModule::initialize()
26 {
27  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
28 }
29 
30 void ZMQRxOutputModule::event()
31 {
32  try {
33  if (m_firstEvent) {
34  m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
35  m_zmqClient.initialize<ZMQ_PULL>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName, true);
36 
37  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
38  m_zmqClient.publish(std::move(multicastHelloMsg));
39 
40  // Listen to event backups, the stop message of the input process and the general stop messages
41  m_zmqClient.subscribe(EMessageTypes::c_eventMessage);
42  m_zmqClient.subscribe(EMessageTypes::c_lastEventMessage);
43  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
44  m_firstEvent = false;
45  }
46 
47  const auto multicastAnswer = [this](const auto & socket) {
48  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
49  if (message->isMessage(EMessageTypes::c_eventMessage)) {
50  B2DEBUG(100, "Having received an event backup. Will go in with this.");
51  m_streamer.read(std::move(message));
52  StoreObjPtr<EventMetaData> eventMetaData;
53  eventMetaData->addErrorFlag(EventMetaData::EventErrorFlag::c_HLTCrash);
54  return false;
55  } else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
56  B2DEBUG(100, "Having received an end message. Will not go on.");
57  // By not storing anything in the data store, we will just stop event processing here...
58  return false;
59  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
60  B2DEBUG(100, "Having received an graceful stop message. Will not go on.");
61  // By not storing anything in the data store, we will just stop event processing here...
62  return false;
63  }
64 
65  B2ERROR("Undefined message on multicast");
66  return true;
67  };
68 
69  const auto socketAnswer = [this](const auto & socket) {
70  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
71  if (message->isMessage(EMessageTypes::c_eventMessage)) {
72  m_streamer.read(std::move(message));
73  B2DEBUG(100, "received event " << m_eventMetaData->getEvent());
74  auto confirmMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_confirmMessage, m_eventMetaData);
75  m_zmqClient.publish(std::move(confirmMessage));
76  return false;
77  }
78 
79  B2ERROR("Undefined message on socket");
80  return true;
81  };
82 
83 
84  B2DEBUG(100, "Start polling");
85  const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
86  B2ASSERT("Output process did not receive any message in some time. Aborting.", pollReply);
87 
88  B2DEBUG(100, "finished reading in an event.");
89  } catch (zmq::error_t& ex) {
90  if (ex.num() != EINTR) {
91  B2ERROR("There was an error during the Rx output event: " << ex.what());
92  }
93  }
94 }
95 
96 void ZMQRxOutputModule::terminate()
97 {
98  m_zmqClient.terminate();
99 }
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::StoreObjPtr
Type-safe access to single objects in the data store.
Definition: ParticleList.h:33
Belle2::ZMQRxOutputModule
Module connecting the worker path with the output path on the output side.
Definition: ZMQRxOutputModule.h:37