Belle II Software  release-06-02-00
ZMQTxWorkerModule.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/zmq/processModules/ZMQTxWorkerModule.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/core/Environment.h>
11 
12 using namespace Belle2;
13 
14 REG_MODULE(ZMQTxWorker)
15 
17 {
18  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
19  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
20  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
21  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
22 
23  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
24  "set the number of processes to at least 1.",
25  Environment::Instance().getNumberProcesses());
26 }
27 
29 {
30  try {
31  if (m_firstEvent) {
34 
35  m_firstEvent = false;
36  }
37 
38  const auto& evtMessage = m_streamer.stream();
39  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_eventMessage, evtMessage);
40  m_zmqClient.send(std::move(message));
41  } catch (zmq::error_t& ex) {
42  if (ex.num() != EINTR) {
43  B2ERROR("There was an error during the Tx worker event: " << ex.what());
44  }
45  }
46 }
47 
49 {
51 }
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:29
Base class for Modules.
Definition: Module.h:72
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:42
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Module connecting the worker path with the output path on the worker side.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
ZMQClient m_zmqClient
Our ZMQ client.
void event() override
Pack the datastore and stream it.
void terminate() override
Terminate the client and tell the monitor, we are done.
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.