8 #include <framework/pcore/zmq/processModules/ZMQTxWorkerModule.h>
9 #include <framework/pcore/zmq/messages/ZMQMessageFactory.h>
10 #include <framework/core/Environment.h>
18 addParam(
"socketName", m_param_socketName,
"Name of the socket to connect this module to.");
19 addParam(
"xpubProxySocketName", m_param_xpubProxySocketName,
"Address of the XPUB socket of the proxy");
20 addParam(
"xsubProxySocketName", m_param_xsubProxySocketName,
"Address of the XSUB socket of the proxy");
21 setPropertyFlags(EModulePropFlags::c_ParallelProcessingCertified);
23 B2ASSERT(
"Module is only allowed in a multiprocessing environment. If you only want to use a single process,"
24 "set the number of processes to at least 1.",
41 }
catch (zmq::error_t& ex) {
42 if (ex.num() != EINTR) {
43 B2ERROR(
"There was an error during the Tx worker event: " << ex.what());
static Environment & Instance()
Static method to get a reference to the Environment instance.
std::unique_ptr< EvtMessage > stream(bool addPersistentDurability=true, bool streamTransientObjects=true)
Stream the data store into an event message.
void initialize(int compressionLevel, bool handleMergeable)
Initialize this class. Call this e.g. in the first event.
void initialize(const std::string &pubSocketAddress, const std::string &subSocketAddress, const std::string &socketName, bool bind)
Initialize the multicast and a data socket of the given type.
void terminate(bool sendGoodbye=true)
Terminate the sockets properly.
void send(AZMQMessage message) const
Send a message over the data socket.
static auto createMessage(const std::string &msgIdentity, const EMessageTypes msgType, const std::unique_ptr< EvtMessage > &eventMessage)
Create an ID Message out of an identity, the type and an event message.
Module connecting the worker path with the output path on the worker side.
int m_param_compressionLevel
Parameter: Compression level of the streamer.
bool m_firstEvent
Set to false if the objects are initialized.
std::string m_param_socketName
Parameter: name of the data socket.
StreamHelper m_streamer
The data store streamer.
ZMQClient m_zmqClient
Our ZMQ client.
void event() override
Pack the datastore and stream it.
void terminate() override
Terminate the client and tell the monitor, we are done.
std::string m_param_xpubProxySocketName
Parameter: name of the pub multicast socket.
bool m_param_handleMergeable
Parameter: Can we handle mergeables?
std::string m_param_xsubProxySocketName
Parameter: name of the sub multicast socket.
#define REG_MODULE(moduleName)
Register the given module (without 'Module' suffix) with the framework.
Abstract base class for different kinds of events.