Belle II Software  release-06-01-15
ZMQRxOutputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/ProcHandler.h>
9 #include <framework/pcore/zmq/processModules/ZMQRxOutputModule.h>
10 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/core/Environment.h>
13 
14 using namespace std;
15 using namespace Belle2;
16 
17 REG_MODULE(ZMQRxOutput)
18 
20 {
21  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
22  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
23  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
24  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
25  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
26 
27  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
28  "set the number of processes to at least 1.",
29  Environment::Instance().getNumberProcesses());
30 }
31 
32 void ZMQRxOutputModule::initialize()
33 {
34  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
35 }
36 
37 void ZMQRxOutputModule::event()
38 {
39  try {
40  if (m_firstEvent) {
41  m_streamer.initialize(m_param_compressionLevel, m_param_handleMergeable);
42  m_zmqClient.initialize<ZMQ_PULL>(m_param_xpubProxySocketName, m_param_xsubProxySocketName, m_param_socketName, true);
43 
44  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
45  m_zmqClient.publish(std::move(multicastHelloMsg));
46 
47  // Listen to event backups, the stop message of the input process and the general stop messages
48  m_zmqClient.subscribe(EMessageTypes::c_eventMessage);
49  m_zmqClient.subscribe(EMessageTypes::c_lastEventMessage);
50  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
51  m_firstEvent = false;
52  }
53 
54  const auto multicastAnswer = [this](const auto & socket) {
55  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
56  if (message->isMessage(EMessageTypes::c_eventMessage)) {
57  B2DEBUG(100, "Having received an event backup. Will go in with this.");
58  m_streamer.read(std::move(message));
59  StoreObjPtr<EventMetaData> eventMetaData;
60  eventMetaData->addErrorFlag(EventMetaData::EventErrorFlag::c_HLTCrash);
61  return false;
62  } else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
63  B2DEBUG(100, "Having received an end message. Will not go on.");
64  // By not storing anything in the data store, we will just stop event processing here...
65  return false;
66  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
67  B2DEBUG(100, "Having received an graceful stop message. Will not go on.");
68  // By not storing anything in the data store, we will just stop event processing here...
69  return false;
70  }
71 
72  B2ERROR("Undefined message on multicast");
73  return true;
74  };
75 
76  const auto socketAnswer = [this](const auto & socket) {
77  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
78  if (message->isMessage(EMessageTypes::c_eventMessage)) {
79  m_streamer.read(std::move(message));
80  B2DEBUG(100, "received event " << m_eventMetaData->getEvent());
81  auto confirmMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_confirmMessage, m_eventMetaData);
82  m_zmqClient.publish(std::move(confirmMessage));
83  return false;
84  }
85 
86  B2ERROR("Undefined message on socket");
87  return true;
88  };
89 
90 
91  B2DEBUG(100, "Start polling");
92  const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
93  B2ASSERT("Output process did not receive any message in some time. Aborting.", pollReply);
94 
95  B2DEBUG(100, "finished reading in an event.");
96  } catch (zmq::error_t& ex) {
97  if (ex.num() != EINTR) {
98  B2ERROR("There was an error during the Rx output event: " << ex.what());
99  }
100  }
101 }
102 
103 void ZMQRxOutputModule::terminate()
104 {
105  m_zmqClient.terminate();
106 }
Base class for Modules.
Definition: Module.h:72
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:95
Module connecting the worker path with the output path on the output side.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.