Belle II Software  release-05-01-25
PathUtils.cc
1 /**************************************************************************
2  * BASF2 (Belle Analysis Framework 2) *
3  * Copyright(C) 2018 - Belle II Collaboration *
4  * *
5  * Author: The Belle II Collaboration *
6  * *
7  * This software is provided "as is" without any warranty. *
8  **************************************************************************/
9 #include <framework/pcore/PathUtils.h>
10 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
11 #include <framework/core/ModuleManager.h>
12 #include <framework/core/Environment.h>
13 #include <set>
14 
15 using namespace Belle2;
16 
17 std::tuple<PathPtr, PathPtr, PathPtr> PathUtils::splitPath(const PathPtr& path)
18 {
19  //modules that can be parallelized, but should not go into a parallel section by themselves
20  std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
21 
22  PathPtr inputPath(new Path);
23  PathPtr mainPath(new Path);
24  PathPtr outputPath(new Path);
25 
26  int stage = 0; //0: in, 1: event/main, 2: out
27  for (const ModulePtr& module : path->getModules()) {
28  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
29  //entire conditional path must also be compatible
30  if (hasParallelFlag and module->hasCondition()) {
31  for (const auto& conditionPath : module->getAllConditionPaths()) {
33  hasParallelFlag = false;
34  }
35  }
36  }
37  //if modules have parallal flag -> stage = 1 , event/main
38  if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
39  stage++;
40 
41  if (stage == 2) {
42  bool path_is_useful = false;
43  for (const auto& parallelModule : mainPath->getModules()) {
44  if (uselessParallelModules.count(parallelModule->getType()) == 0) {
45  path_is_useful = true;
46  break;
47  }
48  }
49  if (not path_is_useful) {
50  //merge mainPath back into input path
51  inputPath->addPath(mainPath);
52  mainPath.reset(new Path);
53  //and search for further parallel sections
54  stage = 0;
55  }
56  }
57  }
58 
59  if (stage == 0) {
60  inputPath->addModule(module);
61  } else if (stage == 1) {
62  mainPath->addModule(module);
63  } else if (stage == 2) {
64  outputPath->addModule(module);
65  }
66  }
67 
68  bool createAllPaths = false; //usually we might not need e.g. an output path
69  for (const ModulePtr& module : path->getModules()) {
70  if (module->hasProperties(Module::c_TerminateInAllProcesses)) {
71  createAllPaths = true; //ensure there are all kinds of processes
72  }
73  }
74 
75  // if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
76  if (mainPath->isEmpty() and not createAllPaths) {
77  mainPath.reset();
78  }
79  if (inputPath->isEmpty() and not createAllPaths) {
80  inputPath.reset();
81  }
82  if (outputPath->isEmpty() and not createAllPaths) {
83  outputPath.reset();
84  }
85 
86  return {inputPath, mainPath, outputPath};
87 }
88 
89 ModulePtr PathUtils::getHistogramManager(PathPtr& inputPath, PathPtr& mainPath, PathPtr& outputPath)
90 {
91  ModulePtr histoManagerModule;
92  for (const ModulePtr& module : inputPath->getModules()) {
93  if (module->hasProperties(Module::c_HistogramManager)) {
94  // Initialize histogram manager if found in the path
95  histoManagerModule = module;
96 
97  //add histoman to other paths
98  mainPath->addModule(histoManagerModule);
99  outputPath->addModule(histoManagerModule);
100  }
101  }
102 
103  return histoManagerModule;
104 }
105 ModulePtrList PathUtils::preparePaths(PathPtr& inputPath, PathPtr& mainPath, PathPtr& outputPath)
106 {
107  B2ASSERT("The main part is empty. This is a bug in the framework.",
108  mainPath and not mainPath->isEmpty());
109 
110  ModuleManager& moduleManager = ModuleManager::Instance();
111 
112  const auto& environment = Environment::Instance();
113 
114  const auto& socketAddress = environment.getZMQSocketAddress();
115  const auto inputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_input));
116  const auto outputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_output));
117  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
118  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
119 
120  unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
121  unsigned int eventBufferSize = environment.getZMQEventBufferSize();
122  unsigned int workerTimeout = environment.getZMQWorkerTimeout();
123  bool useEventBackup = environment.getZMQUseEventBackup();
124 
125  if (inputPath) {
126  // Add TXInput after input path
127  ModulePtr zmqTxInputModule = moduleManager.registerModule("ZMQTxInput");
128  zmqTxInputModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
129  zmqTxInputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
130  zmqTxInputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
131  zmqTxInputModule->getParam<unsigned int>("workerProcessTimeout").setValue(workerTimeout);
132  zmqTxInputModule->getParam<bool>("useEventBackup").setValue(useEventBackup);
133  zmqTxInputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
134  appendModule(inputPath, zmqTxInputModule);
135 
136  // Add RXWorker before main path
137  ModulePtr zmqRxWorkerModule = moduleManager.registerModule("ZMQRxWorker");
138  zmqRxWorkerModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
139  zmqRxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
140  zmqRxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
141  zmqRxWorkerModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
142  zmqRxWorkerModule->getParam<unsigned int>("eventBufferSize").setValue(eventBufferSize);
143  prependModule(mainPath, zmqRxWorkerModule);
144  }
145 
146  if (outputPath) {
147  // Add TXWorker after main path
148  ModulePtr zmqTxWorkerModule = moduleManager.registerModule("ZMQTxWorker");
149  zmqTxWorkerModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
150  zmqTxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
151  zmqTxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(pubSocketAddress);
152  appendModule(mainPath, zmqTxWorkerModule);
153 
154  // Add RXOutput before output path
155  ModulePtr zmqRxOutputModule = moduleManager.registerModule("ZMQRxOutput");
156  zmqRxOutputModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
157  zmqRxOutputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
158  zmqRxOutputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
159  zmqRxOutputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
160  prependModule(outputPath, zmqRxOutputModule);
161  }
162 
163  if (inputPath) {
164  B2INFO("Input Path " << inputPath->getPathString());
165  }
166  if (mainPath) {
167  B2INFO("Main Path " << mainPath->getPathString());
168  }
169  if (outputPath) {
170  B2INFO("Output Path " << outputPath->getPathString());
171  }
172 
173  Path mergedPath;
174  if (inputPath) {
175  mergedPath.addPath(inputPath);
176  }
177  mergedPath.addPath(mainPath);
178  if (outputPath) {
179  mergedPath.addPath(outputPath);
180  }
181  return mergedPath.buildModulePathList();
182 }
183 
185 {
186  ModulePtrList tmpModuleList;
187  for (const ModulePtr& m : modules) {
188  if (m->hasProperties(Module::c_TerminateInAllProcesses))
189  tmpModuleList.push_back(m);
190  }
191  return tmpModuleList;
192 }
193 
195 {
196  for (const ModulePtr& m : prependModules) {
197  if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
198  modules->push_front(m);
199  }
200  }
201 }
202 
203 void PathUtils::appendModule(PathPtr& path, const ModulePtr& module)
204 {
205  path->addModule(module);
206 }
207 
208 void PathUtils::prependModule(PathPtr& path, const ModulePtr& module)
209 {
210  PathPtr newPath(new Path());
211  newPath->addModule(module);
212  newPath->addPath(path);
213  path.swap(newPath);
214 }
Belle2::ModuleManager::registerModule
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
Definition: ModuleManager.cc:84
Belle2::c_output
@ c_output
Input socket.
Definition: ZMQAddressUtils.h:30
Belle2::c_sub
@ c_sub
Multicast publish socket.
Definition: ZMQAddressUtils.h:32
Belle2::ModuleManager::Instance
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
Definition: ModuleManager.cc:28
Belle2::ZMQAddressUtils::getSocketAddress
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, ba adding a suffix.
Definition: ZMQAddressUtils.cc:56
Belle2::Module::c_ParallelProcessingCertified
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:82
Belle2::Module::c_TerminateInAllProcesses
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Definition: Module.h:85
Belle2::PathUtils::prependModule
static void prependModule(PathPtr &path, const ModulePtr &module)
Append given modules to the path.
Definition: PathUtils.cc:208
Belle2::ModuleManager::allModulesHaveFlag
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
Definition: ModuleManager.cc:144
Belle2::Module::c_HistogramManager
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:83
Belle2::PathUtils::appendModule
static void appendModule(PathPtr &path, const ModulePtr &module)
Prepend given modules to the path.
Definition: PathUtils.cc:203
Belle2
Abstract base class for different kinds of events.
Definition: MillepedeAlgorithm.h:19
Belle2::ModulePtrList
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:586
Belle2::PathPtr
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:30
Belle2::ModulePtr
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition: Module.h:42
Belle2::Path::buildModulePathList
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:66
Belle2::Path::addPath
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:36
Belle2::PathUtils::getHistogramManager
static ModulePtr getHistogramManager(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Find the histogram manager in the paths and return it.
Definition: PathUtils.cc:89
Belle2::Path
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:40
Belle2::ModuleManager
The ModuleManager Class.
Definition: ModuleManager.h:60
Belle2::PathUtils::prependModulesIfNotPresent
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
Definition: PathUtils.cc:194
Belle2::Environment::Instance
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:31
Belle2::PathUtils::splitPath
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given part into the input, main and output path (in this order) by looking onto the paralle...
Definition: PathUtils.cc:17
Belle2::PathUtils::getTerminateGloballyModules
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the TerminateGlobally Module flag set.
Definition: PathUtils.cc:184
Belle2::c_pub
@ c_pub
Output socket.
Definition: ZMQAddressUtils.h:31
Belle2::PathUtils::preparePaths
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
Definition: PathUtils.cc:105