Belle II Software  release-06-00-14
PathUtils.cc
1 /**************************************************************************
2  * basf2 (Belle II Analysis Software Framework) *
3  * Author: The Belle II Collaboration *
4  * *
5  * See git log for contributors and copyright holders. *
6  * This file is licensed under LGPL-3.0, see LICENSE.md. *
7  **************************************************************************/
8 #include <framework/pcore/PathUtils.h>
9 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
10 #include <framework/core/ModuleManager.h>
11 #include <framework/core/Environment.h>
12 #include <set>
13 
14 using namespace Belle2;
15 
16 std::tuple<PathPtr, PathPtr, PathPtr> PathUtils::splitPath(const PathPtr& path)
17 {
18  //modules that can be parallelized, but should not go into a parallel section by themselves
19  std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
20 
21  PathPtr inputPath(new Path);
22  PathPtr mainPath(new Path);
23  PathPtr outputPath(new Path);
24 
25  int stage = 0; //0: in, 1: event/main, 2: out
26  for (const ModulePtr& module : path->getModules()) {
27  bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified);
28  //entire conditional path must also be compatible
29  if (hasParallelFlag and module->hasCondition()) {
30  for (const auto& conditionPath : module->getAllConditionPaths()) {
32  hasParallelFlag = false;
33  }
34  }
35  }
36  //if modules have parallal flag -> stage = 1 , event/main
37  if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
38  stage++;
39 
40  if (stage == 2) {
41  bool path_is_useful = false;
42  for (const auto& parallelModule : mainPath->getModules()) {
43  if (uselessParallelModules.count(parallelModule->getType()) == 0) {
44  path_is_useful = true;
45  break;
46  }
47  }
48  if (not path_is_useful) {
49  //merge mainPath back into input path
50  inputPath->addPath(mainPath);
51  mainPath.reset(new Path);
52  //and search for further parallel sections
53  stage = 0;
54  }
55  }
56  }
57 
58  if (stage == 0) {
59  inputPath->addModule(module);
60  } else if (stage == 1) {
61  mainPath->addModule(module);
62  } else if (stage == 2) {
63  outputPath->addModule(module);
64  }
65  }
66 
67  bool createAllPaths = false; //usually we might not need e.g. an output path
68  for (const ModulePtr& module : path->getModules()) {
69  if (module->hasProperties(Module::c_TerminateInAllProcesses)) {
70  createAllPaths = true; //ensure there are all kinds of processes
71  }
72  }
73 
74  // if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
75  if (mainPath->isEmpty() and not createAllPaths) {
76  mainPath.reset();
77  }
78  if (inputPath->isEmpty() and not createAllPaths) {
79  inputPath.reset();
80  }
81  if (outputPath->isEmpty() and not createAllPaths) {
82  outputPath.reset();
83  }
84 
85  return {inputPath, mainPath, outputPath};
86 }
87 
88 ModulePtr PathUtils::getHistogramManager(PathPtr& inputPath, PathPtr& mainPath, PathPtr& outputPath)
89 {
90  ModulePtr histoManagerModule;
91  for (const ModulePtr& module : inputPath->getModules()) {
92  if (module->hasProperties(Module::c_HistogramManager)) {
93  // Initialize histogram manager if found in the path
94  histoManagerModule = module;
95 
96  //add histoman to other paths
97  mainPath->addModule(histoManagerModule);
98  outputPath->addModule(histoManagerModule);
99  }
100  }
101 
102  return histoManagerModule;
103 }
104 ModulePtrList PathUtils::preparePaths(PathPtr& inputPath, PathPtr& mainPath, PathPtr& outputPath)
105 {
106  B2ASSERT("The main part is empty. This is a bug in the framework.",
107  mainPath and not mainPath->isEmpty());
108 
109  ModuleManager& moduleManager = ModuleManager::Instance();
110 
111  const auto& environment = Environment::Instance();
112 
113  const auto& socketAddress = environment.getZMQSocketAddress();
114  const auto inputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_input));
115  const auto outputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_output));
116  const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
117  const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
118 
119  unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
120  unsigned int eventBufferSize = environment.getZMQEventBufferSize();
121  unsigned int workerTimeout = environment.getZMQWorkerTimeout();
122  bool useEventBackup = environment.getZMQUseEventBackup();
123 
124  if (inputPath) {
125  // Add TXInput after input path
126  ModulePtr zmqTxInputModule = moduleManager.registerModule("ZMQTxInput");
127  zmqTxInputModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
128  zmqTxInputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
129  zmqTxInputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
130  zmqTxInputModule->getParam<unsigned int>("workerProcessTimeout").setValue(workerTimeout);
131  zmqTxInputModule->getParam<bool>("useEventBackup").setValue(useEventBackup);
132  zmqTxInputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
133  appendModule(inputPath, zmqTxInputModule);
134 
135  // Add RXWorker before main path
136  ModulePtr zmqRxWorkerModule = moduleManager.registerModule("ZMQRxWorker");
137  zmqRxWorkerModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
138  zmqRxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
139  zmqRxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
140  zmqRxWorkerModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
141  zmqRxWorkerModule->getParam<unsigned int>("eventBufferSize").setValue(eventBufferSize);
142  prependModule(mainPath, zmqRxWorkerModule);
143  }
144 
145  if (outputPath) {
146  // Add TXWorker after main path
147  ModulePtr zmqTxWorkerModule = moduleManager.registerModule("ZMQTxWorker");
148  zmqTxWorkerModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
149  zmqTxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
150  zmqTxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(pubSocketAddress);
151  appendModule(mainPath, zmqTxWorkerModule);
152 
153  // Add RXOutput before output path
154  ModulePtr zmqRxOutputModule = moduleManager.registerModule("ZMQRxOutput");
155  zmqRxOutputModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
156  zmqRxOutputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
157  zmqRxOutputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
158  zmqRxOutputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
159  prependModule(outputPath, zmqRxOutputModule);
160  }
161 
162  if (inputPath) {
163  B2INFO("Input Path " << inputPath->getPathString());
164  }
165  if (mainPath) {
166  B2INFO("Main Path " << mainPath->getPathString());
167  }
168  if (outputPath) {
169  B2INFO("Output Path " << outputPath->getPathString());
170  }
171 
172  Path mergedPath;
173  if (inputPath) {
174  mergedPath.addPath(inputPath);
175  }
176  mergedPath.addPath(mainPath);
177  if (outputPath) {
178  mergedPath.addPath(outputPath);
179  }
180  return mergedPath.buildModulePathList();
181 }
182 
184 {
185  ModulePtrList tmpModuleList;
186  for (const ModulePtr& m : modules) {
187  if (m->hasProperties(Module::c_TerminateInAllProcesses))
188  tmpModuleList.push_back(m);
189  }
190  return tmpModuleList;
191 }
192 
194 {
195  for (const ModulePtr& m : prependModules) {
196  if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
197  modules->push_front(m);
198  }
199  }
200 }
201 
202 void PathUtils::appendModule(PathPtr& path, const ModulePtr& module)
203 {
204  path->addModule(module);
205 }
206 
207 void PathUtils::prependModule(PathPtr& path, const ModulePtr& module)
208 {
209  PathPtr newPath(new Path());
210  newPath->addModule(module);
211  newPath->addPath(path);
212  path.swap(newPath);
213 }
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:29
The ModuleManager Class.
Definition: ModuleManager.h:58
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Definition: Module.h:83
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the c_TerminateInAllProcesses module flag set.
Definition: PathUtils.cc:183
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
Definition: PathUtils.cc:104
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given part into the input, main and output path (in this order) by looking onto the paralle...
Definition: PathUtils.cc:16
static ModulePtr getHistogramManager(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Find the histogram manager in the paths and return it.
Definition: PathUtils.cc:88
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
Definition: PathUtils.cc:193
static void prependModule(PathPtr &path, const ModulePtr &module)
Prepend the given module to the path.
Definition: PathUtils.cc:207
static void appendModule(PathPtr &path, const ModulePtr &module)
Append the given module to the path.
Definition: PathUtils.cc:202
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:64
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:34
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:28
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:40
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:584
@ c_sub
Multicast subscribe socket.
@ c_pub
Multicast publish socket.
@ c_output
Output socket.
Abstract base class for different kinds of events.