Belle II Software  release-05-01-25
AsyncWrapper.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2013 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * Contributors: Christian Pulvermacher *
7  * *
8  * This software is provided "as is" without any warranty. *
9  **************************************************************************/
10 
11 #include <framework/pcore/AsyncWrapper.h>
12 #include <framework/core/Path.h>
13 
14 #include <framework/core/EventProcessor.h>
15 #include <framework/core/ModuleManager.h>
16 #include <framework/pcore/GlobalProcHandler.h>
17 #include <framework/pcore/RingBuffer.h>
18 #include <framework/pcore/RxModule.h>
19 #include <framework/pcore/TxModule.h>
20 #include <framework/datastore/StoreObjPtr.h>
21 #include <framework/dataobjects/EventMetaData.h>
22 
23 #include <sys/wait.h>
24 
25 #include <cstdlib>
26 
27 using namespace Belle2;
28 
29 bool AsyncWrapper::s_isAsync = false;
31 namespace {
32  static std::vector<RingBuffer*> rbList;
33  void cleanupIPC()
34  {
35  if (!AsyncWrapper::isAsync()) {
36  for (RingBuffer* rb : rbList)
37  delete rb;
38  rbList.clear();
39  }
40  }
41 }
42 
44 {
45  if (!s_isAsync) {
46  B2ERROR("AsyncWrapper::numAvailableEvents() used in synchronous thread??");
47  return true;
48  }
49 
50  return s_currentRingBuffer->numq();
51 }
52 
53 AsyncWrapper::AsyncWrapper(const std::string& moduleType): Module(),
54  m_wrappedModule(ModuleManager::Instance().registerModule(moduleType)),
55  m_ringBuffer(nullptr), m_rx(nullptr), m_tx(nullptr)
56 {
57  setParamList(m_wrappedModule->getParamList()); //inherit parameters from wrapped module
58 
59  addParam("discardOldEvents", m_discardOldEvents,
60  "Discard old events when buffer is full. If false, the main process will wait until there is enough space in the buffer. (i.e. synchronous operation)",
61  true);
62 }
63 
// Defaulted destructor: it does not delete m_ringBuffer, m_rx or m_tx itself.
// The delete calls for these raw pointers visible in this file are in terminate().
64 AsyncWrapper::~AsyncWrapper() = default;
65 
67 {
68  B2INFO("Initializing AsyncWrapper...");
69 
71  const int bufferSizeInts = 8000000; //~32M, within Ubuntu's shmmax limit
72  m_ringBuffer = new RingBuffer(bufferSizeInts);
73  rbList.push_back(m_ringBuffer);
74  m_rx = new RxModule(m_ringBuffer);
76  m_tx = new TxModule(m_ringBuffer);
78  m_tx->setBlockingInsert(!m_discardOldEvents); //actually decouple this process
79 
80  //fork out one extra process
82  //forked thread:
83  //allow access to async parts
84  s_isAsync = true;
86 
87  PathPtr path(new Path);
88  path->addModule(ModulePtr(m_rx));
89  path->addModule(m_wrappedModule);
90 
91  //LogSystem::Instance().resetMessageCounter(); //for testing parallel processing
92 
93  EventProcessor eventProc;
94  eventProc.process(path);
95  B2INFO("Asynchronous process done!");
96  exit(0);
97  } else {
98  atexit(cleanupIPC);
99 
100  //main thread: chain tx and return
101  m_tx->initialize();
102  }
103 }
104 
106 {
108  if (waitpid(-1, nullptr, WNOHANG) != 0) {
109  StoreObjPtr<EventMetaData> eventMetaData;
110  eventMetaData->setEndOfData();
111  }
112 
113  m_tx->event();
114  }
115 }
116 
118 {
120  m_tx->terminate();
121 
122  m_ringBuffer->kill();
123  B2INFO("Waiting for asynchronous process...");
125  B2INFO("Done, cleaning up...");
126  delete m_tx;
127  delete m_rx;
128  delete m_ringBuffer;
129  for (RingBuffer*& rb : rbList)
130  if (rb == m_ringBuffer)
131  rb = nullptr;
132  }
133 }
Belle2::AsyncWrapper::numAvailableEvents
static int numAvailableEvents()
Return the number of events available in the RingBuffer.
Definition: AsyncWrapper.cc:43
Belle2::RingBuffer::clear
int clear()
Clear the RingBuffer.
Definition: RingBuffer.cc:429
Belle2::EventProcessor::process
void process(const PathPtr &startPath, long maxEvent=0)
Processes the full module chain, starting with the first module in the given path.
Definition: EventProcessor.cc:125
Belle2::TxModule::event
virtual void event() override
This method is the core of the module.
Definition: TxModule.cc:72
Belle2::TxModule::initialize
virtual void initialize() override
Module functions to be called from main process.
Definition: TxModule.cc:40
Belle2::AsyncWrapper::s_currentRingBuffer
static RingBuffer * s_currentRingBuffer
if s_isAsync is true, this contains the corresponding RingBuffer, see numAvailableEvents().
Definition: AsyncWrapper.h:93
Belle2::AsyncWrapper::s_isAsync
static bool s_isAsync
true if the current process is on the receiving (async) side of an AsyncWrapper.
Definition: AsyncWrapper.h:90
Belle2::RxModule
Module to decode data store contents from RingBuffer.
Definition: RxModule.h:25
Belle2::GlobalProcHandler::initialize
static void initialize(unsigned int nWorkerProc)
Create a new process handler, which will handle nWorkerProc processes.
Definition: GlobalProcHandler.cc:126
Belle2::TxModule
Module for encoding data store contents into a RingBuffer.
Definition: TxModule.h:25
Belle2::TxModule::terminate
virtual void terminate() override
This method is called at the end of the event processing.
Definition: TxModule.cc:111
Belle2::TxModule::setBlockingInsert
void setBlockingInsert(bool block)
Whether to block until we can insert data into the ring buffer in event().
Definition: TxModule.h:50
Belle2::AsyncWrapper::terminate
void terminate() override
Call this from terminate().
Definition: AsyncWrapper.cc:117
Belle2::GlobalProcHandler::startWorkerProcesses
static bool startWorkerProcesses(unsigned int numProcesses)
Fork and initialize as many worker processes as requested.
Definition: GlobalProcHandler.cc:146
Belle2::RingBuffer::numq
int numq() const
Returns number of entries/buffers in the RingBuffer.
Definition: RingBuffer.cc:371
Belle2::AsyncWrapper::m_tx
TxModule * m_tx
transmitting module.
Definition: AsyncWrapper.h:84
Belle2::Module
Base class for Modules.
Definition: Module.h:74
Belle2::RingBuffer
Class to manage a Ring Buffer placed in an IPC shared memory.
Definition: RingBuffer.h:36
Belle2::AsyncWrapper::isAsync
static bool isAsync()
returns true if the current process is on the receiving (async) side of an AsyncWrapper.
Definition: AsyncWrapper.h:67
Belle2::AsyncWrapper::event
void event() override
Call this from event().
Definition: AsyncWrapper.cc:105
Belle2::AsyncWrapper::m_discardOldEvents
bool m_discardOldEvents
Discard old events when buffer is full.
Definition: AsyncWrapper.h:87
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::StoreObjPtr
Type-safe access to single objects in the data store.
Definition: ParticleList.h:33
Belle2::PathPtr
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a shared pointer (std::shared_ptr).
Definition: Path.h:30
Belle2::RingBuffer::kill
void kill()
Cause termination of reading processes (if they use isDead()).
Definition: RingBuffer.cc:392
Belle2::ModulePtr
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a shared pointer (std::shared_ptr).
Definition: Module.h:42
Belle2::GlobalProcHandler::isWorkerProcess
static bool isWorkerProcess()
Return true if the process is of type c_Worker.
Definition: GlobalProcHandler.cc:279
Belle2::TxModule::disableMergeableHandling
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
Definition: TxModule.h:53
Belle2::AsyncWrapper::m_rx
RxModule * m_rx
receiving module.
Definition: AsyncWrapper.h:81
Belle2::Module::addParam
void addParam(const std::string &name, T &paramVariable, const std::string &description, const T &defaultValue)
Adds a new parameter to the module.
Definition: Module.h:562
Belle2::RxModule::disableMergeableHandling
void disableMergeableHandling(bool disable=true)
Disable handling of Mergeable objects.
Definition: RxModule.h:44
Belle2::Path
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:40
Belle2::GlobalProcHandler::waitForAllProcesses
static void waitForAllProcesses()
Wait until all forked processes handled by this GlobalProcHandler have finished.
Definition: GlobalProcHandler.cc:289
Belle2::ModuleManager
The ModuleManager Class.
Definition: ModuleManager.h:60
Belle2::AsyncWrapper::initialize
void initialize() override
Call this from initialize().
Definition: AsyncWrapper.cc:66
Belle2::AsyncWrapper::AsyncWrapper
AsyncWrapper(const std::string &moduleType)
Wrap a module of the given type.
Definition: AsyncWrapper.cc:53
Belle2::AsyncWrapper::m_wrappedModule
ModulePtr m_wrappedModule
The wrapped module.
Definition: AsyncWrapper.h:75
Belle2::AsyncWrapper::m_ringBuffer
RingBuffer * m_ringBuffer
shared memory buffer
Definition: AsyncWrapper.h:78
Belle2::EventProcessor
provides the core event processing loop.
Definition: EventProcessor.h:39
Belle2::Module::setParamList
void setParamList(const ModuleParamList &params)
Replace existing parameter list.
Definition: Module.h:503