"""
This module contains various utility functions for the CAF and Job submission Backends to use.
"""
import contextlib
import copy
import enum
import glob
import itertools
import json
import os
import pathlib
import shutil
import subprocess
import time
from collections import deque
from collections import OrderedDict
from collections import namedtuple
from collections import defaultdict
from functools import singledispatch, update_wrapper
from pathlib import Path
from urllib.parse import urlparse

import ROOT
from ROOT.Belle2 import CalibrationAlgorithm, IntervalOfValidity

from basf2 import B2INFO, B2WARNING, B2DEBUG
#: A newline followed by the indentation B2INFO prepends to its output, so that
#: continuation lines of a multi-line message align with the first line.
b2info_newline = "\n" + (7 * " ")
def B2INFO_MULTILINE(lines):
    """
    Quick little function that creates a string for B2INFO from a list of strings.
    But it appends a newline character + the necessary indentation to the following line
    so that the B2INFO output is nicely aligned.
    Then it calls B2INFO on the output.

    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO
    """
    log_string = b2info_newline.join(lines)
    B2INFO(log_string)
def grouper(n, iterable):
    """
    Yield successive chunks of at most ``n`` items from ``iterable``.

    Parameters:
        n (int): Maximum size of the list that gets returned.
        iterable (list): The original list that we want to return groups of size 'n' from.

    Returns:
        tuple: The chunks, each of size ``n`` except possibly the last one.
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            # Exhausted the iterator: stop the generator.
            return
        yield chunk
def pairwise(iterable):
    """
    Iterate through a sequence by pairing up the current and next entry.
    Note that when you hit the last one you don't get a (last, null), the
    final iteration gives you (last-1, last) and then finishes. If you only
    have one entry in the sequence this may be important as you will not get any
    iterations.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        list[tuple]: Consecutive (current, next) pairs.
    """
    a, b = itertools.tee(iterable)
    # Advance the second iterator by one so that zip pairs item i with item i+1.
    next(b, None)
    return zip(a, b)
def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list. This cannot find missing
    runs which lie between two IoVs that are separated by an experiment e.g. between
    IoV(1,1,1,10) => IoV(2,1,2,5) it is unknown if there were supposed to be more runs than run
    number 10 in experiment 1 before starting experiment 2. Therefore this is not counted as a gap
    and will not be added to the output list of IoVs.

    Parameters:
        iov_list (list[IoV]): A SORTED list of Non-overlapping IoVs that you want to check for 'gaps'
            i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
    """
    gaps = []
    previous_iov = None
    for current_iov in iov_list:
        if previous_iov:
            previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
            current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
            iov_gap = previous_highest.find_gap(current_lowest)
            if iov_gap:
                B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
                gaps.append(iov_gap)
        previous_iov = current_iov
    return gaps
class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp,Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names which is probably fine
    but feels wrong.

    Parameters:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns.

        Returns:
            IoV: The IoV of runs between the two ExpRuns in the same experiment,
            or None if there is no gap (adjacent runs or different experiments).
        """
        lower, upper = sorted((self, other))
        if lower.exp == upper.exp and lower.run != upper.run:
            if (upper.run - lower.run) > 1:
                return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
            else:
                return None
        else:
            return None
class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names which is probably fine
    but feels wrong.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__.
        """
        # C++ object used to delegate the contains/overlaps comparisons.
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)
# NOTE(review): the class header line was missing from the damaged source; the
# name 'AlgResult' and the enum.Enum base are reconstructed — confirm upstream.
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members but it's nice to have
    something pythonic ready to go.
    """
    #: Successful calibration result
    ok = CalibrationAlgorithm.c_OK
    #: Not enough data in the input to perform the calibration
    not_enough_data = CalibrationAlgorithm.c_NotEnoughData
    #: Calibration wants to iterate again
    iterate = CalibrationAlgorithm.c_Iterate
    #: Calibration failed
    failure = CalibrationAlgorithm.c_Failure
#: Simple pairing of an IoV with the result of running an algorithm over it.
IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])
# NOTE(review): the class header and parts of __init__ were missing from the
# damaged source; the validation logic is reconstructed from the visible
# fragments (pathlib checks + the two ValueError messages) — confirm upstream.
class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb

    Keyword Arguments:
        payload_dir (str): If the payload directory is different to the directory containing the filepath, you can set it here.
    """

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if f.exists():
            #: Resolved path of the database.txt file
            self.filepath = f.resolve()
            if not payload_dir:
                # Default: payloads live next to the database.txt file.
                #: Resolved path of the directory holding the payload files
                self.payload_dir = pathlib.Path(self.filepath.parent)
            else:
                p = pathlib.Path(payload_dir)
                if p.exists():
                    self.payload_dir = p.resolve()
                else:
                    raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
        else:
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")
# NOTE(review): the class header line was missing from the damaged source;
# reconstructed to mirror LocalDatabase — confirm upstream.
class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """

    def __init__(self, global_tag):
        #: The Global Tag of the central database
        self.global_tag = global_tag
def split_runs_by_exp(runs):
    """
    Split an ordered list of ExpRun into one sublist per experiment.

    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as original list but sublists are generated for each Exp value
    """
    split_by_runs = []
    exp_list = []
    current_exp = runs[0].exp
    for exprun in runs:
        if exprun.exp != current_exp:
            # Experiment changed: close off the current sublist and start a new one.
            split_by_runs.append(exp_list)
            exp_list = [exprun]
        else:
            exp_list.append(exprun)
        current_exp = exprun.exp
    # Don't forget the final (still-open) sublist.
    split_by_runs.append(exp_list)
    return split_by_runs
def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap
    with the IoV.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Returns:
        set[ExpRun]: The runs that overlap with the IoV
    """
    overlapping_runs = set()
    for run in runs:
        # Turn each single ExpRun into an IoV so we can use IoV.overlaps().
        run_iov = run.make_iov()
        if run_iov.overlaps(iov):
            overlapping_runs.add(run)
    return overlapping_runs
def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.

    Parameters:
        runs (list[ExpRun]): The ordered list of ExpRuns

    Returns:
        IoV: From the first to the last ExpRun in the list
    """
    if len(runs) > 1:
        exprun_low, exprun_high = runs[0], runs[-1]
    else:
        # Single-run list: the IoV covers just that one run.
        exprun_low, exprun_high = runs[0], runs[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.

    Parameters:
        iov_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns

    Returns:
        IoV: From the first to the last entry of the vector
    """
    # BUGFIX: the original wrapped each ExpRun in list(), producing plain lists
    # that have no .exp/.run attributes and would raise AttributeError below.
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    if len(exprun_list) > 1:
        exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    else:
        exprun_low, exprun_high = exprun_list[0], copy.deepcopy(exprun_list[0])
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std vector<pair<int,int>>.

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int))
    """
    exprun_type = ROOT.pair(int, int)
    run_vec = ROOT.vector(exprun_type)()
    # Avoid repeated reallocation since we know the final size.
    run_vec.reserve(len(runs))
    for run in runs:
        run_vec.push_back(exprun_type(run.exp, run.run))
    return run_vec
def runs_from_vector(exprun_vector):
    """
    Takes a vector of `ExpRun` from CalibrationAlgorithm and returns
    a Python list of (exp,run) tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Returns:
        list[ExpRun]
    """
    return [ExpRun(exprun.first, exprun.second) for exprun in exprun_vector]
def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different than the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Returns:
        dict[IoV,list[ExpRun]]
    """
    boundary_iov_to_runs = {}
    # Pair up each boundary start with the next one so each interval ends just
    # before the following boundary begins.
    for start_current, start_next in pairwise(boundaries):
        boundary_iov = IoV(*start_current, start_next.exp, start_next.run - 1)
        boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
        boundary_iov_to_runs[boundary_iov] = boundary_runs
    # The last boundary is open-ended: include every run from it onwards (-1 run_high).
    boundary_iov = IoV(*boundaries[-1], boundaries[-1].exp, -1)
    boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
    boundary_iov_to_runs[boundary_iov] = boundary_runs
    return boundary_iov_to_runs
def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.

    Parameters:
        dependencies (dict): Graph where keys are nodes and values are lists of
            nodes that depend on the key.

    Returns:
        collections.deque: Names with zero in-degree.
    """
    # Count how many edges point INTO each node.
    in_degrees = OrderedDict((k, 0) for k in dependencies)
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1
    # Nodes with no incoming edges are the sources.
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)
    return sources
def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted
    list of nodes.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a','b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> sorted = topological_sort(dependencies)
    >>> print(sorted)
    ['c', 'b', 'a']
    """
    # Kahn's algorithm: repeatedly remove zero-in-degree nodes.
    in_degrees = {k: 0 for k in dependencies}
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)
    order = []
    while sources:
        source = sources.pop()
        order.append(source)
        for node in dependencies[source]:
            in_degrees[node] -= 1
            if in_degrees[node] == 0:
                sources.appendleft(node)
    if len(order) == len(dependencies):
        return order
    else:
        # Some nodes were never freed: the graph contains a cycle.
        B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
        return []
def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on them.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
    """
    full_dependencies = OrderedDict()

    def add_out_nodes(node, node_set):
        """
        This is a recursive function that follows the tree of adjacent future nodes
        and adds all of them to a set (so that we have unique items)
        """
        for out_node in dependencies[node]:
            node_set.add(out_node)
            add_out_nodes(out_node, node_set)

    if not order:
        order = dependencies.keys()
    for node in order:
        node_dependencies = set()
        add_out_nodes(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)
    return full_dependencies
def past_from_future_dependencies(future_dependencies):
    """
    Invert a future-dependency graph: for each node, list the nodes it depends on.

    Parameters:
        future_dependencies (dict): Keys are nodes, values are lists of nodes
            that depend on the key.

    Returns:
        collections.defaultdict[list]: Mapping of node -> nodes it depends on.
    """
    past_dependencies = defaultdict(list)
    for node, deps in future_dependencies.items():
        for dep in deps:
            past_dependencies[dep].append(node)
    return past_dependencies
def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (Saves importing json everywhere).
    """
    return json.loads(object_string)
def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch  # Default method
    >>> def my_method(self, default_type, ...):
    >>>     pass

    >>> @my_method.register(list)  # Registers list method for dispatch
    >>> def _(self, list_type, ...):
    >>>     pass

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property
    >>> def my_property(self):
    >>>     return self._my_property

    >>> @my_property.setter
    >>> @method_dispatch
    >>> def my_property(self, input_property):
    >>>     pass

    >>> @my_property.fset.register(list)
    >>> def _(self, input_list_properties):
    >>>     pass
    """
    dispatcher = singledispatch(func)

    def wrapper(*args, **kw):
        # Dispatch on args[1] (the argument after 'self') instead of args[0].
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)

    wrapper.register = dispatcher.register
    update_wrapper(wrapper, func)
    return wrapper
@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that changes the working directory to the given
    path and then changes it back to its previous value on exit.
    """
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Restore the old cwd even if the body raised.
        os.chdir(prev_cwd)
# NOTE(review): the class header and most method bodies were missing from the
# damaged source; this reconstruction is inferred from the surviving docstrings
# — confirm against the upstream basf2 caf/utils.py before relying on details.
class PathExtras():
    """
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with 'in' keyword.
    """

    def __init__(self, path=None):
        """
        Initialising with a path.
        """
        if path:
            #: The wrapped basf2 path object
            self.path = path
        else:
            path = []
        #: Cached list of module names contained in self.path
        self.module_names = []
        self._update_names()

    def _update_names(self):
        """
        Takes the self.path attribute and uses the current state to recreate the
        self.module_names list
        """
        for module in self.path.modules():
            self.module_names.append(module.name())

    def __contains__(self, module_name):
        """
        Special method to allow 'module_name in path' type comparisons. Returns
        a boolean and compares by module name.
        """
        self._update_names()
        return module_name in self.module_names

    def index(self, module_name):
        """
        Returns the index of the first instance of a module in the contained path.
        """
        return self.module_names.index(module_name)
def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            else:
                # Bind the os helpers to locals for the comprehension below.
                listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
                file_names = [file_name for file_name in listdir(directory) if isfile(join(directory, file_name))]
                # The index file is merged separately, not copied as a payload.
                file_names.remove('database.txt')
                file_names = [os.path.join(directory, file_name) for file_name in file_names[:]]
                for file_name in file_names:
                    shutil.copy(file_name, output_database_dir)
                # Append this directory's index entries to the merged database.txt.
                with open(os.path.join(directory, 'database.txt'), 'r') as f:
                    for line in f.readlines():
                        db_file.write(line)
def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    # Ask the basf2 metadata tool for a JSON description of the file.
    metadata_output = subprocess.check_output(['b2file-metadata-show', '--json', file_path])
    m = json.loads(metadata_output.decode('utf-8'))
    return IoV(m['experimentLow'], m['runLow'], m['experimentHigh'], m['runHigh'])
def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV, instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    return (file_path, get_iov_from_file(file_path))
def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs b2file-metadata-show over them to
    extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.

    Keyword Arguments:
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
            We don't close or join the Pool as you might want to use it yourself, we just wait until the results are ready.
        filterfalse (function): An optional function object that will be called on each absolute filepath found from your
            patterns. If True is returned the file will have its metadata returned. If False it will be skipped.
            The filter function should take the filepath string as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    if filterfalse:
        # Note inverted logic: filterfalse KEEPS items for which the predicate is falsy.
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))

    file_to_iov = {}
    if not pool:
        # Serial path: one b2file-metadata-show call at a time.
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        # Parallel path: fan out the subprocess calls and poll until all are done.
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))
        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)
        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]
    return file_to_iov
def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type urls are taken as absolute file paths already and are simply
    passed through.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        file_pattern_uri = parse_file_uri(file_pattern)
        if file_pattern_uri.scheme == "file":
            input_files = glob.glob(file_pattern_uri.path)
            if not input_files:
                B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            else:
                for file_path in input_files:
                    file_path = os.path.abspath(file_path)
                    if os.path.isfile(file_path):
                        existing_file_paths.add(file_path)
        else:
            # Non-local URI (e.g. root://): cannot validate locally, pass through as-is.
            B2INFO(f"Found a non-local file pattern {file_pattern} it will not be checked for validity.")
            existing_file_paths.add(file_pattern)
    abs_file_paths = list(existing_file_paths)
    return abs_file_paths
def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The Single Exp,Run IoV that the Raw data file corresponds to.
    """
    file_path = Path(file_path)
    # Strip the known storage prefix; try the HSM location first, fall back to
    # the group disk location.
    try:
        reduced_path = file_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    except ValueError:
        reduced_path = file_path.relative_to("/group/belle2/dataprod/Data/Raw")

    try:
        # Directory components look like e<exp>/r<run>/... — drop the letter prefix.
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])
        # The filename also encodes <...>.<exp>.<run>.<...> for cross-checking.
        split_filename = reduced_path.name.split(".")
        filename_exp = int(split_filename[1])
        filename_run = int(split_filename[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e

    if path_exp == filename_exp and path_run == filename_run:
        return IoV(path_exp, path_run, path_exp, path_run)
    else:
        raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")
def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is (including any contents),
    or delete it and re-create it fresh. It will only delete the end point, not any intermediate directories created.

    Parameters:
        path (pathlib.Path): The directory path to create.

    Keyword Arguments:
        overwrite (bool): If True and the path exists, delete it and recreate it empty.
    """
    if path.exists() and overwrite:
        shutil.rmtree(path)
    if not path.exists():
        os.makedirs(path)
def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have iteration directories
    from iterations above your current one. This function will find directories that match an integer.

    Parameters:
        dir_path(`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    paths = []
    all_dirs = [sub_dir for sub_dir in dir_path.glob("*") if sub_dir.is_dir()]
    for directory in all_dirs:
        try:
            # Keep only directories whose name parses as an integer.
            int(directory.name)
            paths.append(directory)
        except ValueError:
            pass
    return paths
def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it in one place later.

    Parameters:
        file_uri (str)

    Returns:
        urllib.parse.ParseResult
    """
    # Default scheme is "file" so bare paths are treated as local files.
    return urlparse(file_uri, scheme="file", allow_fragments=False)
#: Sentinel ExpRun used to mean "no specific (exp, run) bound".
UNBOUND_EXPRUN = ExpRun(-1, -1)