8 #include <framework/pcore/PathUtils.h>
9 #include <framework/pcore/zmq/utils/ZMQAddressUtils.h>
10 #include <framework/core/ModuleManager.h>
11 #include <framework/core/Environment.h>
19 std::set<std::string> uselessParallelModules({
"HistoManager",
"Gearbox",
"Geometry"});
26 for (
const ModulePtr& module : path->getModules()) {
32 if (hasParallelFlag and module->hasCondition()) {
33 for (
const auto& conditionPath : module->getAllConditionPaths()) {
35 hasParallelFlag =
false;
40 if ((stage == 0 and hasParallelFlag) or (stage == 1 and !hasParallelFlag)) {
44 bool path_is_useful =
false;
45 for (
const auto& parallelModule : mainPath->getModules()) {
46 if (uselessParallelModules.count(parallelModule->getType()) == 0) {
47 path_is_useful =
true;
51 if (not path_is_useful) {
53 inputPath->addPath(mainPath);
54 mainPath.reset(
new Path);
62 inputPath->addModule(module);
67 mainPath->addModule(module);
68 outputPath->addModule(module);
70 }
else if (stage == 1) {
71 mainPath->addModule(module);
72 }
else if (stage == 2) {
73 outputPath->addModule(module);
77 bool createAllPaths =
false;
78 for (
const ModulePtr& module : path->getModules()) {
80 createAllPaths =
true;
85 if (mainPath->isEmpty() and not createAllPaths) {
88 if (inputPath->isEmpty() and not createAllPaths) {
91 if (outputPath->isEmpty() and not createAllPaths) {
95 return {inputPath, mainPath, outputPath};
101 for (
const ModulePtr& module : inputPath->getModules()) {
104 histoManagerModule = module;
112 return histoManagerModule;
116 B2ASSERT(
"The main part is empty. This is a bug in the framework.",
117 mainPath and not mainPath->isEmpty());
123 const auto& socketAddress = environment.getZMQSocketAddress();
129 unsigned int maximalWaitingTime = environment.getZMQMaximalWaitingTime();
130 unsigned int eventBufferSize = environment.getZMQEventBufferSize();
131 unsigned int workerTimeout = environment.getZMQWorkerTimeout();
132 bool useEventBackup = environment.getZMQUseEventBackup();
137 zmqTxInputModule->getParam<std::string>(
"socketName").setValue(inputSocketAddress);
138 zmqTxInputModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
139 zmqTxInputModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
140 zmqTxInputModule->getParam<
unsigned int>(
"workerProcessTimeout").setValue(workerTimeout);
141 zmqTxInputModule->getParam<
bool>(
"useEventBackup").setValue(useEventBackup);
142 zmqTxInputModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
147 zmqRxWorkerModule->getParam<std::string>(
"socketName").setValue(inputSocketAddress);
148 zmqRxWorkerModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
149 zmqRxWorkerModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
150 zmqRxWorkerModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
151 zmqRxWorkerModule->getParam<
unsigned int>(
"eventBufferSize").setValue(eventBufferSize);
// Configure the worker's transmitting module: each proxy parameter gets its matching
// address (xpub -> pub, xsub -> sub), exactly as for the other ZMQ modules set up here.
159 zmqTxWorkerModule->getParam<std::string>(
"socketName").setValue(outputSocketAddress);
160 zmqTxWorkerModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
161 zmqTxWorkerModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
166 zmqRxOutputModule->getParam<std::string>(
"socketName").setValue(outputSocketAddress);
167 zmqRxOutputModule->getParam<std::string>(
"xpubProxySocketName").setValue(pubSocketAddress);
168 zmqRxOutputModule->getParam<std::string>(
"xsubProxySocketName").setValue(subSocketAddress);
169 zmqRxOutputModule->getParam<
unsigned int>(
"maximalWaitingTime").setValue(maximalWaitingTime);
175 B2INFO(
"Input Path " << inputPath->getPathString());
178 B2INFO(
"Main Path " << mainPath->getPathString());
181 B2INFO(
"Output Path " << outputPath->getPathString());
190 mergedPath.
addPath(outputPath);
203 tmpModuleList.push_back(m);
205 return tmpModuleList;
210 for (
const ModulePtr& m : prependModules) {
211 if (std::find(modules->begin(), modules->end(), m) == modules->end()) {
212 modules->push_front(m);
219 path->addModule(module);
225 newPath->addModule(module);
226 newPath->addPath(path);
static Environment & Instance()
Static method to get a reference to the Environment instance.
std::shared_ptr< Module > registerModule(const std::string &moduleName, std::string sharedLibPath="") noexcept(false)
Creates an instance of a module and registers it to the ModuleManager.
static bool allModulesHaveFlag(const std::list< std::shared_ptr< Module >> &list, unsigned int flag)
Returns true if and only if all modules in list have the given flag (or list is empty).
static ModuleManager & Instance()
Exception is thrown if the requested module could not be created by the ModuleManager.
@ c_HistogramManager
This module is used to manage histograms accumulated by other modules.
@ c_Input
This module is an input module (reads data).
@ c_ParallelProcessingCertified
This module can be run in parallel processing mode safely (All I/O must be done through the data stor...
@ c_Output
This module is an output module (writes data).
@ c_TerminateInAllProcesses
When using parallel processing, call this module's terminate() function in all processes().
static ModulePtrList getTerminateGloballyModules(const ModulePtrList &modules)
Return only modules which have the TerminateGlobally Module flag set.
static ModulePtrList preparePaths(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Adds internal zmq modules to the paths.
static std::tuple< PathPtr, PathPtr, PathPtr > splitPath(const PathPtr &path)
Split the given part into the input, main and output path (in this order) by looking onto the paralle...
static ModulePtr getHistogramManager(PathPtr &inputPath, PathPtr &mainPath, PathPtr &outputPath)
Find the histogram manager in the paths and return it.
static void prependModulesIfNotPresent(ModulePtrList *modules, const ModulePtrList &prependModules)
Prepend given 'prependModules' to 'modules', if they're not already present.
static void prependModule(PathPtr &path, const ModulePtr &module)
Prepend the given module to the path.
static void appendModule(PathPtr &path, const ModulePtr &module)
Append the given module to the path.
Implements a path consisting of Module and/or Path objects.
std::list< std::shared_ptr< Module > > buildModulePathList(bool unique=true) const
Builds a list of all modules which could be executed during the data processing.
void addPath(const PathPtr &path)
See 'pydoc3 basf2.Path'.
std::string getPathString() const override
return a string of the form [module a -> module b -> [another path]]
static std::string getSocketAddress(const std::string &socketAddress, ZMQAddressType socketPart)
Create a full socket address for the given type from a random socket address, by adding a suffix.
std::shared_ptr< Path > PathPtr
Defines a pointer to a path object as a std::shared_ptr.
std::shared_ptr< Module > ModulePtr
Defines a pointer to a module object as a std::shared_ptr.
std::list< ModulePtr > ModulePtrList
Defines a std::list of shared module pointers.
@ c_sub
Multicast subscribe socket.
Abstract base class for different kinds of events.