13 This module contains various utility functions for the CAF and Job submission Backends to use.
import enum

from collections import OrderedDict, defaultdict, deque, namedtuple
from functools import singledispatch, update_wrapper
from urllib.parse import urlparse

from basf2 import B2INFO, B2WARNING, B2DEBUG
from ROOT.Belle2 import CalibrationAlgorithm, IntervalOfValidity
#: Newline + indentation that lines a continued B2INFO line up under the first one.
b2info_newline = "\n" + (" " * 7)
def B2INFO_MULTILINE(lines):
    """
    Join a list of strings into one nicely aligned B2INFO message.

    Appends a newline character plus the necessary indentation to each
    following line so that the B2INFO output is aligned, then calls
    B2INFO once on the joined string.

    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO
    """
    log_string = b2info_newline.join(lines)
    # The mangled original was missing the actual emit call; without it the
    # function would silently do nothing.
    B2INFO(log_string)
def grouper(n, iterable):
    """
    Yield successive chunks (as tuples) of at most ``n`` items from ``iterable``.

    Parameters:
        n (int): Maximum size of each chunk that gets yielded.
        iterable (list): The original iterable that we want chunks of size 'n' from.

    Yields:
        tuple: The next chunk of up to ``n`` items; the last chunk may be shorter.
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            # Exhausted the input; stop the generator.
            return
        yield chunk
def pairwise(iterable):
    """
    Iterate through a sequence by pairing up the current and next entry.

    Note that when you hit the last one you don't get a (last, null); the
    final iteration gives you (last-1, last) and then finishes. If you only
    have one entry in the sequence this may be important as you will not get
    any pairs at all.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        zip: An iterator of (current, next) pairs.
    """
    a, b = itertools.tee(iterable)
    # Advance the second iterator by one so the streams are offset by a step.
    next(b, None)
    return zip(a, b)
def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list.

    This cannot find missing runs which lie between two IoVs that are
    separated by an experiment, e.g. between IoV(1,1,1,10) => IoV(2,1,2,5) it
    is unknown if there were supposed to be more runs than run number 10 in
    experiment 1 before starting experiment 2. Therefore this is not counted
    as a gap and will not be added to the output list of IoVs.

    Parameters:
        iov_list (list[IoV]): A SORTED list of non-overlapping IoVs that you
            want to check for 'gaps' i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
    """
    gaps = []
    previous_iov = None
    for current_iov in iov_list:
        if previous_iov:
            previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
            current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
            iov_gap = previous_highest.find_gap(current_lowest)
            if iov_gap:
                B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
                gaps.append(iov_gap)
        previous_iov = current_iov
    return gaps
class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp,Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Parameters:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns.

        Returns:
            IoV: The gap IoV, or None when the two ExpRuns are adjacent,
            identical, or in different experiments (cross-experiment gaps
            cannot be determined).
        """
        lower, upper = sorted((self, other))
        if lower.exp == upper.exp and lower.run != upper.run:
            if (upper.run - lower.run) > 1:
                return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
            else:
                # Adjacent runs: no gap. Explicit None instead of falling off the end.
                return None
        else:
            return None
class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__. Builds the C++ IntervalOfValidity used for comparisons.
        """
        # Fix: the attribute name was garbled as `_cpp_iov_cpp_iov`; the
        # comparison helpers below rely on `_cpp_iov`.
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)
# NOTE(review): the class header line was lost in this chunk; reconstructed as an
# enum.Enum named AlgResult from the visible members — confirm against the
# original file before relying on the exact class name.
@enum.unique
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members but it's nice to have
    something pythonic ready to go.
    """
    #: OK Return code
    ok = CalibrationAlgorithm.c_OK
    #: not enough data Return code
    not_enough_data = CalibrationAlgorithm.c_NotEnoughData
    #: iteration required Return code
    iterate = CalibrationAlgorithm.c_Iterate
    #: failure Return code
    failure = CalibrationAlgorithm.c_Failure
#: Lightweight pairing of an IoV with the result obtained for it.
IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])
class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb

    Keyword Arguments:
        payload_dir (str): If the payload directory is different to the
            directory containing the filepath, you can set it here.

    Raises:
        ValueError: If ``filepath`` or an explicitly-given ``payload_dir``
            does not exist.
    """

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if f.exists():
            self.filepath = f.resolve()
            if not payload_dir:
                # Default: payloads live next to the database.txt file.
                self.payload_dir = pathlib.Path(self.filepath.parent)
            else:
                p = pathlib.Path(payload_dir)
                if p.exists():
                    self.payload_dir = p.resolve()
                else:
                    raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
        else:
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")
class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """

    def __init__(self, global_tag):
        #: The Global Tag to use for this central database.
        self.global_tag = global_tag
def split_runs_by_exp(runs):
    """
    Split an ordered list of ExpRuns into sublists, one per experiment number.

    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as the original list but sublists are generated
        for each Exp value
    """
    split_by_runs = []
    # Start the first sublist with the first run; assumes runs is non-empty.
    exp_list = [runs[0]]
    current_exp = runs[0].exp
    for exprun in runs[1:]:
        if exprun.exp != current_exp:
            # Experiment changed: close off the current sublist and start fresh.
            split_by_runs.append(exp_list)
            exp_list = [exprun]
        else:
            exp_list.append(exprun)
        current_exp = exprun.exp
    split_by_runs.append(exp_list)
    return split_by_runs
def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap
    the IoV.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Returns:
        set[ExpRun]: The runs that overlap with the IoV
    """
    # Keep exactly the runs whose single-run IoV overlaps the requested one.
    overlapping = {candidate for candidate in runs if candidate.make_iov().overlaps(iov)}
    return overlapping
def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.

    Parameters:
        runs (list[ExpRun]): Ordered, non-empty list of runs.

    Returns:
        IoV: Spanning from the first to the last run in the list.
    """
    # runs[-1] equals runs[0] for a single-element list, so the original
    # explicit len(runs) > 1 branch was redundant.
    exprun_low, exprun_high = runs[0], runs[-1]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.

    Parameters:
        iov_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of exp/run pairs.

    Returns:
        IoV: Spanning from the first to the last entry in the vector.
    """
    # Bug fix: the original wrapped each ExpRun in list(), which would break
    # the .exp/.run attribute access below. Keep plain ExpRun instances.
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    # The last element equals the first for a length-1 list, so no special
    # casing (or deepcopy of an immutable namedtuple) is needed.
    exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std vector<pair<int,int>>.

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int)): The converted vector.
    """
    exprun_type = ROOT.pair(int, int)
    run_vec = ROOT.vector(exprun_type)()
    # Reserve up front to avoid repeated reallocation while filling.
    run_vec.reserve(len(runs))
    for run in runs:
        run_vec.push_back(exprun_type(run.exp, run.run))
    # The mangled original was missing this return.
    return run_vec
def runs_from_vector(exprun_vector):
    """
    Takes a vector of `ExpRun` from CalibrationAlgorithm and returns
    a Python list of (exp,run) tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Returns:
        list[ExpRun]: The converted list.
    """
    converted = []
    for pair in exprun_vector:
        converted.append(ExpRun(pair.first, pair.second))
    return converted
def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different than the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Returns:
        dict[IoV,list[ExpRun]]: Mapping of each boundary IoV to its sorted runs.
    """
    boundary_iov_to_runs = {}
    # Every boundary except the last spans up to the run just before the next boundary.
    for start_current, start_next in pairwise(boundaries):
        interval = IoV(*start_current, start_next.exp, start_next.run - 1)
        boundary_iov_to_runs[interval] = sorted(runs_overlapping_iov(interval, runs))
    # The final boundary is open-ended (run_high = -1) within its experiment.
    final_interval = IoV(*boundaries[-1], boundaries[-1].exp, -1)
    boundary_iov_to_runs[final_interval] = sorted(runs_overlapping_iov(final_interval, runs))
    return boundary_iov_to_runs
def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.

    Parameters:
        dependencies (dict): Keys are node names; values are lists of node
            names that depend on the key.

    Returns:
        collections.deque: Nodes with zero in-degree.
    """
    # Count how many edges point INTO each node.
    in_degrees = OrderedDict((k, 0) for k in dependencies)
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)
    return sources
def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted
    list of nodes.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a','b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> sorted = topological_sort(dependencies)

    Returns:
        list: Topologically ordered node names, or an empty list when a
        cycle is detected (a B2WARNING is emitted in that case).
    """
    # Kahn's algorithm: count in-degrees, repeatedly emit zero-degree nodes.
    in_degrees = {k: 0 for k in dependencies}
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)
    order = []
    while sources:
        source = sources.pop()
        order.append(source)
        for node in dependencies[source]:
            in_degrees[node] -= 1
            if in_degrees[node] == 0:
                sources.appendleft(node)
    if len(order) == len(dependencies):
        return order
    else:
        # A cycle leaves some nodes with positive in-degree forever.
        B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
        return []
def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on them.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
    """
    full_dependencies = OrderedDict()

    def add_out_nodes(node, node_set):
        """
        This is a recursive function that follows the tree of adjacent future nodes
        and adds all of them to a set (so that we have unique items).
        """
        for out_node in dependencies[node]:
            node_set.add(out_node)
            add_out_nodes(out_node, node_set)

    if not order:
        order = dependencies.keys()
    for node in order:
        node_dependencies = set()
        add_out_nodes(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)
    return full_dependencies
def past_from_future_dependencies(future_dependencies):
    """
    Invert a 'future' dependency graph: for every edge node -> dep, record
    that dep has node as a past dependency.

    Parameters:
        future_dependencies (dict): Keys are nodes; values are lists of nodes
            that come after (depend on) the key.

    Returns:
        collections.defaultdict[list]: Mapping of each node to the nodes that
        must come before it.
    """
    past_dependencies = defaultdict(list)
    for node, deps in future_dependencies.items():
        for dep in deps:
            past_dependencies[dep].append(node)
    return past_dependencies
def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (saves importing json everywhere).
    """
    decoded_object = json.loads(object_string)
    return decoded_object
def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch   # Default method
    >>> def my_method(self, default_type, ...):
    >>>     pass

    >>> @my_method.register(list)  # Registers list method for dispatch
    >>> def _(self, list_type, ...):
    >>>     pass

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property
    >>> def my_property(self):
    >>>     return self._my_property

    >>> @my_property.setter
    >>> @method_dispatch
    >>> def my_property(self, input_property):
    >>>     pass

    >>> @my_property.fset.register(list)
    >>> def _(self, input_list_properties):
    >>>     pass
    """
    dispatcher = singledispatch(func)

    def wrapper(*args, **kw):
        # Dispatch on args[1] (the first argument after `self`).
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)
    wrapper.register = dispatcher.register
    update_wrapper(wrapper, func)
    return wrapper
@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that changes the working directory to the given
    path and then changes it back to its previous value on exit.
    """
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Restore even if the body raised, so callers never end up stranded
        # in the temporary directory.
        os.chdir(prev_cwd)
class PathExtras():
    """
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with 'in' keyword.
    """

    # NOTE(review): only the docstrings of this class survived the mangling;
    # the method bodies below are reconstructed on the assumption that the
    # wrapped object is a basf2 path exposing .modules() with .name() on each
    # module — confirm against the original file.

    def __init__(self, path=None):
        """
        Initialising with a path.
        """
        if path:
            #: The wrapped basf2 path object.
            self.path = path
        else:
            path = []
        #: Cached list of module names contained in the path.
        self.module_names = [module.name() for module in path.modules()] if path else []

    def _update_names(self):
        """
        Takes the self.path attribute and uses the current state to recreate the
        self.module_names list.
        """
        self.module_names = [module.name() for module in self.path.modules()]

    def __contains__(self, module_name):
        """
        Special method to allow 'module_name in path' type comparisons. Returns
        a boolean and compares by module name.
        """
        self._update_names()
        return module_name in self.module_names

    def index(self, module_name):
        """
        Returns the index of the first instance of a module in the contained path.
        """
        return self.module_names.index(module_name)
def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...

    Payload files are copied into the output directory and the database.txt
    contents are concatenated. Directories that don't exist are skipped with
    a warning.
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            else:
                # Bind the os helpers locally purely for brevity below.
                listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
                file_names = [file_name for file_name in listdir(directory) if isfile(join(directory, file_name))]
                file_names.remove('database.txt')
                file_names = [os.path.join(directory, file_name) for file_name in file_names[:]]
                for file_name in file_names:
                    shutil.copy(file_name, output_database_dir)
                # Append this directory's database.txt lines to the merged one.
                with open(os.path.join(directory, 'database.txt'), 'r') as f:
                    for line in f.readlines():
                        db_file.write(line)
def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    command = ['b2file-metadata-show', '--json', file_path]
    raw_output = subprocess.check_output(command)
    metadata = json.loads(raw_output.decode('utf-8'))
    return IoV(metadata['experimentLow'], metadata['runLow'],
               metadata['experimentHigh'], metadata['runHigh'])
def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV, instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    file_iov = get_iov_from_file(file_path)
    return (file_path, file_iov)
def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs
    b2file-metadata-show over them to extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.

    Keyword Arguments:
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
            We don't close or join the Pool as you might want to use it yourself, we just wait
            until the results are ready.
        filterfalse (`function`): An optional function object that will be called on each
            absolute filepath found from your patterns. If True is returned the file will be
            skipped; if False it will have its metadata returned. The filter function should
            take the filepath string as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    if filterfalse:
        # itertools.filterfalse keeps entries for which the predicate is False.
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))
    file_to_iov = {}
    if not pool:
        # Serial path: compute each IoV in this process.
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        # Parallel path: fan the work out to the pool and poll for completion.
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))
        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)
        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]
    return file_to_iov
def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type urls are taken as absolute file paths already and are simply
    passed through without checking.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        file_pattern_uri = parse_file_uri(file_pattern)
        if file_pattern_uri.scheme == "file":
            input_files = glob.glob(file_pattern_uri.path)
            if not input_files:
                B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            else:
                for file_path in input_files:
                    file_path = os.path.abspath(file_path)
                    # Only keep regular files; glob can also match directories.
                    if os.path.isfile(file_path):
                        existing_file_paths.add(file_path)
        else:
            B2INFO(f"Found a non-local file pattern {file_pattern} it will not be checked for validity.")
            existing_file_paths.add(file_pattern)
    abs_file_paths = list(existing_file_paths)
    return abs_file_paths
def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The Single Exp,Run IoV that the Raw data file corresponds to.

    Raises:
        ValueError: If the path cannot be parsed, or if the directory and
            filename disagree about the (exp, run).
    """
    file_path = Path(file_path)
    # Raw data lives under one of two standard roots; try the first and fall
    # back to the second (a path under neither raises ValueError here).
    try:
        reduced_path = file_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    except ValueError:
        reduced_path = file_path.relative_to("/group/belle2/dataprod/Data/Raw")
    try:
        # Directory components look like e<exp>/r<run>/...; strip the prefix letter.
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])
        # The filename also encodes exp/run as its 2nd and 3rd dot-separated fields.
        split_filename = reduced_path.name.split(".")
        filename_exp = int(split_filename[1])
        filename_run = int(split_filename[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e
    if path_exp == filename_exp and path_run == filename_run:
        return IoV(path_exp, path_run, path_exp, path_run)
    else:
        raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")
def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is
    (including any contents), or delete it and re-create it fresh. It will only delete
    the end point, not any intermediate directories created.

    Parameters:
        path (pathlib.Path): The directory path to create.

    Keyword Arguments:
        overwrite (bool): When True an existing directory is removed and recreated.
    """
    # Delete (only the leaf) if overwriting was requested.
    if path.exists() and overwrite:
        shutil.rmtree(path)
    # (Re-)create the full path including intermediates.
    if not path.exists():
        os.makedirs(path)
def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have
    iteration directories from iterations above your current one. This function will find
    directories whose names parse as an integer.

    Parameters:
        dir_path(`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    paths = []
    all_dirs = [sub_dir for sub_dir in dir_path.glob("*") if sub_dir.is_dir()]
    for directory in all_dirs:
        # EAFP: only directory names that int() accepts are kept.
        try:
            int(directory.name)
            paths.append(directory)
        except ValueError:
            pass
    return paths
def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it in one place later.

    Returns:
        urllib.parse.ParseResult
    """
    parsed_uri = urlparse(file_uri, scheme="file", allow_fragments=False)
    return parsed_uri
# Sentinel ExpRun whose exp and run are both -1 (per its name, used to mean
# "no bound set").
UNBOUND_EXPRUN = ExpRun(exp=-1, run=-1)
def find_gap(self, other)
def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)
def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)