9 #include <framework/pcore/AsyncWrapper.h>
10 #include <framework/core/Path.h>
12 #include <framework/core/EventProcessor.h>
13 #include <framework/core/ModuleManager.h>
14 #include <framework/pcore/GlobalProcHandler.h>
15 #include <framework/pcore/RingBuffer.h>
16 #include <framework/pcore/RxModule.h>
17 #include <framework/pcore/TxModule.h>
18 #include <framework/datastore/StoreObjPtr.h>
19 #include <framework/dataobjects/EventMetaData.h>
30 static std::vector<RingBuffer*> rbList;
44 B2ERROR(
"AsyncWrapper::numAvailableEvents() used in synchronous thread??");
52 m_wrappedModule(
ModuleManager::Instance().registerModule(moduleType)),
53 m_ringBuffer(nullptr), m_rx(nullptr), m_tx(nullptr)
58 "Discard old events when buffer is full. If false, the main process will wait until there is enough space in the buffer. (i.e. synchronous operation)",
62 AsyncWrapper::~AsyncWrapper() =
default;
66 B2INFO(
"Initializing AsyncWrapper...");
69 const int bufferSizeInts = 8000000;
93 B2INFO(
"Asynchronous process done!");
106 if (waitpid(-1,
nullptr, WNOHANG) != 0) {
108 eventMetaData->setEndOfData();
121 B2INFO(
"Waiting for asynchronous process...");
123 B2INFO(
"Done, cleaning up...");
TxModule * m_tx
transmitting module.
void initialize() override
Call this from initialize().
void event() override
Call this from event().
void terminate() override
Call this from terminate().
ModulePtr m_wrappedModule
The wrapped module.
AsyncWrapper(const std::string &moduleType)
Wrap am module of given type.
static bool isAsync()
returns true if the current process is on the receiving (async) side of an AsyncWrapper.
RxModule * m_rx
receiving module.
static int numAvailableEvents()
Retun number of events available in the RingBuffer.
RingBuffer * m_ringBuffer
shared memory buffer
static RingBuffer * s_currentRingBuffer
if s_isAsync is true, this contains the corresponding RingBuffer, see numAvailableEvents().
static bool s_isAsync
true if the current process is on the receiving (async) side of an AsyncWrapper.
bool m_discardOldEvents
Discard old events when buffer is full.
provides the core event processing loop.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static void waitForAllProcesses()
Wait until all forked processes handled by this GlobalProcHandler.
void setParamList(const ModuleParamList ¶ms)
Replace existing parameter list.
Implements a path consisting of Module and/or Path objects.
Class to manage a Ring Buffer placed in an IPC shared memory.
int clear()
Clear the RingBuffer.
int numq() const
Returns number of entries/buffers in the RingBuffer.
void kill()
Cause termination of reading processes (if they use isDead()).
Module to decode data store contents from RingBuffer.
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
Type-safe access to single objects in the data store.
Module for encoding data store contents into a RingBuffer.
virtual void initialize() override
Module functions to be called from main process.
virtual void event() override
This method is the core of the module.
virtual void terminate() override
This method is called at the end of the event processing.
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
void setBlockingInsert(bool block)
Whether to block until we can insert data into the ring buffer in event().
void addParam(const std::string &name, T ¶mVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Abstract base class for different kinds of events.