Belle II Software development
utils.py
#!/usr/bin/env python3

# disable doxygen check for this file
# @cond

##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################

14"""
15This module contains various utility functions for the CAF and Job submission Backends to use.
16"""
17
from basf2 import B2INFO, B2WARNING, B2DEBUG
import os
import glob
from collections import deque
from collections import OrderedDict
from collections import namedtuple
from collections import defaultdict
import pathlib
import json
from functools import singledispatch, update_wrapper
import contextlib
import enum
import shutil
import itertools
from urllib.parse import urlparse

import ROOT
from ROOT.Belle2 import CalibrationAlgorithm, IntervalOfValidity


b2info_newline = "\n" + (7 * " ")


def B2INFO_MULTILINE(lines):
    """
    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO

    Creates a single string for B2INFO from a list of strings, joining them with
    a newline character plus the indentation needed to keep the following lines
    of B2INFO output nicely aligned. Then it calls B2INFO on the result.
    """
    log_string = b2info_newline.join(lines)
    B2INFO(log_string)


def grouper(n, iterable):
    """
    Parameters:
        n (int): Maximum size of each group that gets yielded.
        iterable (list): The original iterable that we want to yield groups of size 'n' from.

    Yields:
        tuple
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk


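# Illustrative example (not part of the original module): grouper yields
# tuples of up to 'n' items, with a final shorter tuple if the iterable
# doesn't divide evenly.
#
# >>> list(grouper(3, [1, 2, 3, 4, 5, 6, 7, 8]))
# [(1, 2, 3), (4, 5, 6), (7, 8)]

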
def pairwise(iterable):
    """
    Iterate through a sequence by pairing up the current and next entry.
    Note that when you hit the last entry you don't get a (last, None) pair;
    the final iteration gives you (last-1, last) and then finishes. If the
    sequence only has one entry this may be important, as you will not get
    any pairs at all.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        list[tuple]
    """
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)


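# Illustrative example (not part of the original module): note that a
# single-entry sequence produces no pairs at all.
#
# >>> list(pairwise([1, 2, 3, 4]))
# [(1, 2), (2, 3), (3, 4)]
# >>> list(pairwise([1]))
# []

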
def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list. This cannot find missing
    runs which lie between two IoVs that are separated by an experiment e.g. between
    IoV(1,1,1,10) => IoV(2,1,2,5) it is unknown if there were supposed to be more runs than run
    number 10 in experiment 1 before starting experiment 2. Therefore this is not counted as a gap
    and will not be added to the output list of IoVs.

    Parameters:
        iov_list (list[IoV]): A SORTED list of non-overlapping IoVs that you want to check for 'gaps'
            i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
    """
    gaps = []
    previous_iov = None
    for current_iov in iov_list:
        if previous_iov:
            previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
            current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
            iov_gap = previous_highest.find_gap(current_lowest)
            if iov_gap:
                B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
                gaps.append(iov_gap)
        previous_iov = current_iov
    return gaps


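# Illustrative example (not part of the original module): two IoVs in the
# same experiment, with runs 11-13 uncovered between them.
#
# >>> find_gaps_in_iov_list([IoV(1, 1, 1, 10), IoV(1, 14, 1, 20)])
# [IoV(exp_low=1, run_low=11, exp_high=1, run_high=13)]

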
class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp,Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Keyword Arguments:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns.
        """
        lower, upper = sorted((self, other))
        if lower.exp == upper.exp and lower.run != upper.run:
            if (upper.run - lower.run) > 1:
                return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
            else:
                return None
        else:
            return None


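# Illustrative example (not part of the original module): find_gap returns
# the IoV strictly between two ExpRuns, or None if they are adjacent or in
# different experiments.
#
# >>> ExpRun(1, 5).find_gap(ExpRun(1, 9))
# IoV(exp_low=1, run_low=6, exp_high=1, run_high=8)
# >>> ExpRun(1, 5).find_gap(ExpRun(1, 6)) is None
# True

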
class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__.
        """
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)


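# Illustrative example (not part of the original module): the comparisons
# delegate to the C++ IntervalOfValidity, so a basf2 environment is required.
#
# >>> IoV(1, 1, 1, 10).contains(IoV(1, 2, 1, 5))
# True
# >>> IoV(1, 1, 1, 10).overlaps(IoV(1, 8, 1, 12))
# True

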
@enum.unique
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members but it's nice to have
    something pythonic ready to go.
    """

    ok = CalibrationAlgorithm.c_OK

    not_enough_data = CalibrationAlgorithm.c_NotEnoughData

    iterate = CalibrationAlgorithm.c_Iterate

    failure = CalibrationAlgorithm.c_Failure


IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])


class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb

    Keyword Arguments:
        payload_dir (str): If the payload directory is different to the directory containing the filepath, you can set it here.
    """
    db_type = "local"

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if f.exists():
            self.filepath = f.resolve()
            if not payload_dir:
                self.payload_dir = pathlib.Path(self.filepath.parent)
            else:
                p = pathlib.Path(payload_dir)
                if p.exists():
                    self.payload_dir = p.resolve()
                else:
                    raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
        else:
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")


class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """
    db_type = "central"

    def __init__(self, global_tag):
        self.global_tag = global_tag


def split_runs_by_exp(runs):
    """
    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as original list but sublists are generated for each Exp value
    """
    split_by_runs = []
    current_exp = runs[0].exp
    exp_list = []
    for exprun in runs:
        if exprun.exp != current_exp:
            split_by_runs.append(exp_list)
            exp_list = [exprun]
        else:
            exp_list.append(exprun)
        current_exp = exprun.exp
    split_by_runs.append(exp_list)
    return split_by_runs


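# Illustrative example (not part of the original module):
#
# >>> split_runs_by_exp([ExpRun(1, 1), ExpRun(1, 2), ExpRun(2, 1)])
# [[ExpRun(exp=1, run=1), ExpRun(exp=1, run=2)], [ExpRun(exp=2, run=1)]]

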
def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap
    with the IoV.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Return:
        set
    """
    overlapping_runs = set()
    for run in runs:
        # Construct an IoV of one run
        run_iov = run.make_iov()
        if run_iov.overlaps(iov):
            overlapping_runs.add(run)
    return overlapping_runs


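# Illustrative example (not part of the original module): only the runs
# inside the IoV are returned, as an (unordered) set.
#
# >>> runs = [ExpRun(1, 1), ExpRun(1, 2), ExpRun(1, 3), ExpRun(1, 4)]
# >>> sorted(runs_overlapping_iov(IoV(1, 2, 1, 3), runs))
# [ExpRun(exp=1, run=2), ExpRun(exp=1, run=3)]

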
def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.
    """
    if len(runs) > 1:
        exprun_low, exprun_high = runs[0], runs[-1]
    else:
        exprun_low, exprun_high = runs[0], runs[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)


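# Illustrative example (not part of the original module):
#
# >>> iov_from_runs([ExpRun(1, 1), ExpRun(1, 3), ExpRun(1, 5)])
# IoV(exp_low=1, run_low=1, exp_high=1, run_high=5)

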
def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.
    """
    # Convert the C++ pairs to ExpRun namedtuples so we can access .exp/.run
    # (the previous list() conversion produced plain lists without those attributes)
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    if len(exprun_list) > 1:
        exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    else:
        # ExpRun is an immutable namedtuple, so no copy is needed
        exprun_low, exprun_high = exprun_list[0], exprun_list[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)


def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std::vector<std::pair<int,int>>

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int))
    """
    exprun_type = ROOT.pair(int, int)
    run_vec = ROOT.vector(exprun_type)()
    run_vec.reserve(len(runs))
    for run in runs:
        run_vec.push_back(exprun_type(run.exp, run.run))
    return run_vec


def runs_from_vector(exprun_vector):
    """
    Takes a vector of exp,run pairs from CalibrationAlgorithm and returns
    a Python list of `ExpRun` tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Return:
        list[ExpRun]
    """
    return [ExpRun(exprun.first, exprun.second) for exprun in exprun_vector]


def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different than the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Return:
        dict[IoV,list[ExpRun]]
    """
    boundary_iov_to_runs = {}
    # Find the boundary IoVs
    for start_current, start_next in pairwise(boundaries):
        # We can safely assume the run-1 because we aren't doing this across multiple experiment numbers
        boundary_iov = IoV(*start_current, start_next.exp, start_next.run - 1)
        boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
        boundary_iov_to_runs[boundary_iov] = boundary_runs
    # The final boundary start won't get iterated above because there's no 'next' boundary. So we add the remaining runs here
    boundary_iov = IoV(*boundaries[-1], boundaries[-1].exp, -1)
    boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
    boundary_iov_to_runs[boundary_iov] = boundary_runs
    return boundary_iov_to_runs


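# Illustrative example (not part of the original module): two boundaries
# chunk the available runs, with the final boundary IoV left open via -1.
# Writing the ExpRuns/IoVs compactly, the result is:
#
#   boundaries = [ExpRun(1, 1), ExpRun(1, 4)]
#   runs = [ExpRun(1, 1), ExpRun(1, 2), ExpRun(1, 3), ExpRun(1, 4), ExpRun(1, 5)]
#   find_run_lists_from_boundaries(boundaries, runs)
#   => {IoV(1, 1, 1, 3): [ExpRun(1, 1), ExpRun(1, 2), ExpRun(1, 3)],
#       IoV(1, 4, 1, -1): [ExpRun(1, 4), ExpRun(1, 5)]}

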
def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.
    """
    # Create an OrderedDict to make sure that our sources are
    # in the same order that we started with
    in_degrees = OrderedDict((k, 0) for k in dependencies)
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # We build a deque of nodes with no dependencies
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if in_degree == 0:
            sources.appendleft(name)

    return sources


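# Illustrative example (not part of the original module): 'c' is the only
# node that no other node depends on, so it is the only source.
#
# >>> find_sources({'c': ['a', 'b'], 'b': ['a'], 'a': []})
# deque(['c'])

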
def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted
    list of nodes.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a', 'b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> order = topological_sort(dependencies)
    >>> print(order)
    ['c', 'b', 'a']
    """
    # We find the in-degree (number of dependencies) for each node
    # and store it.
    in_degrees = {k: 0 for k in dependencies}
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # We build a deque of nodes with no dependencies
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if in_degree == 0:
            sources.appendleft(name)

    order = []
    while sources:  # Keep adding and removing from this until solved
        source = sources.pop()  # Pick a node with no dependencies
        order.append(source)  # Add it to our ordered nodes
        for node in dependencies[source]:  # Remove vertices from adjacent nodes
            in_degrees[node] -= 1
            if in_degrees[node] == 0:  # If we've created a new source, add it.
                sources.appendleft(node)

    if len(order) == len(dependencies):  # Check if all nodes were ordered
        return order
    else:  # If not, then there was a cyclic dependency
        B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
        return []


def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on it.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
    """
    full_dependencies = OrderedDict()

    def add_out_nodes(node, node_set):
        """
        This is a recursive function that follows the tree of adjacent future nodes
        and adds all of them to a set (so that we have unique items)
        """
        for out_node in dependencies[node]:
            node_set.add(out_node)
            add_out_nodes(out_node, node_set)

    if not order:
        order = dependencies.keys()
    # Loop over the nodes in the order and recursively head upwards through explicit
    # adjacent nodes.
    for node in order:
        node_dependencies = set()
        add_out_nodes(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)

    return full_dependencies


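# Illustrative example (not part of the original module): 'c' only lists 'b'
# directly, but 'a' is an implicit future node of 'c' via 'b'.
#
# >>> all_dependencies({'c': ['b'], 'b': ['a'], 'a': []})
# OrderedDict([('c', ['b', 'a']), ('b', ['a']), ('a', [])])
#
# (The order within each list may vary, since each list is built from a set.)

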
def past_from_future_dependencies(future_dependencies):
    """
    Inverts a dictionary of future dependencies (node -> nodes that depend on it)
    into the corresponding past dependencies (node -> nodes it depends on).
    """
    past_dependencies = defaultdict(list)
    for node, deps in future_dependencies.items():
        for dep in deps:
            past_dependencies[dep].append(node)
    return past_dependencies


def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (saves importing json everywhere).
    """
    return json.loads(object_string)


def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch  # Default method
    >>> def my_method(self, default_type, ...):
    >>>     pass

    >>> @my_method.register(list)  # Registers list method for dispatch
    >>> def _(self, list_type, ...):
    >>>     pass

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property
    >>> def my_property(self):
    >>>     return self._my_property

    >>> @my_property.setter
    >>> @method_dispatch
    >>> def my_property(self, input_property):
    >>>     pass

    >>> @my_property.fset.register(list)
    >>> def _(self, input_list_properties):
    >>>     pass
    """
    dispatcher = singledispatch(func)

    def wrapper(*args, **kw):
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)
    wrapper.register = dispatcher.register
    update_wrapper(wrapper, func)
    return wrapper


@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that changes the working directory to the given
    path and then changes it back to its previous value on exit.
    """
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(prev_cwd)


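# Illustrative usage (not part of the original module; '/tmp' and
# do_something_in_tmp() are hypothetical): the original working directory
# is restored even if the block raises an exception.
#
# >>> with temporary_workdir("/tmp"):
# ...     do_something_in_tmp()

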
class PathExtras():
    """
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with the 'in' keyword.
    """

    def __init__(self, path=None):
        """
        Initialising with a path.
        """
        if path:
            self.path = path
        else:
            self.path = None

        self._module_names = []
        self._update_names()

    def _update_names(self):
        """
        Takes the self.path attribute and uses the current state to recreate the
        self.module_names list
        """
        if not self.path:
            return
        # Rebuild the list from scratch so repeated calls don't duplicate names
        self._module_names = [module.name() for module in self.path.modules()]

    def __contains__(self, module_name):
        """
        Special method to allow 'module_name in path' type comparisons. Returns
        a boolean and compares by module name.
        """
        self._update_names()
        return module_name in self._module_names

    def index(self, module_name):
        """
        Returns the index of the first instance of a module in the contained path.
        """
        return self._module_names.index(module_name)


def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            else:
                # Get only the files, not directories
                listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
                file_names = [file_name for file_name in listdir(directory) if isfile(join(directory, file_name))]
                file_names.remove('database.txt')
                # Now we need the absolute paths to all of the payload files so we can copy them across
                file_names = [os.path.join(directory, file_name) for file_name in file_names[:]]
                for file_name in file_names:
                    shutil.copy(file_name, output_database_dir)
                # Now grab all the IoV stuff from each database.txt file and merge it.
                with open(os.path.join(directory, 'database.txt')) as f:
                    for line in f.readlines():
                        db_file.write(line)


def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    import subprocess
    metadata_output = subprocess.check_output(['b2file-metadata-show', '--json', file_path])
    m = json.loads(metadata_output.decode('utf-8'))
    return IoV(m['experimentLow'], m['runLow'], m['experimentHigh'], m['runHigh'])


def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV, instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    return (file_path, get_iov_from_file(file_path))


def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs b2file-metadata-show over them to
    extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.

    Keyword Arguments:
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
            We don't close or join the Pool as you might want to use it yourself, we just wait until the results are ready.
        filterfalse (`function`): An optional function object that will be called on each absolute filepath found from your
            patterns. Since this is passed to itertools.filterfalse internally, a file is kept (and has its metadata
            extracted) when the function returns False, and skipped when it returns True. The filter function should take
            the filepath string as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    # Optionally filter out files matching our filter function
    if filterfalse:
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))

    file_to_iov = {}
    if not pool:
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        import time
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))

        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)

        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]

    return file_to_iov


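# Illustrative usage (not part of the original module; the glob pattern is a
# hypothetical path): extract IoVs for all matching files, parallelised over
# 4 worker processes via a standard multiprocessing Pool.
#
# >>> import multiprocessing
# >>> pool = multiprocessing.Pool(4)
# >>> file_to_iov = make_file_to_iov_dictionary(["/path/to/files/*.root"], pool=pool)

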
def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type URIs are taken as absolute file paths already and are simply
    passed through.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        file_pattern_uri = parse_file_uri(file_pattern)
        if file_pattern_uri.scheme == "file":
            input_files = glob.glob(file_pattern_uri.path)
            if not input_files:
                B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            else:
                for file_path in input_files:
                    file_path = os.path.abspath(file_path)
                    if os.path.isfile(file_path):
                        existing_file_paths.add(file_path)
        else:
            B2INFO(f"Found a non-local file pattern {file_pattern}, it will not be checked for validity.")
            existing_file_paths.add(file_pattern)

    abs_file_paths = list(existing_file_paths)
    return abs_file_paths


def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The single Exp,Run IoV that the Raw data file corresponds to.
    """
    Path = pathlib.Path
    file_path = Path(file_path)

    # We'll try and extract the exp and run from both the directory and filename.
    # That will let us check that everything is as we expect.

    try:
        reduced_path = file_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    # Second try for the calibration data path
    except ValueError:
        reduced_path = file_path.relative_to("/group/belle2/dataprod/Data/Raw")

    try:
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])

        split_filename = reduced_path.name.split(".")
        filename_exp = int(split_filename[1])
        filename_run = int(split_filename[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e

    if path_exp == filename_exp and path_run == filename_run:
        return IoV(path_exp, path_run, path_exp, path_run)
    else:
        raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")


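# Illustrative example (not part of the original module; the file path below
# is a hypothetical one following the expected layout): the experiment and
# run parsed from the eNNNN/rNNNNN directories must match the numbers
# embedded in the filename.
#
# >>> parse_raw_data_iov("/hsm/belle2/bdata/Data/Raw/e0012/r01234/sub00/physics.0012.01234.HLT1.f00000.root")
# IoV(exp_low=12, run_low=1234, exp_high=12, run_high=1234)

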
def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is (including any contents),
    or delete it and re-create it fresh. It will only delete the end point, not any intermediate directories created.
    """
    # Delete if overwriting and it exists
    if (path.exists() and overwrite):
        shutil.rmtree(path)
    # If it never existed or we just deleted it, make it now
    if not path.exists():
        os.makedirs(path)


def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have iteration directories
    from iterations above your current one. This function will find directories whose names match an integer.

    Parameters:
        dir_path (`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    paths = []
    all_dirs = [sub_dir for sub_dir in dir_path.glob("*") if sub_dir.is_dir()]
    for directory in all_dirs:
        try:
            int(directory.name)
            paths.append(directory)
        except ValueError:
            pass
    return paths


def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it in one place later.

    Parameters:
        file_uri (str)

    Returns:
        urllib.parse.ParseResult
    """
    return urlparse(file_uri, scheme="file", allow_fragments=False)


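# Illustrative example (not part of the original module; the server name is
# hypothetical): plain paths default to the "file" scheme, while remote URIs
# keep their own scheme.
#
# >>> parse_file_uri("/data/files.root").scheme
# 'file'
# >>> parse_file_uri("root://someserver//data/file.root").scheme
# 'root'

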
UNBOUND_EXPRUN = ExpRun(-1, -1)

# @endcond