Belle II Software development
ZMQTxWorkerModule.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8#include <framework/pcore/zmq/processModules/ZMQTxWorkerModule.h>
9#include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10#include <framework/core/Environment.h>
11
12using namespace Belle2;
13
14REG_MODULE(ZMQTxWorker);
15
17{
18 addParam("socketName", m_param_socketName, "Name of the socket to connect this module to.");
19 addParam("xpubProxySocketName", m_param_xpubProxySocketName, "Address of the XPUB socket of the proxy");
20 addParam("xsubProxySocketName", m_param_xsubProxySocketName, "Address of the XSUB socket of the proxy");
22
23 B2ASSERT("Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
24 "set the number of processes to at least 1.",
25 Environment::Instance().getNumberProcesses());
26}
27
29{
30 try {
31 if (m_firstEvent) {
34
35 m_firstEvent = false;
36 }
37
38 const auto& evtMessage = m_streamer.stream();
39 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_eventMessage, evtMessage);
40 m_zmqClient.send(std::move(message));
41 // B2INFO ( "ZMQTxWorker : an event sent" );
42 } catch (zmq::error_t& ex) {
43 if (ex.num() != EINTR) {
44 B2ERROR("There was an error during the Tx worker event: " << ex.what());
45 }
46 }
47}
48
50{
51 if (m_eventMetaData->isEndOfRun() != 1) return;
52
53 const auto& evtMessage = m_streamer.stream();
54 auto message = ZMQMessageFactory::createMessage(EMessageTypes::c_eventMessage, evtMessage);
55 m_zmqClient.send(std::move(message));
56}
57
59{
61}
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
Base class for Modules.
Definition: Module.h:72
void setPropertyFlags(unsigned int propertyFlags)
Sets the flags for the module properties.
Definition: Module.cc:208
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
Definition: StreamHelper.cc:29
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
Definition: StreamHelper.cc:18
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketAddress, bool bind)
Initialize the multicast and a data socket of the given type.
Definition: ZMQClient.cc:53
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
Definition: ZMQClient.cc:18
void send(AZMQMessage message) const
Send a message over the data socket.
Definition: ZMQClient.h:43
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
ZMQTxWorkerModule()
Constructor setting the moudle paramters.
ZMQClient m_zmqClient
Our ZMQ client.
void event() override
Pack the datastore and stream it.
void endRun() override
EndRun processing.
void terminate() override
Terminate the client and tell the monitor, we are done.
StoreObjPtr< EventMetaData > m_eventMetaData
The event meta data in the data store needed for confirming events.
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:560
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Definition: Module.h:650
Abstract base class for different kinds of events.