Belle II Software light-2505-deimos
AsyncWrapper.cc
1/**************************************************************************
2 * basf2 (Belle II Analysis Software Framework) *
3 * Author: The Belle II Collaboration *
4 * *
5 * See git log for contributors and copyright holders. *
6 * This file is licensed under LGPL-3.0, see LICENSE.md. *
7 **************************************************************************/
8
9#include <framework/pcore/AsyncWrapper.h>
10#include <framework/core/Path.h>
11
12#include <framework/core/EventProcessor.h>
13#include <framework/core/ModuleManager.h>
14#include <framework/pcore/GlobalProcHandler.h>
15#include <framework/pcore/RingBuffer.h>
16#include <framework/pcore/RxModule.h>
17#include <framework/pcore/TxModule.h>
18#include <framework/datastore/StoreObjPtr.h>
19#include <framework/dataobjects/EventMetaData.h>
20
21#include <sys/wait.h>
22
23#include <cstdlib>
24
25using namespace Belle2;
26
27bool AsyncWrapper::s_isAsync = false;
29namespace {
30 static std::vector<RingBuffer*> rbList;
31 void cleanupIPC()
32 {
33 if (!AsyncWrapper::isAsync()) {
34 for (RingBuffer* rb : rbList)
35 delete rb;
36 rbList.clear();
37 }
38 }
39}
40
42{
43 if (!s_isAsync) {
44 B2ERROR("AsyncWrapper::numAvailableEvents() used in synchronous thread??");
45 return true;
46 }
47
48 return s_currentRingBuffer->numq();
49}
50
51AsyncWrapper::AsyncWrapper(const std::string& moduleType): Module(),
52 m_wrappedModule(ModuleManager::Instance().registerModule(moduleType)),
53 m_ringBuffer(nullptr), m_rx(nullptr), m_tx(nullptr)
54{
55 setParamList(m_wrappedModule->getParamList()); //inherit parameters from wrapped module
56
57 addParam("discardOldEvents", m_discardOldEvents,
58 "Discard old events when buffer is full. If false, the main process will wait until there is enough space in the buffer. (i.e. synchronous operation)",
59 true);
60}
61
62AsyncWrapper::~AsyncWrapper() = default;
63
65{
66 B2INFO("Initializing AsyncWrapper...");
67
69 const int bufferSizeInts = 8000000; //~32M, within Ubuntu's shmmax limit
70 m_ringBuffer = new RingBuffer(bufferSizeInts);
71 rbList.push_back(m_ringBuffer);
73 m_rx->disableMergeableHandling();
75 m_tx->disableMergeableHandling();
76 m_tx->setBlockingInsert(!m_discardOldEvents); //actually decouple this process
77
78 //fork out one extra process
80 //forked thread:
81 //allow access to async parts
82 s_isAsync = true;
84
85 PathPtr path(new Path);
86 path->addModule(ModulePtr(m_rx));
87 path->addModule(m_wrappedModule);
88
89 //LogSystem::Instance().resetMessageCounter(); //for testing parallel processing
90
91 EventProcessor eventProc;
92 eventProc.process(path);
93 B2INFO("Asynchronous process done!");
94 exit(0);
95 } else {
96 atexit(cleanupIPC);
97
98 //main thread: chain tx and return
99 m_tx->initialize();
100 }
101}
102
104{
106 if (waitpid(-1, nullptr, WNOHANG) != 0) {
107 StoreObjPtr<EventMetaData> eventMetaData;
108 eventMetaData->setEndOfData();
109 }
110
111 m_tx->event();
112 }
113}
114
116{
118 m_tx->terminate();
119
120 m_ringBuffer->kill();
121 B2INFO("Waiting for asynchronous process...");
123 B2INFO("Done, cleaning up...");
124 delete m_tx;
125 delete m_rx;
126 delete m_ringBuffer;
127 for (RingBuffer*& rb : rbList)
128 if (rb == m_ringBuffer)
129 rb = nullptr;
130 }
131}
TxModule * m_tx
transmitting module.
void initialize() override
Call this from initialize().
void event() override
Call this from event().
void terminate() override
Call this from terminate().
ModulePtr m_wrappedModule
The wrapped module.
AsyncWrapper(const std::string &moduleType)
Wrap am module of given type.
static bool isAsync()
returns true if the current process is on the receiving (async) side of an AsyncWrapper.
RxModule * m_rx
receiving module.
static int numAvailableEvents()
Return number of events available in the RingBuffer.
RingBuffer * m_ringBuffer
shared memory buffer
static RingBuffer * s_currentRingBuffer
if s_isAsync is true, this contains the corresponding RingBuffer, see numAvailableEvents().
static bool s_isAsync
true if the current process is on the receiving (async) side of an AsyncWrapper.
bool m_discardOldEvents
Discard old events when buffer is full.
provides the core event processing loop.
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
static void waitForAllProcesses()
Wait until all forked processes handled by this GlobalProcHandler.
The ModuleManager Class.
Module()
Constructor.
Definition Module.cc:30
void setParamList(const ModuleParamList &params)
Replace existing parameter list.
Definition Module.h:500
Implements a path consisting of Module and/or Path objects.
Definition Path.h:38
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition RingBuffer.h:39
Module to decode data store contents from RingBuffer.
Definition RxModule.h:25
Type-safe access to single objects in the data store.
Definition StoreObjPtr.h:96
Module for encoding data store contents into a RingBuffer.
Definition TxModule.h:25
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition Module.h:559
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition Path.h:35
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition Module.h:43
Abstract base class for different kinds of events.