Belle II Software  release-08-01-10
ZMQRxOutputModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/ProcHandler.h>
9 #include <framework/pcore/zmq/processModules/ZMQRxOutputModule.h>
10 #include <framework/pcore/zmq/messages/ZMQDefinitions.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/core/Environment.h>
13 
14 using namespace std;
15 using namespace Belle2;
16 
17 REG_MODULE(ZMQRxOutput);
18 
19 ZMQRxOutputModule::ZMQRxOutputModule() : Module()
20 {
21  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
22  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
23  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
24  addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
25  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
26 
27  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
28  "set the number of processes to at least 1.",
29  Environment::Instance().getNumberProcesses());
30 }
31 
// NOTE(review): the signature line that precedes this brace is missing from this listing;
// presumably this is the body of ZMQRxOutputModule::initialize() - confirm against the header.
// Registers m_randomgenerator in the data store, passing the DataStore::c_DontWriteOut flag.
33 {
34  m_randomgenerator.registerInDataStore(DataStore::c_DontWriteOut);
35 }
36 
// NOTE(review): the signature line that precedes this brace is missing from this listing;
// presumably this is the body of ZMQRxOutputModule::event() - confirm against the header.
// The listing also skips lines at a few points inside this body (at the start and end of the
// first-event setup and right after each of the two "== 0" counter checks); the statements
// on those lines are not visible here and are therefore not documented.
//
// Visible behaviour: on the first call, say hello on the multicast and subscribe to the
// message types of interest; then poll the multicast and the data socket with two handlers
// and assert that the poll reported something. ZMQ errors other than EINTR are logged.
38 {
39  try {
// One-time setup, executed only while m_firstEvent is still true.
40  if (m_firstEvent) {
43 
// Publish a hello message carrying this process' PID.
44  auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
45  m_zmqClient.publish(std::move(multicastHelloMsg));
46 
47  // Listen to event backups, the stop message of the input process and the general stop messages
48  m_zmqClient.subscribe(EMessageTypes::c_eventMessage);
49  m_zmqClient.subscribe(EMessageTypes::c_lastEventMessage);
50  m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
51  m_firstEvent = false;
52 
55  }
56 
// Handler for messages arriving on the multicast.
// Returns false for every recognised message type and true (after logging an error) otherwise.
// NOTE(review): presumably "true" tells poll() to keep polling and "false" to stop - this is
// inferred from the "repeat poll" comment in socketAnswer below; confirm in ZMQClient::poll.
57  const auto multicastAnswer = [this](const auto & socket) {
58  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
59  if (message->isMessage(EMessageTypes::c_eventMessage)) {
60  B2DEBUG(100, "Having received an event backup. Will go in with this.");
// Load the backed-up event via the streamer, then flag it with c_HLTCrash in its EventMetaData.
61  m_streamer.read(std::move(message));
62  StoreObjPtr<EventMetaData> eventMetaData;
63  eventMetaData->addErrorFlag(EventMetaData::EventErrorFlag::c_HLTCrash);
64  return false;
65  } else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
66  B2DEBUG(100, "Having received an end message. Will not go on.");
67  // By not storing anything in the data store, we will just stop event processing here...
68  return false;
69  } else if (message->isMessage(EMessageTypes::c_terminateMessage)) {
70  B2DEBUG(100, "Having received an graceful stop message. Will not go on.");
71  // By not storing anything in the data store, we will just stop event processing here...
72  return false;
73  }
74 
75  B2ERROR("Undefined message on multicast");
76  return true;
77  };
78 
// Handler for messages arriving on the data socket.
// An event message is read via the streamer and confirmed on the multicast with a confirm
// message built from m_eventMetaData. End-of-run records and "ZMQ DAQ first event" records are
// counted down with m_endRun / m_beginRun: the handler returns true until the respective
// counter reaches zero and false once it does. Any other event message returns false;
// a non-event message is logged as an error and returns true.
79  const auto socketAnswer = [this](const auto & socket) {
80  auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
81  if (message->isMessage(EMessageTypes::c_eventMessage)) {
82  m_streamer.read(std::move(message));
83  B2DEBUG(100, "received event " << m_eventMetaData->getEvent());
84  auto confirmMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_confirmMessage, m_eventMetaData);
85  m_zmqClient.publish(std::move(confirmMessage));
86  // Check EventMetaData and repeat poll until all end run records received
87  if (m_eventMetaData->isEndOfRun()) {
88  m_endRun--;
89  if (m_endRun == 0) {
91  B2INFO("ZMQOutputModule : sending out EndRun record.");
92  return false;
93  } else {
94  return true;
95  }
96  } else if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaData->getExperiment(),
97  m_eventMetaData->getRun())) { // Special first event
98  m_beginRun--;
99  if (m_beginRun == 0) {
101  B2INFO("ZMQOutputModule : sending out HLTZMQ first event.");
102  return false;
103  } else {
104  return true;
105  }
106  }
107 
108  return false;
109  }
110 
111  B2ERROR("Undefined message on socket");
112  return true;
113  };
114 
115 
116  B2DEBUG(100, "Start polling");
// The timeout passed to poll() comes from Environment::getZMQMaximalWaitingTime();
// m_param_maximalWaitingTime appears only in the commented-out variant below.
117  // const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
118  // const int pollReply = m_zmqClient.poll((unsigned int)7200 * 1000, multicastAnswer, socketAnswer);
119  const int pollReply = m_zmqClient.poll(Environment::Instance().getZMQMaximalWaitingTime(), multicastAnswer, socketAnswer);
// The assertion condition is pollReply itself, i.e. a zero reply trips the assert.
120  B2ASSERT("Output process did not receive any message in some time. Aborting.", pollReply);
121  // B2INFO ( "ZMQRxOutput : event received" );
122 
123  B2DEBUG(30, "finished reading in an event.");
124  } catch (zmq::error_t& ex) {
// EINTR is deliberately not reported; every other ZMQ error is logged via B2ERROR.
125  if (ex.num() != EINTR) {
126  B2ERROR("There was an error during the Rx output event: " << ex.what());
127  }
128  }
129 }
130 
// NOTE(review): both the signature line and the single body line are missing from this
// listing; presumably this is ZMQRxOutputModule::terminate() - confirm against the header
// and the original source before relying on this block.
132 {
134 }
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
int getNumberProcesses() const
Returns the number of worker processes which should be used for the parallel processing.
Definition: Environment.h:145
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Base class for Modules.
Definition: Module.h:72
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
Type-safe access to single objects in the data store.
Definition: StoreObjPtr.h:96
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuild the data store from it.
Definition: StreamHelper.cc:41
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
void initialize() override
Initialize the streamer.
ZMQClient m_zmqClient
Our ZMQ client.
void event() override
Receive an event and store it in the datastore. Confirm to the input process.
void terminate() override
Terminate the client and tell the monitor, we are done.
unsigned int m_param_maximalWaitingTime
Maximal time to wait until aborting in ms.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
StoreObjPtr< RandomGenerator > m_randomgenerator
The random generator in the data store.
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
int m_endRun
End Run counter.
int m_beginRun
Begin Run counter.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
Definition: ZMQClient.h:113
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.