Belle II Software development
Collection Class Reference

Public Member Functions

 __init__ (self, collector=None, input_files=None, pre_collector_path=None, database_chain=None, output_patterns=None, max_files_per_collector_job=None, max_collector_jobs=None, backend_args=None)
 
 reset_database (self)
 
 use_central_database (self, global_tag)
 
 use_local_database (self, filename, directory="")
 
 input_files (self)
 
 input_files (self, value)
 
 collector (self)
 
 collector (self, collector)
 
 is_valid (self)
 
 max_collector_jobs (self)
 
 max_collector_jobs (self, value)
 
 max_files_per_collector_job (self)
 
 max_files_per_collector_job (self, value)
 

Static Public Member Functions

 uri_list_from_input_file (input_file)
 

Public Attributes

 collector = collector
 Collector module of this collection.
 
list input_files = []
 Internal input_files stored for this collection.
 
dict files_to_iovs = {}
 File -> IoV dictionary of the form {absolute_file_path: iov}.
 
 pre_collector_path = None
 Since many collectors require different setup, you can set this attribute to a basf2.Path; it will be run before the collector and after the default RootInput module + HistoManager setup.
 
list output_patterns = ["CollectorOutput.root"]
 Output patterns of files produced by the collector, which will be passed to the Algorithm.data_input function.
 
 splitter = None
 The SubjobSplitter to use when constructing collector subjobs from the overall Job object.
 
dict backend_args = {}
 Dictionary passed to the collector Job object to configure how the caf.backends.Backend instance should treat the collector job when submitting.
 
list database_chain = database_chain
 The database chain used for this Collection.
 
 job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()
 The basf2 steering file used for Collector jobs run by this collection.
 
list job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]
 The Collector caf.backends.Job.cmd attribute.
 

Static Public Attributes

int default_max_collector_jobs = 1000
 The default maximum number of collector jobs to create.
 
str job_config = "collector_job.json"
 The name of the file containing the collector Job's dictionary.
 

Protected Attributes

 _input_files = self.uri_list_from_input_file(value)
 Internal storage of the input files, converted to URIs.
 
 _collector = collector
 Internal storage of collector attribute.
 

Detailed Description

Keyword Arguments:
    collector (str, basf2.Module): The collector module or module name for this `Collection`.
    input_files (list[str]): The input files to be used for only this `Collection`.
    pre_collector_path (basf2.Path): The reconstruction `basf2.Path` to be run prior to the Collector module.
    database_chain (list[CentralDatabase, LocalDatabase]): The database chain to be used initially for this `Collection`.
    output_patterns (list[str]): Output patterns of files produced by the collector, which will be passed to the
        `Algorithm.data_input` function. Setting this here replaces the default completely.
    max_files_per_collector_job (int): Maximum number of input files sent to each collector subjob for this `Collection`.
        Technically this sets the SubjobSplitter to be used; not compatible with max_collector_jobs.
    max_collector_jobs (int): Maximum number of collector subjobs for this `Collection`.
        Input files are split evenly between them. Technically this sets the SubjobSplitter to be used; not compatible with
        max_files_per_collector_job.
    backend_args (dict): The args for the backend submission of this `Collection`.

Definition at line 51 of file framework.py.
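
For orientation, a minimal construction sketch; the collector name "CaTest" and the input file pattern are placeholders for your own collector module and data:

    from caf.framework import Collection

    # Placeholder collector name and input pattern; adjust to your setup.
    col = Collection(
        collector="CaTest",
        input_files=["/path/to/data/*.root"],
        max_files_per_collector_job=1,
    )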

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
collector = None,
input_files = None,
pre_collector_path = None,
database_chain = None,
output_patterns = None,
max_files_per_collector_job = None,
max_collector_jobs = None,
backend_args = None )
 

Definition at line 74 of file framework.py.

83 ):
84 """
85 """
86
87 self.collector = collector
88
89 self.input_files = []
90 if input_files:
91 self.input_files = input_files
92
98 self.files_to_iovs = {}
99
103 self.pre_collector_path = None
104 if pre_collector_path:
105 self.pre_collector_path = pre_collector_path
106
110 self.output_patterns = ["CollectorOutput.root"]
111 if output_patterns:
112 self.output_patterns = output_patterns
113
114
116 self.splitter = None
117 if max_files_per_collector_job and max_collector_jobs:
118 B2FATAL("Cannot set both 'max_files_per_collector_job' and 'max_collector_jobs' of a collection!")
119 # \cond false positive doxygen warning
120 elif max_files_per_collector_job:
121 self.max_files_per_collector_job = max_files_per_collector_job
122 elif max_collector_jobs:
123 self.max_collector_jobs = max_collector_jobs
124 else:
125 self.max_collector_jobs = self.default_max_collector_jobs
126 # \endcond
127
128
130 self.backend_args = {}
131 if backend_args:
132 self.backend_args = backend_args
133
134 if database_chain:
135
138 self.database_chain = database_chain
139 else:
140 self.database_chain = []
141 # This may seem weird but the changes to the DB interface mean that they have effectively swapped from being
142 # described well by appending to a list to a deque. So we do a bit of reversal to translate it back and make the
143 # most important GT the last one encountered.
144 for tag in reversed(b2conditions.default_globaltags):
145 self.use_central_database(tag)
146
147
148 self.job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()
149 """The basf2 steering file that will be used for Collector jobs run by this collection.
150 This script will be copied into subjob directories as part of the input sandbox."""
151
152
153 self.job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]
154

Member Function Documentation

◆ collector() [1/2]

collector ( self)
 

Definition at line 251 of file framework.py.

251 def collector(self):
252 """
253 """
254 return self._collector
255

◆ collector() [2/2]

collector ( self,
collector )
 

Definition at line 257 of file framework.py.

257 def collector(self, collector):
258 """
259 """
260 # check if collector is already a module or if we need to create one
261 # from the name
262 if collector:
263 from basf2 import Module
264 if isinstance(collector, str):
265 from basf2 import register_module
266 collector = register_module(collector)
267 if not isinstance(collector, Module):
268 B2ERROR("Collector needs to be either a Module or the name of such a module")
269
270 self._collector = collector
271

◆ input_files() [1/2]

input_files ( self)
 

Definition at line 228 of file framework.py.

228 def input_files(self):
229 """
230 """
231 return self._input_files
232

◆ input_files() [2/2]

input_files ( self,
value )
 

Definition at line 234 of file framework.py.

234 def input_files(self, value):
235 """
236 """
237 if isinstance(value, str):
238 # If it's a string, we convert to a list of URIs
239
240 self._input_files = self.uri_list_from_input_file(value)
241 elif isinstance(value, list):
242 # If it's a list we loop and do the same thing
243 total_files = []
244 for pattern in value:
245 total_files.extend(self.uri_list_from_input_file(pattern))
246 self._input_files = total_files
247 else:
248 raise TypeError("Input files must be a list or string")
249

◆ is_valid()

is_valid ( self)
 

Definition at line 272 of file framework.py.

272 def is_valid(self):
273 """
274 """
275 if (not self.collector or not self.input_files):
276 return False
277 else:
278 return True
279

◆ max_collector_jobs() [1/2]

max_collector_jobs ( self)
 

Definition at line 281 of file framework.py.

281 def max_collector_jobs(self):
282 """
283 """
284 if self.splitter:
285 return self.splitter.max_subjobs
286 else:
287 return None
288

◆ max_collector_jobs() [2/2]

max_collector_jobs ( self,
value )
 

Definition at line 290 of file framework.py.

290 def max_collector_jobs(self, value):
291 """
292 """
293 if value is None:
294 self.splitter = None
295 else:
296 self.splitter = MaxSubjobsSplitter(max_subjobs=value)
297

◆ max_files_per_collector_job() [1/2]

max_files_per_collector_job ( self)
 

Definition at line 299 of file framework.py.

299 def max_files_per_collector_job(self):
300 """
301 """
302 if self.splitter:
303 return self.splitter.max_files_per_subjob
304 else:
305 return None
306

◆ max_files_per_collector_job() [2/2]

max_files_per_collector_job ( self,
value )
 

Definition at line 308 of file framework.py.

308 def max_files_per_collector_job(self, value):
309 """
310 """
311 if value is None:
312 self.splitter = None
313 else:
314 self.splitter = MaxFilesSplitter(max_files_per_subjob=value)
315
316
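
Taken together, the two setters above are mutually exclusive ways of configuring self.splitter. A short sketch, assuming an existing Collection instance ``col``:

    col.max_collector_jobs = 100            # installs a MaxSubjobsSplitter
    col.max_files_per_collector_job = 5     # replaces it with a MaxFilesSplitter
    col.max_files_per_collector_job = None  # removes the splitter entirely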

◆ reset_database()

reset_database ( self)
Remove everything in the database_chain of this Collection, including the default central database
tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.

Definition at line 155 of file framework.py.

155 def reset_database(self):
156 """
157 Remove everything in the database_chain of this Collection, including the default central database
158 tag automatically included from `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
159 """
160 self.database_chain = []
161

◆ uri_list_from_input_file()

uri_list_from_input_file ( input_file)
static
Parameters:
    input_file (str): A local file/glob pattern or XROOTD URI

Returns:
    list: A list of the URIs found from the initial string.

Definition at line 208 of file framework.py.

208 def uri_list_from_input_file(input_file):
209 """
210 Parameters:
211 input_file (str): A local file/glob pattern or XROOTD URI
212
213 Returns:
214 list: A list of the URIs found from the initial string.
215 """
216 # By default we assume it is a local file path if no "scheme" is given
217 uri = parse_file_uri(input_file)
218 if uri.scheme == "file":
219 # For local files we also perform a glob just in case it is a wildcard pattern.
220 # That way we will have all the uris of files separately
221 uris = [parse_file_uri(f).geturl() for f in glob(input_file)]
222 else:
223 # Just let everything else through and hope the backend can figure it out
224 uris = [input_file]
225 return uris
226
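
Illustrative usage; the paths are placeholders and the exact URI strings returned for local files depend on your filesystem:

    from caf.framework import Collection

    # A local glob pattern expands to one URI per matching file, e.g.
    # something like ["file:/data/run0001.root", "file:/data/run0002.root"].
    Collection.uri_list_from_input_file("/data/run*.root")

    # Non-file schemes are passed through unchanged for the backend to handle.
    Collection.uri_list_from_input_file("root://someserver//data/file.root")
    # -> ["root://someserver//data/file.root"]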

◆ use_central_database()

use_central_database ( self,
global_tag )
Parameters:
    global_tag (str): The central database global tag to use for this calibration.

Using this allows you to add a central database to the head of the global tag database chain for this collection.
The default database chain is just the central one from
`basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
The input file global tag will always be overridden and never used unless explicitly set.

To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.

Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.

NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
list which will be checked after all local database files have been checked.

So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.

Definition at line 162 of file framework.py.

162 def use_central_database(self, global_tag):
163 """
164 Parameters:
165 global_tag (str): The central database global tag to use for this calibration.
166
167 Using this allows you to add a central database to the head of the global tag database chain for this collection.
168 The default database chain is just the central one from
169 `basf2.conditions.default_globaltags <ConditionsConfiguration.default_globaltags>`.
170 The input file global tag will always be overridden and never used unless explicitly set.
171
172 To turn off central database completely or use a custom tag as the base, you should call `Calibration.reset_database`
173 and start adding databases with `Calibration.use_local_database` and `Calibration.use_central_database`.
174
175 Alternatively you could set an empty list as the input database_chain when adding the Collection to the Calibration.
176
177 NOTE!! Since ``release-04-00-00`` the behaviour of basf2 conditions databases has changed.
178 All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
179 list which will be checked after all local database files have been checked.
180
181 So even if you ask for ``["global_tag1", "localdb/database.txt", "global_tag2"]`` to be the database chain, the real order
182 that basf2 will use them is ``["global_tag1", "global_tag2", "localdb/database.txt"]`` where the file is checked first.
183 """
184 central_db = CentralDatabase(global_tag)
185 self.database_chain.append(central_db)
186
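
A short sketch, assuming an existing Collection ``col``; the tag name is a placeholder:

    # Each call appends a CentralDatabase to database_chain. As noted above,
    # basf2 will still check local database files before any global tag.
    col.use_central_database("my_extra_globaltag")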

◆ use_local_database()

use_local_database ( self,
filename,
directory = "" )
Parameters:
    filename (str): The path to the database.txt of the local database
    directory (str): The path to the payloads directory for this local database.

Append a local database to the chain for this collection.
You can call this function multiple times and each database will be added to the chain IN ORDER.
The databases are applied to this collection ONLY.

NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
list which will be checked after all local database files have been checked.

So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.

Definition at line 187 of file framework.py.

187 def use_local_database(self, filename, directory=""):
188 """
189 Parameters:
190 filename (str): The path to the database.txt of the local database
191 directory (str): The path to the payloads directory for this local database.
192
193 Append a local database to the chain for this collection.
194 You can call this function multiple times and each database will be added to the chain IN ORDER.
195 The databases are applied to this collection ONLY.
196
197 NOTE!! Since release-04-00-00 the behaviour of basf2 conditions databases has changed.
198 All local database files MUST now be at the head of the 'chain', with all central database global tags in their own
199 list which will be checked after all local database files have been checked.
200
201 So even if you ask for ["global_tag1", "localdb/database.txt", "global_tag2"] to be the database chain, the real order
202 that basf2 will use them is ["global_tag1", "global_tag2", "localdb/database.txt"] where the file is checked first.
203 """
204 local_db = LocalDatabase(filename, directory)
205 self.database_chain.append(local_db)
206
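
A sketch of building a custom chain from scratch, assuming an existing Collection ``col``; the tag name and paths are placeholders:

    col.reset_database()                            # drop the default global tags
    col.use_central_database("base_globaltag")
    col.use_local_database("localdb/database.txt")  # checked before any global tag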

Member Data Documentation

◆ _collector

_collector = collector
protected

Internal storage of collector attribute.

Definition at line 270 of file framework.py.

◆ _input_files

_input_files = self.uri_list_from_input_file(value)
protected

Internal storage of the input files, converted to URIs.

Definition at line 240 of file framework.py.

◆ backend_args

dict backend_args = {}

Dictionary passed to the collector Job object to configure how the caf.backends.Backend instance should treat the collector job when submitting.

The choice of arguments here depends on which backend you plan on using.

Definition at line 130 of file framework.py.
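
For example, a batch backend might accept a queue name. The recognised keys depend entirely on the chosen backend, so treat the key "queue" here as illustrative rather than guaranteed:

    # Check the documentation of your caf.backends.Backend for the keys
    # it actually accepts; "queue" is typical for LSF-style batch systems.
    col.backend_args = {"queue": "s"}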

◆ collector

collector = collector

Collector module of this collection.

Definition at line 87 of file framework.py.

◆ database_chain

list database_chain = database_chain

The database chain used for this Collection.

NOT necessarily the same database chain used for the algorithm step! Since the algorithm will merge the collected data into one process it has to use a single DB chain set from the overall Calibration.

Definition at line 138 of file framework.py.

◆ default_max_collector_jobs

int default_max_collector_jobs = 1000
static

The default maximum number of collector jobs to create.

Only used if max_collector_jobs or max_files_per_collector_job are not set.

Definition at line 69 of file framework.py.

◆ files_to_iovs

dict files_to_iovs = {}

File -> IoV dictionary of the form:

{absolute_file_path: iov}, where iov is an `IoV <caf.utils.IoV>` object. It will be filled during CAF.run() if empty. To improve performance you can fill this yourself before calling CAF.run().

Definition at line 98 of file framework.py.
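
A pre-fill sketch, assuming an existing Collection ``col``; the file path and IoV values are placeholders:

    from caf.utils import IoV

    # Mapping each absolute file path to its IoV up front avoids metadata
    # lookups during CAF.run().
    col.files_to_iovs = {
        "/abs/path/to/file_exp1_run5.root": IoV(1, 5, 1, 5),
    }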

◆ input_files

input_files = []

Internal input_files stored for this collection.

Definition at line 89 of file framework.py.

◆ job_cmd

list job_cmd = ["basf2", self.job_script.name, "--job-information job_info.json"]

The Collector caf.backends.Job.cmd attribute.

By default this runs basf2 using the job_script.

Definition at line 153 of file framework.py.

◆ job_config

str job_config = "collector_job.json"
static

The name of the file containing the collector Job's dictionary.

Useful for recovering the configuration of jobs that ran previously.

Definition at line 72 of file framework.py.

◆ job_script

job_script = Path(find_file("calibration/scripts/caf/run_collector_path.py")).absolute()

The basf2 steering file used for Collector jobs run by this collection. This script will be copied into subjob directories as part of the input sandbox.

Definition at line 148 of file framework.py.

◆ output_patterns

list output_patterns = ["CollectorOutput.root"]

Output patterns of files produced by the collector, which will be passed to the Algorithm.data_input function.

You can set these to anything understood by glob.glob. If you do, you should also provide an Algorithm.data_input function that handles the different file types and calls CalibrationAlgorithm.setInputFiles() with the correct ones.

Definition at line 110 of file framework.py.
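
A sketch of overriding the default, assuming an existing Collection ``col``; the extra pattern is a placeholder:

    # Anything glob.glob understands is allowed. If you add patterns like
    # this, your Algorithm.data_input function must route the matched files.
    col.output_patterns = ["CollectorOutput.root", "my_extra_output_*.root"]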

◆ pre_collector_path

pre_collector_path = None

Since many collectors require different setup, you can set this attribute to a basf2.Path; it will be run before the collector and after the default RootInput module + HistoManager setup.

If this path contains RootInput then its parameters are used in the RootInput module instead, except for the input_files parameter, which is set to whichever files are passed to the collector subjob.

Definition at line 103 of file framework.py.
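
A sketch, assuming an existing Collection ``col``; the modules added here are placeholders for whatever setup your collector actually needs:

    import basf2

    # RootInput + HistoManager are set up by default, so this path only
    # needs the extra modules the collector depends on.
    pre_path = basf2.create_path()
    pre_path.add_module("Gearbox")
    pre_path.add_module("Geometry")
    col.pre_collector_path = pre_path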

◆ splitter

splitter = None

The SubjobSplitter to use when constructing collector subjobs from the overall Job object.

If this is not set then your collector will run as one big job with all input files included.

Definition at line 116 of file framework.py.


The documentation for this class was generated from the following file:

framework.py