Belle II Software development
ZMQRxWorkerModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/zmq/processModules/ZMQRxWorkerModule.h>
10
11#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12#include <framework/pcore/zmq/messages/ZMQDefinitions.h>
13#include <framework/core/Environment.h>
14
15using namespace std;
16using namespace Belle2;
17
18
19
20REG_MODULE(ZMQRxWorker);
21
22
24{
25 addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
26 addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
27 addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
28 addParam("eventBufferSize", m_param_bufferSize, "Maximal number of events to store in the internal buffer");
29 addParam("maximalWaitingTime", m_param_maximalWaitingTime, "Maximal time to wait for any message");
30
32
33 B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
34 "set the number of processes to at least 1.",
35 Environment::Instance().getNumberProcesses());
36}
37
39{
41}
42
44{
45 try {
46
47 if (m_firstEvent) {
50
51 // Wait some time until input process comes up
52 sleep(1);
53
54 // Listen to stop messages
55 m_zmqClient.subscribe(EMessageTypes::c_terminateMessage);
56
57 // General hello
58 auto multicastHelloMsg = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
59 m_zmqClient.publish(std::move(multicastHelloMsg));
60 // Hello for input process. TODO: merge this
61 auto helloMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_helloMessage, getpid());
62 m_zmqClient.publish(std::move(helloMessage));
63
64 bool inputProcessIsGone = false;
65 // The following as actually not needed, as we already know at this stage that the input process is up.
66 // But in some cases, the input process is already down again (because it was so fast), so will never receive any event...
67 const auto socketHelloAnswer = [&inputProcessIsGone](const auto & socket) {
68 const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
69 if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
70 inputProcessIsGone = true;
71 return false;
72 }
73 B2ASSERT("Received unexpected message from input.", message->isMessage(EMessageTypes::c_helloMessage));
74 return false;
75 };
76 // const auto pollResult = m_zmqClient.pollSocket(7200 * 1000, socketHelloAnswer);
77 const auto pollResult = m_zmqClient.pollSocket(Environment::Instance().getZMQMaximalWaitingTime(), socketHelloAnswer);
78 if (inputProcessIsGone or not pollResult) {
79 B2WARNING("It seems the input process is already gone.");
80 return;
81 }
82 // B2INFO ( "ZMQRxWorker :: hello message received, sending back ready message for buffers" << m_param_bufferSize );
83
84 // send ready msg x buffer size
85 for (unsigned int bufferIndex = 0; bufferIndex < m_param_bufferSize; bufferIndex++) {
86 auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
87 m_zmqClient.send(std::move(readyMessage));
88 }
89 m_firstEvent = false;
90 // B2INFO ( "ZMQRxWorker :: Connection established after sending reply" );
91 }
92
93 const auto multicastAnswer = [](const auto & socket) {
94 const auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
95 if (message->isMessage(EMessageTypes::c_terminateMessage)) {
96 B2DEBUG(30, "Having received an graceful stop message. Will now go on.");
97 // By not storing anything in the data store, we will just stop event processing here...
98 return false;
99 }
100
101 B2ERROR("Undefined message on multicast");
102 return true;
103 };
104
105 const auto socketAnswer = [this](const auto & socket) {
106 auto message = ZMQMessageFactory::fromSocket<ZMQNoIdMessage>(socket);
107 if (message->isMessage(EMessageTypes::c_eventMessage)) {
108 B2DEBUG(30, "received event message... write it to data store");
109 // B2INFO ( "ZMQRxWorker : event received" );
110 m_streamer.read(std::move(message));
111 // if ( m_eventMetaData->getExperiment() == 42 && m_eventMetaData->getRun() == 8 )
112 if (Environment::Instance().isZMQDAQFirstEvent(m_eventMetaData->getExperiment(), m_eventMetaData->getRun()))
113 B2INFO("ZMQRxWorker : special event generated by HLTZMQ2Ds received.");
114 auto readyMessage = ZMQMessageFactory::createMessage(EMessageTypes::c_readyMessage);
115 m_zmqClient.send(std::move(readyMessage));
116 return false;
117 } else if (message->isMessage(EMessageTypes::c_lastEventMessage)) {
118 B2DEBUG(30, "received end message from input");
119 return false;
120 }
121
122 B2DEBUG(30, "received unexpected message from input");
123 return true;
124 };
125
126 // const int pollReply = m_zmqClient.poll(m_param_maximalWaitingTime, multicastAnswer, socketAnswer);
127 // B2INFO ( "ZMQRxWorker : polliing started" );
128 // const int pollReply = m_zmqClient.poll(7200 * 1000, multicastAnswer, socketAnswer);
129 const int pollReply = m_zmqClient.poll(Environment::Instance().getZMQMaximalWaitingTime(), multicastAnswer, socketAnswer);
130 B2ASSERT("The input process did not send any event in some time!", pollReply);
131
132 // B2INFO ( "ZMQRxWorker : event received and moved to data store" );
133 // B2INFO ( " ---- exp = " << m_eventMetaData->getExperiment() << " run = " << m_eventMetaData->getRun() );
134
135 B2DEBUG(30, "Finished with event");
136 // B2INFO ( "ZMQRxWorker :: Finished with the event" );
137 } catch (zmq::error_t& ex) {
138 if (ex.num() != EINTR) {
139 B2ERROR("There was an error during the Rx worker event: " << ex.what());
140 }
141 }
142}
143
145{
147}
@ c_DontWriteOut
Object/array should be NOT saved by output modules.
Definition: DataStore.h:71
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Base class for Modules.
Definition: Module.h:72
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
void read(std::unique_ptr< ZMQNoIdMessage > message)
Read in a ZMQ message and rebuilt the data store from it.
Definition: StreamHelper.cc:41
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
void publish(AZMQMessage message) const
Publish the message to the multicast.
Definition: ZMQClient.h:53
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
void subscribe(EMessageTypes messageType)
Subscribe to the given multicast message type.
Definition: ZMQClient.cc:99
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
ZMQRxWorkerModule()
Constructor setting the moudle paramters.
void initialize() override
Initialize the streamer.
ZMQClient m_zmqClient
Our ZMQ client.
void event() override
Receive an event and store it in the datastore. Tell the input process we are ready.
void terminate() override
Terminate the client and tell the monitor, we are done.
unsigned int m_param_maximalWaitingTime
Maximal time to wait in polling.
unsigned int m_param_bufferSize
How many events do we want to have in the buffer.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
StoreObjPtr< RandomGenerator > m_randomgenerator
The random generator in the data store.
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
int poll(unsigned int timeout, AMulticastAnswer multicastAnswer, ASocketAnswer socketAnswer) const
Poll both the multicast and the data socket until, either:
Definition: ZMQClient.h:113
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
int pollSocket(unsigned int timeout, ASocketAnswer socketAnswer) const
Poll method to only the data socket.
Definition: ZMQClient.h:142
Abstract base class for different kinds of events.
STL namespace.