Belle II Software development
PathUtils.cc
/**************************************************************************
 * basf2 (Belle II Analysis Software Framework)                           *
 * Author: The Belle II Collaboration                                     *
 *                                                                        *
 * See git log for contributors and copyright holders.                    *
 * This file is licensed under LGPL-3.0, see LICENSE.md.                  *
 **************************************************************************/
#include <framework/pcore/PathUtils.h>
#include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
#include <framework/core/ModuleManager.h>
#include <framework/core/Environment.h>

#include <algorithm>
#include <memory>
#include <set>
#include <string>
#include <tuple>

using namespace Belle2;
15
16std::tuple<PathPtr, PathPtr, PathPtr> PathUtils::splitPath(const PathPtr& path)
17{
18 //modules that can be parallelized, but should not go into a parallel section by themselves
19 std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
20
21 PathPtr inputPath(new Path);
22 PathPtr mainPath(new Path);
23 PathPtr outputPath(new Path);
24
25 int stage = 0; //0: in, 1: event/main, 2: out
26 for (const ModulePtr& module : path->getModules()) {
27 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified) and
28 !module->hasProperties(Module::c_Input) and
29 !module->hasProperties(Module::c_Output) and
30 !module->hasProperties(Module::c_HistogramManager) ;
31 //entire conditional path must also be compatible
32 if (hasParallelFlag and module->hasCondition()) {
33 for (const auto& conditionPath : module->getAllConditionPaths()) {
35 hasParallelFlag = false;
36 }
37 }
38 }
39 //if modules have parallal flag -> stage = 1 , event/main
40 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
41 stage++;
42
43 if (stage == 2) {
44 bool path_is_useful = false;
45 for (const auto& parallelModule : mainPath->getModules()) {
46 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
47 path_is_useful = true;
48 break;
49 }
50 }
51 if (not path_is_useful) {
52 //merge mainPath back into input path
53 inputPath->addPath(mainPath);
54 mainPath.reset(new Path);
55 //and search for further parallel sections
56 stage = 0;
57 }
58 }
59 }
60
61 if (stage == 0) {
62 inputPath->addModule(module);
63 if (module->hasProperties(Module::c_HistogramManager)) {
64 // Initialize histogram manager if found in the path
65 // add histogram to other paths
66 mainPath->addModule(module);
67 outputPath->addModule(module);
68 }
69 } else if (stage == 1) {
70 mainPath->addModule(module);
71 } else if (stage == 2) {
72 outputPath->addModule(module);
73 }
74 }
75
76 bool createAllPaths = false; //usually we might not need e.g. an output path
77 for (const ModulePtr& module : path->getModules()) {
78 if (module->hasProperties(Module::c_TerminateInAllProcesses)) {
79 createAllPaths = true; //ensure there are all kinds of processes
80 }
81 }
82
83 // if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
84 if (mainPath->isEmpty() and not createAllPaths) {
85 mainPath.reset();
86 }
87 if (inputPath->isEmpty() and not createAllPaths) {
88 inputPath.reset();
89 }
90 if (outputPath->isEmpty() and not createAllPaths) {
91 outputPath.reset();
92 }
93
94 return {inputPath, mainPath, outputPath};
95}
96
98{
99 ModulePtr histoManagerModule;
100
101
102 for (const ModulePtr& module : inputPath->getModules()) {
103 if (module->hasProperties(Module::c_HistogramManager)) {
104 // Initialize histogram manager if found in the path
105 histoManagerModule = module;
106
107 }
108 }
109
110 return histoManagerModule;
111}
113{
114 B2ASSERT("The main part is empty. This is a bug in the framework.",
115 mainPath and not mainPath->isEmpty());
116
117 ModuleManager& moduleManager = ModuleManager::Instance();
118
119 const auto& environment = Environment::Instance();
120
121 const auto& socketAddress = environment.getZMQSocketAddress();
122 const auto inputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_input));
123 const auto outputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_output));
124 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
125 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
126
127 unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
128 unsigned int eventBufferSize = environment.getZMQEventBufferSize();
129 unsigned int workerTimeout = environment.getZMQWorkerTimeout();
130 bool useEventBackup = environment.getZMQUseEventBackup();
131
132 if (inputPath) {
133 // Add TXInput after input path
134 ModulePtr zmqTxInputModule = moduleManager.registerModule("ZMQTxInput");
135 zmqTxInputModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
136 zmqTxInputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
137 zmqTxInputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
138 zmqTxInputModule->getParam<unsigned int>("workerProcessTimeout").setValue(workerTimeout);
139 zmqTxInputModule->getParam<bool>("useEventBackup").setValue(useEventBackup);
140 zmqTxInputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
141 appendModule(inputPath, zmqTxInputModule);
142
143 // Add RXWorker before main path
144 ModulePtr zmqRxWorkerModule = moduleManager.registerModule("ZMQRxWorker");
145 zmqRxWorkerModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
146 zmqRxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
147 zmqRxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
148 zmqRxWorkerModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
149 zmqRxWorkerModule->getParam<unsigned int>("eventBufferSize").setValue(eventBufferSize);
150 prependModule(mainPath, zmqRxWorkerModule);
151
152 }
153
154 if (outputPath) {
155 // Add TXWorker after main path
156 ModulePtr zmqTxWorkerModule = moduleManager.registerModule("ZMQTxWorker");
157 zmqTxWorkerModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
158 zmqTxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
159 zmqTxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(pubSocketAddress);
160 appendModule(mainPath, zmqTxWorkerModule);
161
162 // Add RXOutput before output path
163 ModulePtr zmqRxOutputModule = moduleManager.registerModule("ZMQRxOutput");
164 zmqRxOutputModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
165 zmqRxOutputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
166 zmqRxOutputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
167 zmqRxOutputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
168 prependModule(outputPath, zmqRxOutputModule);
169
170 }
171
172 if (inputPath) {
173 B2INFO("Input Path " << inputPath->getPathString());
174 }
175 if (mainPath) {
176 B2INFO("Main Path " << mainPath->getPathString());
177 }
178 if (outputPath) {
179 B2INFO("Output Path " << outputPath->getPathString());
180 }
181
182 Path mergedPath;
183 if (inputPath) {
184 mergedPath.addPath(inputPath);
185 }
186 mergedPath.addPath(mainPath);
187 if (outputPath) {
188 mergedPath.addPath(outputPath);
189 }
190
191 B2INFO("ModuleList " << mergedPath.getPathString());
192
193 return mergedPath.buildModulePathList();
194}
195
197{
198 ModulePtrList tmpModuleList;
199 for (const ModulePtr& m : modules) {
200 if (m->hasProperties(Module::c_TerminateInAllProcesses))
201 tmpModuleList.push_back(m);
202 }
203 return tmpModuleList;
204}
205
207{
208 for (const ModulePtr& m : prependModules) {
209 if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
210 modules->push_front(m);
211 }
212 }
213}
214
215void PathUtils::appendModule(PathPtr& path, const ModulePtr& module)
216{
217 path->addModule(module);
218}
219
221{
222 PathPtr newPath(new Path());
223 newPath->addModule(module);
224 newPath->addPath(path);
225 path.swap(newPath);
226}
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
The ModuleManager Class.
Definition: ModuleManager.h:53
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_Output
This module is an output module (writes data).
Definition: Module.h:79
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Definition: Module.h:83
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the c_TerminateInAllProcesses module flag set.
Definition: PathUtils.cc:196
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
Definition: PathUtils.cc:112
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given part into the input, main and output path (in this order) by looking onto the paralle...
Definition: PathUtils.cc:16
static ModulePtr getHistogramManager(PathPtr &inputPath)
Find the histogram manager in the paths and return it.
Definition: PathUtils.cc:97
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
Definition: PathUtils.cc:206
static void prependModule(PathPtr &path, const ModulePtr &module)
Prepend the given module to the path.
Definition: PathUtils.cc:220
static void appendModule(PathPtr &path, const ModulePtr &module)
Append the given module to the path.
Definition: PathUtils.cc:215
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:67
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:37
std::string getPathString() const override
return a string of the form [module a -> module b -> [another path]]
Definition: Path.cc:206
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
Definition: Path.h:35
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
Definition: Module.h:43
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:583
@ c_sub
Multicast publish socket.
@ c_pub
Output socket.
@ c_output
Input socket.
Abstract base class for different kinds of events.