Belle II Software development
PathUtils Class Reference

Helper utils for path arithmetic needed in the pEventProcessor. More...

#include <PathUtils.h>

Static Public Member Functions

static std::tuple< PathPtr, PathPtr, PathPtrsplitPath (const PathPtr &path)
 Split the given part into the input, main and output path (in this order) by looking onto the parallel certificate of the modules.
 
static ModulePtrList preparePaths (PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
 Adds internal zmq modules to the paths.
 
static ModulePtr getHistogramManager (PathPtr &inputPath)
 Find the histogram manager in the paths and return it.
 
static ModulePtrList getTerminateGloballyModules (const ModulePtrList &modules)
 Return only modules which have the TerminateGlobally Module flag set.
 
static void prependModulesIfNotPresent (ModulePtrList *modules, const ModulePtrList &prependModules)
 Prepend given 'prependModules' to 'modules', if they're not already present.
 
static void appendModule (PathPtr &path, const ModulePtr &module)
 Prepend given modules to the path.
 
static void prependModule (PathPtr &path, const ModulePtr &module)
 Append given modules to the path.
 

Detailed Description

Helper utils for path arithmetic needed in the pEventProcessor.

Definition at line 19 of file PathUtils.h.

Member Function Documentation

◆ appendModule()

void appendModule ( PathPtr path,
const ModulePtr module 
)
static

Prepend given modules to the path.

Definition at line 215 of file PathUtils.cc.

216{
217 path->addModule(module);
218}

◆ getHistogramManager()

ModulePtr getHistogramManager ( PathPtr inputPath)
static

Find the histogram manager in the paths and return it.

Definition at line 97 of file PathUtils.cc.

98{
99 ModulePtr histoManagerModule;
100
101
102 for (const ModulePtr& module : inputPath->getModules()) {
103 if (module->hasProperties(Module::c_HistogramManager)) {
104 // Initialize histogram manager if found in the path
105 histoManagerModule = module;
106
107 }
108 }
109
110 return histoManagerModule;
111}
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
Definition: Module.h:81
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a boost shared pointer.
Definition: Module.h:43

◆ getTerminateGloballyModules()

ModulePtrList getTerminateGloballyModules ( const ModulePtrList modules)
static

Return only modules which have the TerminateGlobally Module flag set.

Definition at line 196 of file PathUtils.cc.

197{
198 ModulePtrList tmpModuleList;
199 for (const ModulePtr& m : modules) {
200 if (m->hasProperties(Module::c_TerminateInAllProcesses))
201 tmpModuleList.push_back(m);
202 }
203 return tmpModuleList;
204}
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
Definition: Module.h:83
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
Definition: Module.h:583

◆ preparePaths()

ModulePtrList preparePaths ( PathPtr inputPath,
PathPtr mainPath,
PathPtr outputPath 
)
static

Adds internal zmq modules to the paths.

Definition at line 112 of file PathUtils.cc.

113{
114 B2ASSERT("The main part is empty. This is a bug in the framework.",
115 mainPath and not mainPath->isEmpty());
116
117 ModuleManager& moduleManager = ModuleManager::Instance();
118
119 const auto& environment = Environment::Instance();
120
121 const auto& socketAddress = environment.getZMQSocketAddress();
122 const auto inputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_input));
123 const auto outputSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_output));
124 const auto pubSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_pub));
125 const auto subSocketAddress(ZMQAddressUtils::getSocketAddress(socketAddress, ZMQAddressType::c_sub));
126
127 unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
128 unsigned int eventBufferSize = environment.getZMQEventBufferSize();
129 unsigned int workerTimeout = environment.getZMQWorkerTimeout();
130 bool useEventBackup = environment.getZMQUseEventBackup();
131
132 if (inputPath) {
133 // Add TXInput after input path
134 ModulePtr zmqTxInputModule = moduleManager.registerModule("ZMQTxInput");
135 zmqTxInputModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
136 zmqTxInputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
137 zmqTxInputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
138 zmqTxInputModule->getParam<unsigned int>("workerProcessTimeout").setValue(workerTimeout);
139 zmqTxInputModule->getParam<bool>("useEventBackup").setValue(useEventBackup);
140 zmqTxInputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
141 appendModule(inputPath, zmqTxInputModule);
142
143 // Add RXWorker before main path
144 ModulePtr zmqRxWorkerModule = moduleManager.registerModule("ZMQRxWorker");
145 zmqRxWorkerModule->getParam<std::string>("socketName").setValue(inputSocketAddress);
146 zmqRxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
147 zmqRxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
148 zmqRxWorkerModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
149 zmqRxWorkerModule->getParam<unsigned int>("eventBufferSize").setValue(eventBufferSize);
150 prependModule(mainPath, zmqRxWorkerModule);
151
152 }
153
154 if (outputPath) {
155 // Add TXWorker after main path
156 ModulePtr zmqTxWorkerModule = moduleManager.registerModule("ZMQTxWorker");
157 zmqTxWorkerModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
158 zmqTxWorkerModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
159 zmqTxWorkerModule->getParam<std::string>("xsubProxySocketName").setValue(pubSocketAddress);
160 appendModule(mainPath, zmqTxWorkerModule);
161
162 // Add RXOutput before output path
163 ModulePtr zmqRxOutputModule = moduleManager.registerModule("ZMQRxOutput");
164 zmqRxOutputModule->getParam<std::string>("socketName").setValue(outputSocketAddress);
165 zmqRxOutputModule->getParam<std::string>("xpubProxySocketName").setValue(pubSocketAddress);
166 zmqRxOutputModule->getParam<std::string>("xsubProxySocketName").setValue(subSocketAddress);
167 zmqRxOutputModule->getParam<unsigned int>("maximalWaitingTime").setValue(maximalWaitingTime);
168 prependModule(outputPath, zmqRxOutputModule);
169
170 }
171
172 if (inputPath) {
173 B2INFO("Input Path " << inputPath->getPathString());
174 }
175 if (mainPath) {
176 B2INFO("Main Path " << mainPath->getPathString());
177 }
178 if (outputPath) {
179 B2INFO("Output Path " << outputPath->getPathString());
180 }
181
182 Path mergedPath;
183 if (inputPath) {
184 mergedPath.addPath(inputPath);
185 }
186 mergedPath.addPath(mainPath);
187 if (outputPath) {
188 mergedPath.addPath(outputPath);
189 }
190
191 B2INFO("ModuleList " << mergedPath.getPathString());
192
193 return mergedPath.buildModulePathList();
194}
static Environment & Instance()
Static method to get a reference to the Environment instance.
Definition: Environment.cc:28
The ModuleManager Class.
Definition: ModuleManager.h:53
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
static void prependModule(PathPtr &path, const ModulePtr &module)
Append given modules to the path.
Definition: PathUtils.cc:220
static void appendModule(PathPtr &path, const ModulePtr &module)
Prepend given modules to the path.
Definition: PathUtils.cc:215
Implements a path consisting of Module and/or Path objects.
Definition: Path.h:38
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
Definition: Path.cc:67
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
Definition: Path.cc:37
std::string getPathString() const override
return a string of the form [module a -> module b -> [another path]]
Definition: Path.cc:206
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, ba adding a suffix.
@ c_sub
Multicast publish socket.
@ c_pub
Output socket.
@ c_output
Input socket.

◆ prependModule()

void prependModule ( PathPtr path,
const ModulePtr module 
)
static

Append given modules to the path.

Definition at line 220 of file PathUtils.cc.

221{
222 PathPtr newPath(new Path());
223 newPath->addModule(module);
224 newPath->addPath(path);
225 path.swap(newPath);
226}
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a boost shared pointer.
Definition: Path.h:35

◆ prependModulesIfNotPresent()

void prependModulesIfNotPresent ( ModulePtrList modules,
const ModulePtrList prependModules 
)
static

Prepend given 'prependModules' to 'modules', if they're not already present.

Definition at line 206 of file PathUtils.cc.

207{
208 for (const ModulePtr& m : prependModules) {
209 if (std::find(modules->begin(), modules->end(), m) == modules->end()) { //not present
210 modules->push_front(m);
211 }
212 }
213}

◆ splitPath()

std::tuple< PathPtr, PathPtr, PathPtr > splitPath ( const PathPtr path)
static

Split the given part into the input, main and output path (in this order) by looking onto the parallel certificate of the modules.

Definition at line 16 of file PathUtils.cc.

17{
18 //modules that can be parallelized, but should not go into a parallel section by themselves
19 std::set<std::string> uselessParallelModules({"HistoManager", "Gearbox", "Geometry"});
20
21 PathPtr inputPath(new Path);
22 PathPtr mainPath(new Path);
23 PathPtr outputPath(new Path);
24
25 int stage = 0; //0: in, 1: event/main, 2: out
26 for (const ModulePtr& module : path->getModules()) {
27 bool hasParallelFlag = module->hasProperties(Module::c_ParallelProcessingCertified) and
28 !module->hasProperties(Module::c_Input) and
29 !module->hasProperties(Module::c_Output) and
30 !module->hasProperties(Module::c_HistogramManager) ;
31 //entire conditional path must also be compatible
32 if (hasParallelFlag and module->hasCondition()) {
33 for (const auto& conditionPath : module->getAllConditionPaths()) {
35 hasParallelFlag = false;
36 }
37 }
38 }
39 //if modules have parallal flag -> stage = 1 , event/main
40 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
41 stage++;
42
43 if (stage == 2) {
44 bool path_is_useful = false;
45 for (const auto& parallelModule : mainPath->getModules()) {
46 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
47 path_is_useful = true;
48 break;
49 }
50 }
51 if (not path_is_useful) {
52 //merge mainPath back into input path
53 inputPath->addPath(mainPath);
54 mainPath.reset(new Path);
55 //and search for further parallel sections
56 stage = 0;
57 }
58 }
59 }
60
61 if (stage == 0) {
62 inputPath->addModule(module);
63 if (module->hasProperties(Module::c_HistogramManager)) {
64 // Initialize histogram manager if found in the path
65 // add histogram to other paths
66 mainPath->addModule(module);
67 outputPath->addModule(module);
68 }
69 } else if (stage == 1) {
70 mainPath->addModule(module);
71 } else if (stage == 2) {
72 outputPath->addModule(module);
73 }
74 }
75
76 bool createAllPaths = false; //usually we might not need e.g. an output path
77 for (const ModulePtr& module : path->getModules()) {
78 if (module->hasProperties(Module::c_TerminateInAllProcesses)) {
79 createAllPaths = true; //ensure there are all kinds of processes
80 }
81 }
82
83 // if main path is empty, createAllPaths doesn't really matter, since we'll fall back to single-core processing
84 if (mainPath->isEmpty() and not createAllPaths) {
85 mainPath.reset();
86 }
87 if (inputPath->isEmpty() and not createAllPaths) {
88 inputPath.reset();
89 }
90 if (outputPath->isEmpty() and not createAllPaths) {
91 outputPath.reset();
92 }
93
94 return {inputPath, mainPath, outputPath};
95}
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module > > &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
@ c_Input
This module is an input module (reads data).
Definition: Module.h:78
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
Definition: Module.h:80
@ c_Output
This module is an output module (writes data).
Definition: Module.h:79

The documentation for this class was generated from the following files: