"""This module contains various utility functions for the CAF and Job submission Backends to use."""
import contextlib
import enum
import glob
import itertools
import json
import os
import pathlib
import shutil
import subprocess
import time
from collections import OrderedDict, defaultdict, deque, namedtuple
from functools import singledispatch, update_wrapper
from pathlib import Path
from urllib.parse import urlparse

import ROOT
from ROOT.Belle2 import CalibrationAlgorithm, IntervalOfValidity

from basf2 import B2DEBUG, B2INFO, B2WARNING
38b2info_newline =
"\n" + (7 *
" ")
def B2INFO_MULTILINE(lines):
    """
    Quick little function that creates a string for B2INFO from a list of strings.
    But it appends a newline character + the necessary indentation to the following line
    so that the B2INFO output is nicely aligned.
    Then it calls B2INFO on the output.

    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO
    """
    log_string = b2info_newline.join(lines)
    B2INFO(log_string)
def grouper(n, iterable):
    """
    Lazily yield successive chunks of the input iterable as tuples of at most size n.

    Parameters:
        n (int): Maximum size of the list that gets returned.
        iterable (list): The original list that we want to return groups of size 'n' from.

    Yields:
        tuple
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            # Input exhausted: stop the generator
            return
        yield chunk
def pairwise(iterable):
    """
    Iterate through a sequence by pairing up the current and next entry.
    Note that when you hit the last one you don't get a (last, null), the final
    iteration gives you (last-1, last) and then finishes. If you only have one entry
    in the sequence this may be important as you will not get any pairs.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        list[tuple]
    """
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)
def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list. This cannot find missing
    runs which lie between two IoVs that are separated by an experiment e.g. between
    IoV(1,1,1,10) => IoV(2,1,2,5) it is unknown if there were supposed to be more runs than run
    number 10 in experiment 1 before starting experiment 2. Therefore this is not counted as a gap
    and will not be added to the output list of IoVs.

    Parameters:
        iov_list (list[IoV]): A SORTED list of Non-overlapping IoVs that you want to check
            for 'gaps' i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
    """
    gaps = []
    previous_iov = None
    for current_iov in iov_list:
        if previous_iov:
            previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
            current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
            iov_gap = previous_highest.find_gap(current_lowest)
            if iov_gap:
                B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
                gaps.append(iov_gap)
        previous_iov = current_iov
    return gaps
class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp,Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names which is probably fine
    but could cause confusion.

    Parameters:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns.

        Returns None when the runs are adjacent/equal or belong to different
        experiments (cross-experiment gaps cannot be determined).
        """
        lower, upper = sorted((self, other))
        if lower.exp == upper.exp and lower.run != upper.run:
            if (upper.run - lower.run) > 1:
                return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
            return None
        return None
class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names which is probably fine
    but could cause confusion.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__.
        """
        # Internal C++ IoV object used for the contains/overlaps comparisons
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members but it's nice to have
    something pythonic ready to go.
    """
    #: Successful result
    ok = CalibrationAlgorithm.c_OK
    #: Algorithm wants more data before it can produce a result
    not_enough_data = CalibrationAlgorithm.c_NotEnoughData
    #: Algorithm wants another iteration
    iterate = CalibrationAlgorithm.c_Iterate
    #: Algorithm failed
    failure = CalibrationAlgorithm.c_Failure
# Pairs an IoV with the result value produced for it
IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])
class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb
        payload_dir (str): If the payload directory is different to the directory
            containing the filepath, you can set it here.

    Raises:
        ValueError: If the filepath (or an explicitly given payload_dir) doesn't exist.
    """

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if f.exists():
            #: Resolved absolute path to the database.txt file
            self.filepath = f.resolve()
            if not payload_dir:
                # Default: payloads live next to the database.txt file
                self.payload_dir = pathlib.Path(self.filepath.parent)
            else:
                p = pathlib.Path(payload_dir)
                if p.exists():
                    #: Resolved absolute path to the payload directory
                    self.payload_dir = p.resolve()
                else:
                    raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
        else:
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")
class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """

    def __init__(self, global_tag):
        #: The Global Tag name, stored as given (no validation)
        self.global_tag = global_tag
def split_runs_by_exp(runs):
    """
    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as original list but sublists are generated for each Exp value
    """
    split_by_runs = []
    exp_list = []
    current_exp = runs[0].exp
    for exprun in runs:
        if exprun.exp != current_exp:
            # Experiment changed: close off the current sublist and start a new one
            split_by_runs.append(exp_list)
            exp_list = []
        exp_list.append(exprun)
        current_exp = exprun.exp
    split_by_runs.append(exp_list)
    return split_by_runs
def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Returns:
        set[ExpRun]
    """
    overlapping_runs = set()
    for run in runs:
        # Construct a single-run IoV so we can use IoV.overlaps()
        run_iov = run.make_iov()
        if run_iov.overlaps(iov):
            overlapping_runs.add(run)
    return overlapping_runs
def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.
    """
    if len(runs) > 1:
        exprun_low, exprun_high = runs[0], runs[-1]
    else:
        # Single run: the IoV covers just that one ExpRun
        exprun_low, exprun_high = runs[0], runs[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.

    Parameters:
        iov_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of exprun pairs

    Returns:
        IoV
    """
    # NOTE(review): the pairs are converted to ExpRun directly (no list() wrapper)
    # so that .exp/.run attribute access below works.
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    if len(exprun_list) > 1:
        exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    else:
        # Single entry: low and high bounds are the same ExpRun (immutable, safe to share)
        exprun_low, exprun_high = exprun_list[0], exprun_list[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std vector<pair<int,int>>

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int))
    """
    exprun_type = ROOT.pair(int, int)
    run_vec = ROOT.vector(exprun_type)()
    # Pre-allocate to avoid repeated reallocation while pushing back
    run_vec.reserve(len(runs))
    for run in runs:
        run_vec.push_back(exprun_type(run.exp, run.run))
    return run_vec
def runs_from_vector(exprun_vector):
    """
    Takes a vector of `ExpRun` from CalibrationAlgorithm and returns
    a Python list of (exp,run) tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Returns:
        list[ExpRun]
    """
    return [ExpRun(exprun.first, exprun.second) for exprun in exprun_vector]
def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different than the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Returns:
        dict[IoV,list[ExpRun]]
    """
    boundary_iov_to_runs = {}
    # Pair up each boundary with the next one: each interval ends just before the next start
    for start_current, start_next in pairwise(boundaries):
        boundary_iov = IoV(*start_current, start_next.exp, start_next.run - 1)
        boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
        boundary_iov_to_runs[boundary_iov] = boundary_runs
    # The final boundary is open-ended: run_high == -1 means unbounded within the experiment
    boundary_iov = IoV(*boundaries[-1], boundaries[-1].exp, -1)
    boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
    boundary_iov_to_runs[boundary_iov] = boundary_runs
    return boundary_iov_to_runs
def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.

    Parameters:
        dependencies (dict): Mapping of node name -> list of nodes that depend on it

    Returns:
        collections.deque
    """
    # Count the number of incoming edges for every node
    in_degrees = OrderedDict((k, 0) for k in dependencies)
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1
    # Nodes with zero incoming edges are the sources of the graph
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)
    return sources
def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted order.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a', 'b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> sorted = topological_sort(dependencies)

    Returns an empty list (with a B2WARNING) if a cycle is detected.
    """
    # Kahn's algorithm: repeatedly remove nodes with zero incoming edges
    in_degrees = {k: 0 for k in dependencies}
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)
    order = []
    while sources:
        source = sources.pop()
        order.append(source)
        for node in dependencies[source]:
            in_degrees[node] -= 1
            if in_degrees[node] == 0:
                sources.appendleft(node)
    if len(order) == len(dependencies):
        return order
    else:
        # A cycle leaves some nodes with non-zero in-degree, so the order is incomplete
        B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
        return []
def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on them.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
    """
    full_dependencies = OrderedDict()

    def add_out_nodes(node, node_set):
        """
        This is a recursive function that follows the tree of adjacent future nodes
        and adds all of them to a set (so that we have unique items)
        """
        for out_node in dependencies[node]:
            node_set.add(out_node)
            add_out_nodes(out_node, node_set)

    if not order:
        order = dependencies.keys()
    for node in order:
        node_dependencies = set()
        add_out_nodes(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)
    return full_dependencies
def past_from_future_dependencies(future_dependencies):
    """
    Invert a future-dependency graph: for each node, collect the nodes it depends on.

    Parameters:
        future_dependencies (dict): Mapping node -> list of nodes that depend on it

    Returns:
        collections.defaultdict(list): Mapping node -> list of its past (input) dependencies
    """
    past_dependencies = defaultdict(list)
    for node, deps in future_dependencies.items():
        for dep in deps:
            past_dependencies[dep].append(node)
    return past_dependencies
def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (Saves importing json everywhere).
    """
    return json.loads(object_string)
def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch             # doctest: +SKIP
    >>> def my_method(self, default_type, ...):  # doctest: +SKIP
    >>>     pass                     # doctest: +SKIP

    >>> @my_method.register(list)    # doctest: +SKIP
    >>> def _(self, list_type, ...):  # doctest: +SKIP
    >>>     pass                     # doctest: +SKIP

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property                    # doctest: +SKIP
    >>> def my_property(self):       # doctest: +SKIP
    >>>     return self._my_property  # doctest: +SKIP

    >>> @my_property.setter          # doctest: +SKIP
    >>> @method_dispatch             # doctest: +SKIP
    >>> def my_property(self, input_property):  # doctest: +SKIP
    >>>     pass                     # doctest: +SKIP

    >>> @my_property.fset.register(list)  # doctest: +SKIP
    >>> def _(self, input_list_properties):  # doctest: +SKIP
    >>>     pass                     # doctest: +SKIP
    """
    dispatcher = singledispatch(func)

    def wrapper(*args, **kw):
        # Dispatch on the type of the SECOND positional argument (args[0] is self)
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)
    wrapper.register = dispatcher.register
    update_wrapper(wrapper, func)
    return wrapper
@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that changes the working directory to the given
    path and then changes it back to its previous value on exit.
    """
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always restore the previous working directory, even on exceptions
        os.chdir(prev_cwd)
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with the 'in' keyword.
576 def __init__(self, path=None):
578 Initialising with a path.
586 self._module_names = []
589 def _update_names(self):
591 Takes the self.path attribute and uses the current state to recreate the
592 self.module_names list
594 for module
in self.path.
modules():
595 self._module_names.append(module.name())
597 def __contains__(self, module_name):
599 Special method to allow 'module_name in path' type comparisons. Returns
600 a boolean
and compares by module name.
603 return module_name
in self._module_names
605 def index(self, module_name):
607 Returns the index of the first instance of a module in the contained path.
609 return self._module_names.index(module_name)
def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            # Bind frequently used functions to locals for the comprehension below
            listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
            file_names = [file_name for file_name in listdir(directory)
                          if isfile(join(directory, file_name))]
            # database.txt is merged separately, everything else is a payload to copy
            file_names.remove('database.txt')
            file_names = [os.path.join(directory, file_name) for file_name in file_names[:]]
            for file_name in file_names:
                shutil.copy(file_name, output_database_dir)
            # Append this directory's database.txt content to the merged one
            with open(os.path.join(directory, 'database.txt')) as f:
                for line in f.readlines():
                    db_file.write(line)
def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    # Ask the basf2 CLI tool for the file metadata as JSON
    metadata_output = subprocess.check_output(['b2file-metadata-show', '--json', file_path])
    m = json.loads(metadata_output.decode('utf-8'))
    return IoV(m['experimentLow'], m['runLow'], m['experimentHigh'], m['runHigh'])
def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV,
    instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    return (file_path, get_iov_from_file(file_path))
def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs
    b2file-metadata-show over them to extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
            We don't close or join the Pool as you might want to use it yourself,
            we just wait until the results are ready.
        filterfalse (`function`): An optional function object that will be called on each
            absolute filepath found from your patterns. If True is returned the file will
            have its metadata returned. If False it will be skipped. The filter function
            should take the filepath string as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    if filterfalse:
        # itertools.filterfalse keeps the entries for which the predicate is falsey
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))

    file_to_iov = {}
    if not pool:
        # Serial path: query each file's metadata one at a time
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        # Parallel path: fan the subprocess calls out over the pool and poll for completion
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))
        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)
        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]
    return file_to_iov
def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type urls are taken as absolute file paths already and are simply
    passed through.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        file_pattern_uri = parse_file_uri(file_pattern)
        if file_pattern_uri.scheme == "file":
            input_files = glob.glob(file_pattern_uri.path)
            if not input_files:
                B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            else:
                for file_path in input_files:
                    file_path = os.path.abspath(file_path)
                    # Only keep paths that point at actual files (not directories)
                    if os.path.isfile(file_path):
                        existing_file_paths.add(file_path)
        else:
            # Remote URI (e.g. root://...): cannot be validated locally, pass it through
            B2INFO(f"Found a non-local file pattern {file_pattern} it will not be checked for validity.")
            existing_file_paths.add(file_pattern)
    abs_file_paths = list(existing_file_paths)
    return abs_file_paths
def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The Single Exp,Run IoV that the Raw data file corresponds to.

    Raises:
        ValueError: If the path/filename can't be parsed, or if the directory structure
            and the filename disagree about the (exp, run).
    """
    file_path = Path(file_path)
    # The file may live under either of the two known Raw data roots
    try:
        reduced_path = file_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    except ValueError:
        reduced_path = file_path.relative_to("/group/belle2/dataprod/Data/Raw")

    try:
        # Directory layout encodes exp/run: .../e<exp>/r<run>/...
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])
        # Filename also encodes them: <something>.<exp>.<run>...
        split_filename = reduced_path.name.split(".")
        filename_exp = int(split_filename[1])
        filename_run = int(split_filename[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e

    if path_exp == filename_exp and path_run == filename_run:
        return IoV(path_exp, path_run, path_exp, path_run)
    else:
        raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")
def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is
    (including any contents), or delete it and re-create it fresh. It will only delete
    the end point, not any intermediate directories created.

    Parameters:
        path (pathlib.Path): The directory path to create
        overwrite (bool): If True and the path exists, wipe it first
    """
    if (path.exists() and overwrite):
        # Remove only the final directory (and its contents), not the parents
        shutil.rmtree(path)
    if not path.exists():
        os.makedirs(path)
def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have
    iteration directories from iterations above your current one. This function will find
    directories that match an integer.

    Parameters:
        dir_path(`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    paths = []
    all_dirs = [sub_dir for sub_dir in dir_path.glob("*") if sub_dir.is_dir()]
    for directory in all_dirs:
        # EAFP: a directory name that parses as int is a valid iteration directory
        try:
            int(directory.name)
            paths.append(directory)
        except ValueError:
            pass
    return paths
def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it
    in one place later.

    Parameters:
        file_uri (str)

    Returns:
        urllib.parse.ParseResult
    """
    # Bare paths default to the "file" scheme; fragments are kept in the path
    return urlparse(file_uri, scheme="file", allow_fragments=False)
# Sentinel ExpRun meaning "unbounded", using the same -1 convention as the IoV defaults
UNBOUND_EXPRUN = ExpRun(-1, -1)