Belle II Software  release-05-02-19
ZMQTxWorkerModule.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Nils Braun, Anselm Baur *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 #include <framework/pcore/zmq/processModules/ZMQTxWorkerModule.h>
11 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
12 #include <framework/core/Environment.h>
13 
14 using namespace Belle2;
15 
16 REG_MODULE(ZMQTxWorker)
17 
19 {
    // NOTE(review): the signature line (listing line 18) is not present in this
    // extract; this body is presumably the ZMQTxWorkerModule constructor — confirm
    // against the original file.
    //
    // Register the three string parameters of this module: the name of the data
    // socket and the addresses of the proxy's XPUB and XSUB sockets.
20  addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
21  addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
22  addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
    // Flag the module as certified for parallel processing.
23  setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
24 
    // Require a non-zero number of processes in the environment.
    // NOTE(review): the two adjacent string literals below are concatenated
    // without a separating space, so the emitted message reads "...process,set...".
25  B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
26  "set the number of processes to at least 1.",
27  Environment::Instance().getNumberProcesses());
28 }
29 
31 {
    // NOTE(review): the signature line (listing line 30) is not present in this
    // extract; this body is presumably ZMQTxWorkerModule::event() — confirm.
    //
    // Streams the data store into an event message and sends it via the ZMQ client.
    // Any zmq::error_t raised inside the try block is handled at the bottom.
32  try {
33  if (m_firstEvent) {
    // NOTE(review): listing lines 34-35 are missing from this extract; presumably
    // the streamer and the ZMQ client are initialized here — confirm against the
    // original file.
36 
    // Make sure the first-event branch is only entered once.
37  m_firstEvent = false;
38  }
39 
    // stream() returns a std::unique_ptr<EvtMessage> by value; binding it to a
    // const reference keeps that temporary alive until the end of this scope.
40  const auto& evtMessage = m_streamer.stream();
    // Wrap the streamed event into a ZMQ message of type c_eventMessage and
    // hand it over to the client.
41  auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_eventMessage, evtMessage);
42  m_zmqClient.send(std::move(message));
43  } catch (zmq::error_t& ex) {
    // Errors with code EINTR are ignored silently; everything else is reported
    // via B2ERROR.
44  if (ex.num() != EINTR) {
45  B2ERROR("There was an error during the Tx worker event: " << ex.what());
46  }
47  }
48 }
49 
51 {
    // NOTE(review): the signature line (listing line 50) and the body (listing
    // line 52) are missing from this extract; presumably this is
    // ZMQTxWorkerModule::terminate() and it terminates the ZMQ client — confirm
    // against the original file.
53 }
Belle2::StreamHelper::stream
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:31
Belle2::ZMQTxWorkerModule::m_param_xpubProxySocketName
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
Definition: ZMQTxWorkerModule.h:51
Belle2::ZMQClient::terminate
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:20
Belle2::StreamHelper::initialize
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:20
REG_MODULE
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:652
Belle2::ZMQTxWorkerModule::terminate
void terminate() override
Terminate the client and tell the monitor that we are done.
Definition: ZMQTxWorkerModule.cc:50
Belle2::ZMQTxWorkerModule::event
void event() override
Pack the datastore and stream it.
Definition: ZMQTxWorkerModule.cc:30
Belle2::ZMQTxWorkerModule::m_param_compressionLevel
int m_param_compressionLevel
Parameter: Compression level of the streamer.
Definition: ZMQTxWorkerModule.h:55
Belle2::ZMQTxWorkerModule::m_firstEvent
bool m_firstEvent
Set to false once the objects are initialized.
Definition: ZMQTxWorkerModule.h:46
Belle2::ZMQTxWorkerModule::m_param_xsubProxySocketName
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
Definition: ZMQTxWorkerModule.h:53
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2::ZMQTxWorkerModule::m_param_socketName
std::string m_param_socketName
Parameter: name of the data socket.
Definition: ZMQTxWorkerModule.h:49
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ZMQClient::send
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:52
Belle2::ZMQMessageFactory::createMessage
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Definition: ZMQMessageFactory.h:37
Belle2::ZMQClient::initialize
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:55
Belle2::ZMQTxWorkerModule
Module connecting the worker path with the output path on the worker side.
Definition: ZMQTxWorkerModule.h:35
Belle2::ZMQTxWorkerModule::m_streamer
StreamHelper m_streamer
The data store streamer.
Definition: ZMQTxWorkerModule.h:62
Belle2::ZMQTxWorkerModule::m_param_handleMergeable
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
Definition: ZMQTxWorkerModule.h:57
Belle2::Environment::Instance
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:31
Belle2::ZMQTxWorkerModule::m_zmqClient
ZMQClient m_zmqClient
Our ZMQ client.
Definition: ZMQTxWorkerModule.h:60