15 This module contains various utility functions for the CAF and Job submission Backends to use.
18 from basf2
import B2INFO, B2WARNING, B2DEBUG
21 from collections
import deque
22 from collections
import OrderedDict
23 from collections
import namedtuple
24 from collections
import defaultdict
27 from functools
import singledispatch, update_wrapper
32 from urllib.parse
import urlparse
35 from ROOT.Belle2
import CalibrationAlgorithm, IntervalOfValidity
# Indentation that lines continuation lines up with the text that follows
# basf2's "[INFO] " prefix, so multi-line B2INFO output stays aligned.
b2info_newline = "\n" + " " * 7
def B2INFO_MULTILINE(lines):
    """
    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO

    Quick little function that creates a string for B2INFO from a list of strings.
    But it appends a newline character + the necessary indentation to the following line
    so that the B2INFO output is nicely aligned.
    Then it calls B2INFO on the output.
    """
    log_string = b2info_newline.join(lines)
    # The docstring promises a single B2INFO call on the joined string.
    B2INFO(log_string)
def grouper(n, iterable):
    """
    Parameters:
        n (int): Maximum size of the list that gets returned.
        iterable (list): The original list that we want to return groups of size 'n' from.

    Yields:
        tuple: The next group of (up to) n items from the iterable.
    """
    it = iter(iterable)
    while True:
        # islice consumes the shared iterator, so successive chunks don't overlap.
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk
def pairwise(iterable):
    """
    Iterate through a sequence by pairing up the current and next entry.
    Note that when you hit the last one you don't get a (last, null), the
    final iteration gives you (last-1, last) and then finishes. If you only
    have one entry in the sequence this may be important as you will not get any
    pairs.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        list[tuple]: Pairs of neighbouring entries.
    """
    a, b = itertools.tee(iterable)
    # Advance the second iterator by one so the zip produces (x0,x1), (x1,x2), ...
    next(b, None)
    return zip(a, b)
def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list. This cannot find missing
    runs which lie between two IoVs that are separated by an experiment e.g. between
    IoV(1,1,1,10) => IoV(2,1,2,5) it is unknown if there were supposed to be more runs than run
    number 10 in experiment 1 before starting experiment 2. Therefore this is not counted as a gap
    and will not be added to the output list of IoVs.

    Parameters:
        iov_list (list[IoV]): A SORTED list of non-overlapping IoVs that you want to check for 'gaps'
            i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
    """
    gaps = []
    previous_iov = None
    for current_iov in iov_list:
        if previous_iov:
            previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
            current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
            # ExpRun.find_gap returns None when there is no same-experiment gap.
            iov_gap = previous_highest.find_gap(current_lowest)
            if iov_gap:
                B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
                gaps.append(iov_gap)
        previous_iov = current_iov
    return gaps
class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp,Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names which is probably fine
    but feels wrong.

    Parameters:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns.

        Returns:
            IoV: The gap between the two runs, or None when they are in
            different experiments or are adjacent/equal runs (no gap).
        """
        lower, upper = sorted((self, other))
        # Gaps are only defined within one experiment and only when at least
        # one whole run is missing between the two.
        if lower.exp == upper.exp and lower.run != upper.run:
            if (upper.run - lower.run) > 1:
                return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
        return None
class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names which is probably fine
    but feels wrong.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__. Builds the C++ IntervalOfValidity used for the
        contains/overlaps comparisons.
        """
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members but it's nice to have
    something pythonic ready to go.
    """
    # Mirror the CalibrationAlgorithm C++ enum values one-to-one.
    ok = CalibrationAlgorithm.c_OK
    not_enough_data = CalibrationAlgorithm.c_NotEnoughData
    iterate = CalibrationAlgorithm.c_Iterate
    failure = CalibrationAlgorithm.c_Failure
# Lightweight pairing of an IoV with the algorithm result computed for it.
IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])
class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb

    Keyword Arguments:
        payload_dir (str): If the payload directory is different to the directory
            containing the filepath, you can set it here.

    Raises:
        ValueError: If the filepath (or an explicitly given payload_dir) doesn't exist.
    """

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if f.exists():
            self.filepath = f.resolve()
            if not payload_dir:
                # Default: payloads live next to the database.txt file.
                self.payload_dir = pathlib.Path(self.filepath.parent)
            else:
                p = pathlib.Path(payload_dir)
                if p.exists():
                    self.payload_dir = p.resolve()
                else:
                    raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
        else:
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")
class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """

    def __init__(self, global_tag):
        # Stored as-is; existence of the tag is deliberately not verified here.
        self.global_tag = global_tag
def split_runs_by_exp(runs):
    """
    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as original list but sublists are generated for each Exp value
    """
    split_by_runs = []
    current_exp = runs[0].exp
    exp_list = []
    for exprun in runs:
        if exprun.exp != current_exp:
            # Experiment changed: close off the previous sublist and start a new one.
            split_by_runs.append(exp_list)
            exp_list = [exprun]
        else:
            exp_list.append(exprun)
        current_exp = exprun.exp
    # Don't forget the final (still open) sublist.
    split_by_runs.append(exp_list)
    return split_by_runs
def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap
    with the IoV.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Returns:
        set[ExpRun]: The set of runs that overlap with the IoV
    """
    overlapping_runs = set()
    for run in runs:
        # Turn the single run into an IoV so we can use the IoV overlap check.
        run_iov = run.make_iov()
        if run_iov.overlaps(iov):
            overlapping_runs.add(run)
    return overlapping_runs
def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.
    """
    if len(runs) > 1:
        exprun_low, exprun_high = runs[0], runs[-1]
    else:
        # Single-entry list: the IoV covers exactly that one run.
        exprun_low, exprun_high = runs[0], runs[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.
    """
    # BUGFIX: previously each entry was wrapped in list(...), turning the
    # ExpRun namedtuple into a plain list which has no .exp/.run attributes
    # and made the return statement below raise AttributeError.
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    if len(exprun_list) > 1:
        exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    else:
        exprun_low, exprun_high = exprun_list[0], copy.deepcopy(exprun_list[0])
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std vector<pair<int,int>>.

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int))
    """
    exprun_type = ROOT.pair(int, int)
    run_vec = ROOT.vector(exprun_type)()
    # Reserve up front since the final size is known.
    run_vec.reserve(len(runs))
    for run in runs:
        run_vec.push_back(exprun_type(run.exp, run.run))
    return run_vec
def runs_from_vector(exprun_vector):
    """
    Takes a vector of `ExpRun` from CalibrationAlgorithm and returns
    a Python list of (exp,run) tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Returns:
        list[ExpRun]
    """
    return [ExpRun(exprun.first, exprun.second) for exprun in exprun_vector]
def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different than the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Returns:
        dict[IoV,list[ExpRun]]
    """
    boundary_iov_to_runs = {}
    # Each boundary's IoV ends one run before the next boundary begins.
    for start_current, start_next in pairwise(boundaries):
        boundary_iov = IoV(*start_current, start_next.exp, start_next.run - 1)
        boundary_iov_to_runs[boundary_iov] = sorted(runs_overlapping_iov(boundary_iov, runs))
    # The final boundary is open-ended: -1 as run_high means "all later runs".
    final_iov = IoV(*boundaries[-1], boundaries[-1].exp, -1)
    boundary_iov_to_runs[final_iov] = sorted(runs_overlapping_iov(final_iov, runs))
    return boundary_iov_to_runs
def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.

    Parameters:
        dependencies (dict): Mapping of node name -> list of nodes that depend on it.

    Returns:
        collections.deque: Names whose in-degree is zero.
    """
    # Count how many edges point at each node.
    in_degrees = OrderedDict((k, 0) for k in dependencies)
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)
    return sources
def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted
    list of nodes.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a','b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> sorted = topological_sort(dependencies)
    >>> print(sorted)
    ['c', 'b', 'a']
    """
    # Kahn's algorithm: repeatedly remove zero-in-degree nodes.
    in_degrees = {k: 0 for k in dependencies}
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)
    order = []
    while sources:
        source = sources.pop()
        order.append(source)
        for node in dependencies[source]:
            in_degrees[node] -= 1
            if in_degrees[node] == 0:
                sources.appendleft(node)
    if len(order) == len(dependencies):
        return order
    # Leftover nodes mean at least one cycle; an empty list signals failure.
    B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
    return []
def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on them.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
    """
    full_dependencies = OrderedDict()

    def add_out_nodes(node, node_set):
        """
        This is a recursive function that follows the tree of adjacent future nodes
        and adds all of them to a set (so that we have unique items).
        """
        for out_node in dependencies[node]:
            node_set.add(out_node)
            add_out_nodes(out_node, node_set)

    if not order:
        order = dependencies.keys()
    for node in order:
        node_dependencies = set()
        add_out_nodes(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)
    return full_dependencies
def past_from_future_dependencies(future_dependencies):
    """
    Inverts a future-dependency graph: for every edge node -> dep, record
    node as a past dependency of dep.

    Parameters:
        future_dependencies (dict): Mapping of node -> list of future nodes.

    Returns:
        collections.defaultdict[list]: Mapping of node -> list of past nodes.
    """
    past_dependencies = defaultdict(list)
    for node, deps in future_dependencies.items():
        for dep in deps:
            past_dependencies[dep].append(node)
    return past_dependencies
def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (saves importing json everywhere).
    """
    return json.loads(object_string)
def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch  # Default method
    >>> def my_method(self, default_type, ...):
    >>>     pass

    >>> @my_method.register(list)  # Registers list method for dispatch
    >>> def _(self, list_type, ...):
    >>>     pass

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property
    >>> def my_property(self):
    >>>     return self._my_property

    >>> @my_property.setter
    >>> @method_dispatch
    >>> def my_property(self, input_property):
    >>>     pass

    >>> @my_property.fset.register(list)
    >>> def _(self, input_list_properties):
    >>>     pass
    """
    dispatcher = singledispatch(func)

    def wrapper(*args, **kw):
        # args[0] is 'self'; dispatch on the type of the second argument.
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)

    wrapper.register = dispatcher.register
    update_wrapper(wrapper, func)
    return wrapper
@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that changes the working directory to the given
    path and then changes it back to its previous value on exit.
    """
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always restore the old cwd, even if the body raised.
        os.chdir(prev_cwd)
class PathExtras():
    """
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with 'in' keyword.
    """

    def __init__(self, path=None):
        """
        Initialising with a path.
        """
        # The wrapped basf2 path object (may be None when no path is given).
        # NOTE(review): reconstructed from a garbled source — confirm against
        # the original that a missing path should simply yield an empty name list.
        self.path = path
        # Cached list of module names for the modules in self.path.
        self._module_names = []
        self._update_names()

    def _update_names(self):
        """
        Takes the self.path attribute and uses the current state to recreate the
        self._module_names list.
        """
        self._module_names = []
        if self.path:
            for module in self.path.modules():
                self._module_names.append(module.name())

    def __contains__(self, module_name):
        """
        Special method to allow 'module_name in path' type comparisons. Returns
        a boolean and compares by module name.
        """
        self._update_names()
        return module_name in self._module_names

    def index(self, module_name):
        """
        Returns the index of the first instance of a module in the contained path.
        """
        return self._module_names.index(module_name)
def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            # Keep only regular files; the standard layout has no sub-directories we care about.
            listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
            file_names = [file_name for file_name in listdir(directory) if isfile(join(directory, file_name))]
            # database.txt is merged separately below, not copied as a payload.
            file_names.remove('database.txt')
            # Copy the payload files into the merged directory.
            file_names = [os.path.join(directory, file_name) for file_name in file_names]
            for file_name in file_names:
                shutil.copy(file_name, output_database_dir)
            # Append this directory's IoV entries to the merged database.txt.
            with open(os.path.join(directory, 'database.txt')) as f:
                for line in f.readlines():
                    db_file.write(line)
def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    # b2file-metadata-show --json prints the file metadata as a JSON object.
    metadata_output = subprocess.check_output(['b2file-metadata-show', '--json', file_path])
    metadata = json.loads(metadata_output.decode('utf-8'))
    return IoV(metadata['experimentLow'], metadata['runLow'],
               metadata['experimentHigh'], metadata['runHigh'])
def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV, instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    return (file_path, get_iov_from_file(file_path))
def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs
    b2file-metadata-show over them to extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.

    Keyword Arguments:
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
            We don't close or join the Pool as you might want to use it yourself,
            we just wait until the results are ready.
        filterfalse (`function`): An optional function object that will be called on each
            absolute filepath found from your patterns. If True is returned the file will be
            skipped; if False its metadata will be returned. The filter function should take
            the filepath string as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    if filterfalse:
        # itertools.filterfalse keeps the entries for which the predicate is False.
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))

    file_to_iov = {}
    if not pool:
        # Serial path: run the metadata tool one file at a time.
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        # Parallel path: submit every file and poll until all results are ready.
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))
        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)
        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]
    return file_to_iov
def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type urls are taken as absolute file paths already and are simply
    passed through.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        file_pattern_uri = parse_file_uri(file_pattern)
        if file_pattern_uri.scheme == "file":
            input_files = glob.glob(file_pattern_uri.path)
            if not input_files:
                B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            else:
                # Only keep paths that really are existing regular files.
                for file_path in input_files:
                    file_path = os.path.abspath(file_path)
                    if os.path.isfile(file_path):
                        existing_file_paths.add(file_path)
        else:
            # Remote/non-local URI: trust it as-is, we cannot check it here.
            B2INFO(f"Found a non-local file pattern {file_pattern} it will not be checked for validity.")
            existing_file_paths.add(file_pattern)
    abs_file_paths = list(existing_file_paths)
    return abs_file_paths
def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The single Exp,Run IoV that the Raw data file corresponds to.

    Raises:
        ValueError: If the path cannot be parsed, or the directory components and
            the filename disagree about the (exp, run) values.
    """
    file_path = Path(file_path)
    # Raw data lives under one of two known storage prefixes; try both.
    try:
        reduced_path = file_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    except ValueError:
        reduced_path = file_path.relative_to("/group/belle2/dataprod/Data/Raw")

    try:
        # Directory components look like 'e<exp>/r<run>/...' — strip the letter.
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])
        # The filename also encodes exp and run as its 2nd and 3rd dot-fields.
        split_filename = reduced_path.name.split(".")
        filename_exp = int(split_filename[1])
        filename_run = int(split_filename[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e

    if path_exp == filename_exp and path_run == filename_run:
        return IoV(path_exp, path_run, path_exp, path_run)
    raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")
def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is
    (including any contents), or delete it and re-create it fresh. It will only delete
    the end point, not any intermediate directories created.

    Parameters:
        path (pathlib.Path): The directory path to create.

    Keyword Arguments:
        overwrite (bool): If True an existing directory is wiped and recreated.
    """
    # Delete the existing leaf directory (and its contents) when overwriting.
    if path.exists() and overwrite:
        shutil.rmtree(path)
    if not path.exists():
        os.makedirs(path)
def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have
    iteration directories from iterations above your current one. This function will find
    directories that match an integer.

    Parameters:
        dir_path (`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    paths = []
    all_dirs = [sub_dir for sub_dir in dir_path.glob("*") if sub_dir.is_dir()]
    for directory in all_dirs:
        # Only keep directories whose whole name parses as an integer.
        try:
            int(directory.name)
        except ValueError:
            continue
        paths.append(directory)
    return paths
def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it in one place later.

    Returns:
        urllib.parse.ParseResult
    """
    # Anything without an explicit scheme is treated as a local "file" path;
    # fragments are disabled so '#' stays part of the path.
    return urlparse(file_uri, scheme="file", allow_fragments=False)
# Sentinel ExpRun: -1 is used throughout this module as the "open/unbounded"
# marker (see the IoV docstring), so this represents an unbounded (exp, run).
UNBOUND_EXPRUN = ExpRun(-1, -1)