Belle II Software development
utils.py
#!/usr/bin/env python3

# disable doxygen check for this file
# @cond


"""
This module contains various utility functions for the CAF and Job submission Backends to use.
"""

from basf2 import B2INFO, B2WARNING, B2DEBUG
import os
import glob
from collections import deque
from collections import OrderedDict
from collections import namedtuple
from collections import defaultdict
import pathlib
import json
from functools import singledispatch, update_wrapper
import contextlib
import enum
import shutil
import itertools
from urllib.parse import urlparse

import ROOT
from ROOT.Belle2 import CalibrationAlgorithm, IntervalOfValidity


b2info_newline = "\n" + (7 * " ")


def B2INFO_MULTILINE(lines):
    """
    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO

    Quick little function that creates a string for B2INFO from a list of strings.
    It appends a newline character + the necessary indentation to each following line
    so that the B2INFO output is nicely aligned.
    Then it calls B2INFO on the output.
    """
    log_string = b2info_newline.join(lines)
    B2INFO(log_string)


def grouper(n, iterable):
    """
    Parameters:
        n (int): Maximum size of each group that gets yielded.
        iterable (list): The original list that we want to return groups of size 'n' from.

    Yields:
        tuple
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk
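# Usage sketch for grouper() (illustrative, not part of the original source):
# >>> list(grouper(3, [1, 2, 3, 4, 5, 6, 7]))
# [(1, 2, 3), (4, 5, 6), (7,)]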


def pairwise(iterable):
    """
    Iterate through a sequence by pairing up the current and next entry.
    Note that when you hit the last one you don't get a (last, None) pair; the
    final iteration gives you (last-1, last) and then finishes. If you only
    have one entry in the sequence this may be important as you will not get any
    looping.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        list[tuple]
    """
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)
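# Usage sketch for pairwise() (illustrative, not part of the original source):
# >>> list(pairwise([1, 2, 3, 4]))
# [(1, 2), (2, 3), (3, 4)]
# >>> list(pairwise([1]))  # A single entry yields no pairs
# []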


def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list. This cannot find missing
    runs which lie between two IoVs that are separated by an experiment e.g. between
    IoV(1,1,1,10) => IoV(2,1,2,5) it is unknown if there were supposed to be more runs than run
    number 10 in experiment 1 before starting experiment 2. Therefore this is not counted as a gap
    and will not be added to the output list of IoVs.

    Parameters:
        iov_list (list[IoV]): A SORTED list of non-overlapping IoVs that you want to check for 'gaps'
            i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
    """
    gaps = []
    previous_iov = None
    for current_iov in iov_list:
        if previous_iov:
            previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
            current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
            iov_gap = previous_highest.find_gap(current_lowest)
            if iov_gap:
                B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
                gaps.append(iov_gap)
        previous_iov = current_iov
    return gaps
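# Usage sketch for find_gaps_in_iov_list() (illustrative, not part of the original source):
# >>> find_gaps_in_iov_list([IoV(1, 1, 1, 10), IoV(1, 15, 1, 20)])
# [IoV(exp_low=1, run_low=11, exp_high=1, run_high=14)]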


class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp,Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Keyword Arguments:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns.
        Returns None if there is no gap (adjacent or equal runs, or different experiments).
        """
        lower, upper = sorted((self, other))
        if lower.exp == upper.exp and lower.run != upper.run:
            if (upper.run - lower.run) > 1:
                return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
            else:
                return None
        else:
            return None
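# Usage sketch for ExpRun.find_gap() (illustrative, not part of the original source):
# >>> ExpRun(1, 5).find_gap(ExpRun(1, 10))
# IoV(exp_low=1, run_low=6, exp_high=1, run_high=9)
# >>> ExpRun(1, 5).find_gap(ExpRun(1, 6)) is None  # Adjacent runs have no gap
# True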


class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__.
        """
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)
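# Usage sketch for IoV comparisons (illustrative, not part of the original source;
# the results rely on the C++ IntervalOfValidity semantics):
# >>> IoV(1, 1, 1, 10).contains(IoV(1, 2, 1, 5))
# True
# >>> IoV(1, 1, 1, 10).overlaps(IoV(1, 8, 1, 12))
# True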


@enum.unique
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members but it's nice to have
    something pythonic ready to go.
    """
    # The algorithm finished successfully
    ok = CalibrationAlgorithm.c_OK
    # The algorithm needs more data before it can produce a result
    not_enough_data = CalibrationAlgorithm.c_NotEnoughData
    # The algorithm requests another iteration
    iterate = CalibrationAlgorithm.c_Iterate
    # The algorithm failed
    failure = CalibrationAlgorithm.c_Failure


IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])


class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb

    Keyword Arguments:
        payload_dir (str): If the payload directory is different to the directory containing the filepath, you can set it here.
    """
    db_type = "local"

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if f.exists():
            self.filepath = f.resolve()
            if not payload_dir:
                self.payload_dir = pathlib.Path(self.filepath.parent)
            else:
                p = pathlib.Path(payload_dir)
                if p.exists():
                    self.payload_dir = p.resolve()
                else:
                    raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
        else:
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")


class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """
    db_type = "central"

    def __init__(self, global_tag):
        self.global_tag = global_tag


def split_runs_by_exp(runs):
    """
    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as original list but sublists are generated for each Exp value
    """
    split_by_runs = []
    current_exp = runs[0].exp
    exp_list = []
    for exprun in runs:
        if exprun.exp != current_exp:
            split_by_runs.append(exp_list)
            exp_list = [exprun]
        else:
            exp_list.append(exprun)
        current_exp = exprun.exp
    else:
        split_by_runs.append(exp_list)
    return split_by_runs
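# Usage sketch for split_runs_by_exp() (illustrative, not part of the original source):
# >>> split_runs_by_exp([ExpRun(1, 1), ExpRun(1, 2), ExpRun(2, 1)])
# [[ExpRun(exp=1, run=1), ExpRun(exp=1, run=2)], [ExpRun(exp=2, run=1)]]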


def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap
    with the IoV.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Return:
        set
    """
    overlapping_runs = set()
    for run in runs:
        # Construct an IoV of one run
        run_iov = run.make_iov()
        if run_iov.overlaps(iov):
            overlapping_runs.add(run)
    return overlapping_runs


def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.
    """
    if len(runs) > 1:
        exprun_low, exprun_high = runs[0], runs[-1]
    else:
        exprun_low, exprun_high = runs[0], runs[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
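# Usage sketch for iov_from_runs() (illustrative, not part of the original source):
# >>> iov_from_runs([ExpRun(1, 1), ExpRun(1, 3), ExpRun(1, 5)])
# IoV(exp_low=1, run_low=1, exp_high=1, run_high=5)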


def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.
    """
    # Convert the C++ pairs to ExpRun objects. They must stay as ExpRun (not be
    # converted to plain lists) so that the .exp/.run attribute access below works.
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    if len(exprun_list) > 1:
        exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    else:
        exprun_low, exprun_high = exprun_list[0], exprun_list[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)


def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std::vector<std::pair<int,int>>.

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int))
    """
    exprun_type = ROOT.pair(int, int)
    run_vec = ROOT.vector(exprun_type)()
    run_vec.reserve(len(runs))
    for run in runs:
        run_vec.push_back(exprun_type(run.exp, run.run))
    return run_vec


def runs_from_vector(exprun_vector):
    """
    Takes a vector of `ExpRun` from CalibrationAlgorithm and returns
    a Python list of (exp,run) tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Return:
        list[ExpRun]
    """
    return [ExpRun(exprun.first, exprun.second) for exprun in exprun_vector]


def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different than the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Return:
        dict[IoV,list[ExpRun]]
    """
    boundary_iov_to_runs = {}
    # Find the boundary IoVs
    for start_current, start_next in pairwise(boundaries):
        # We can safely assume the run-1 because we aren't doing this across multiple experiment numbers
        boundary_iov = IoV(*start_current, start_next.exp, start_next.run - 1)
        boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
        boundary_iov_to_runs[boundary_iov] = boundary_runs
    # The final boundary start won't get iterated above because there's no 'next' boundary. So we add the remaining runs here
    boundary_iov = IoV(*boundaries[-1], boundaries[-1].exp, -1)
    boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
    boundary_iov_to_runs[boundary_iov] = boundary_runs
    return boundary_iov_to_runs
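# Usage sketch for find_run_lists_from_boundaries() (illustrative, not part of the
# original source; assumes a single experiment, as the function itself does):
# >>> boundaries = [ExpRun(1, 1), ExpRun(1, 10)]
# >>> runs = [ExpRun(1, 2), ExpRun(1, 5), ExpRun(1, 12)]
# >>> find_run_lists_from_boundaries(boundaries, runs)
# {IoV(exp_low=1, run_low=1, exp_high=1, run_high=9): [ExpRun(exp=1, run=2), ExpRun(exp=1, run=5)],
#  IoV(exp_low=1, run_low=10, exp_high=1, run_high=-1): [ExpRun(exp=1, run=12)]}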


def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.
    """
    # Create an OrderedDict to make sure that our sources are
    # in the same order that we started with
    in_degrees = OrderedDict((k, 0) for k in dependencies)
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # We build a deque of nodes with no dependencies
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if in_degree == 0:
            sources.appendleft(name)

    return sources
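# Usage sketch for find_sources() (illustrative, not part of the original source):
# >>> find_sources({'c': ['a', 'b'], 'b': ['a'], 'a': []})
# deque(['c'])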


def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted
    list of nodes.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a','b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> sorted = topological_sort(dependencies)
    >>> print(sorted)
    ['c', 'b', 'a']
    """
    # We find the in-degree (number of dependencies) for each node
    # and store it.
    in_degrees = {k: 0 for k in dependencies}
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # We build a deque of nodes with no dependencies
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if in_degree == 0:
            sources.appendleft(name)

    order = []
    while sources:  # Keep adding and removing from this until solved
        source = sources.pop()  # Pick a node with no dependencies
        order.append(source)  # Add it to our ordered nodes
        for node in dependencies[source]:  # Remove vertices from adjacent nodes
            in_degrees[node] -= 1
            if in_degrees[node] == 0:  # If we've created a new source, add it.
                sources.appendleft(node)

    if len(order) == len(dependencies):  # Check if all nodes were ordered
        return order
    else:  # If not, then there was a cyclic dependency
        B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
        return []


def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on it.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
    """
    full_dependencies = OrderedDict()

    def add_out_nodes(node, node_set):
        """
        This is a recursive function that follows the tree of adjacent future nodes
        and adds all of them to a set (so that we have unique items)
        """
        for out_node in dependencies[node]:
            node_set.add(out_node)
            add_out_nodes(out_node, node_set)

    if not order:
        order = dependencies.keys()
    # Loop over the nodes in the order and recursively head upwards through explicit
    # adjacent nodes.
    for node in order:
        node_dependencies = set()
        add_out_nodes(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)

    return full_dependencies
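# Usage sketch for all_dependencies() (illustrative, not part of the original source;
# the order inside each list is not guaranteed since a set is used internally):
# >>> all_dependencies({'c': ['a', 'b'], 'b': ['a'], 'a': []})
# OrderedDict([('c', ['a', 'b']), ('b', ['a']), ('a', [])])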


def past_from_future_dependencies(future_dependencies):
    """
    Inverts a 'future' dependency dictionary (node -> nodes that depend on it) into
    a 'past' one (node -> nodes it depends on).
    """
    past_dependencies = defaultdict(list)
    for node, deps in future_dependencies.items():
        for dep in deps:
            past_dependencies[dep].append(node)
    return past_dependencies


def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (saves importing json everywhere).
    """
    return json.loads(object_string)


def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch  # Default method
    >>> def my_method(self, default_type, ...):
    >>>     pass

    >>> @my_method.register(list)  # Registers list method for dispatch
    >>> def _(self, list_type, ...):
    >>>     pass

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property
    >>> def my_property(self):
    >>>     return self._my_property

    >>> @my_property.setter
    >>> @method_dispatch
    >>> def my_property(self, input_property):
    >>>     pass

    >>> @my_property.fset.register(list)
    >>> def _(self, input_list_properties):
    >>>     pass
    """
    dispatcher = singledispatch(func)

    def wrapper(*args, **kw):
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)
    wrapper.register = dispatcher.register
    update_wrapper(wrapper, func)
    return wrapper


@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that changes the working directory to the given
    path and then changes it back to its previous value on exit.
    """
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(prev_cwd)
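# Usage sketch for temporary_workdir() (illustrative, not part of the original source):
# >>> with temporary_workdir("/tmp"):
# ...     pass  # Inside the block the current working directory is /tmp
# The previous working directory is restored on exit, even if an exception was raised.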


class PathExtras():
    """
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with the 'in' keyword.
    """

    def __init__(self, path=None):
        """
        Initialising with a path.
        """
        # The wrapped basf2 path (may be None if no path was given)
        self.path = path
        # Holds a list of module names for the path in self.path
        self._module_names = []
        self._update_names()

    def _update_names(self):
        """
        Takes the self.path attribute and uses the current state to recreate the
        self._module_names list.
        """
        # Rebuild from scratch so that repeated calls don't duplicate entries
        self._module_names = []
        if self.path:
            for module in self.path.modules():
                self._module_names.append(module.name())

    def __contains__(self, module_name):
        """
        Special method to allow 'module_name in path' type comparisons. Returns
        a boolean and compares by module name.
        """
        self._update_names()
        return module_name in self._module_names

    def index(self, module_name):
        """
        Returns the index of the first instance of a module in the contained path.
        """
        return self._module_names.index(module_name)


def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            else:
                # Get only the files, not directories
                listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
                file_names = [file_name for file_name in listdir(directory) if isfile(join(directory, file_name))]
                file_names.remove('database.txt')
                # Now we need the absolute paths to all of the payload files so we can copy them across
                file_names = [os.path.join(directory, file_name) for file_name in file_names]
                for file_name in file_names:
                    shutil.copy(file_name, output_database_dir)
                # Now grab all the IoV stuff from each database.txt file and merge it
                with open(os.path.join(directory, 'database.txt')) as f:
                    for line in f.readlines():
                        db_file.write(line)


def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    import subprocess
    metadata_output = subprocess.check_output(['b2file-metadata-show', '--json', file_path])
    m = json.loads(metadata_output.decode('utf-8'))
    return IoV(m['experimentLow'], m['runLow'], m['experimentHigh'], m['runHigh'])


def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV, instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    return (file_path, get_iov_from_file(file_path))


def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs b2file-metadata-show over them to
    extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.

    Keyword Arguments:
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
            We don't close or join the Pool as you might want to use it yourself, we just wait until the results are ready.
        filterfalse (`function`): An optional function object that will be called on each absolute filepath found from your
            patterns. Since it is passed to itertools.filterfalse, a file is kept (and has its metadata extracted) when the
            function returns False, and skipped when it returns True. The filter function should take the filepath string
            as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    # Optionally filter out files matching our filter function
    if filterfalse:
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))

    file_to_iov = {}
    if not pool:
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        import time
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))

        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)

        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]

    return file_to_iov
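# Usage sketch for make_file_to_iov_dictionary() (illustrative, not part of the original
# source; the pattern and paths are hypothetical and b2file-metadata-show must be available):
# >>> make_file_to_iov_dictionary(["/data/exp1/*.root"])
# {'/data/exp1/run1.root': IoV(exp_low=1, run_low=1, exp_high=1, run_high=1), ...}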


def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type URIs are taken as absolute file paths already and are simply
    passed through.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        file_pattern_uri = parse_file_uri(file_pattern)
        if file_pattern_uri.scheme == "file":
            input_files = glob.glob(file_pattern_uri.path)
            if not input_files:
                B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            else:
                for file_path in input_files:
                    file_path = os.path.abspath(file_path)
                    if os.path.isfile(file_path):
                        existing_file_paths.add(file_path)
        else:
            B2INFO(f"Found a non-local file pattern {file_pattern}, it will not be checked for validity.")
            existing_file_paths.add(file_pattern)

    abs_file_paths = list(existing_file_paths)
    return abs_file_paths


def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The single Exp,Run IoV that the Raw data file corresponds to.
    """
    Path = pathlib.Path
    file_path = Path(file_path)

    # We'll try and extract the exp and run from both the directory and filename.
    # That will let us check that everything is as we expect.
    try:
        reduced_path = file_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    # Second try for the calibration data path
    except ValueError:
        reduced_path = file_path.relative_to("/group/belle2/dataprod/Data/Raw")

    try:
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])

        split_filename = reduced_path.name.split(".")
        filename_exp = int(split_filename[1])
        filename_run = int(split_filename[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e

    if path_exp == filename_exp and path_run == filename_run:
        return IoV(path_exp, path_run, path_exp, path_run)
    else:
        raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")
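# Usage sketch for parse_raw_data_iov() (illustrative, not part of the original source;
# the file path below is a hypothetical example of the expected KEKCC naming scheme):
# >>> parse_raw_data_iov("/hsm/belle2/bdata/Data/Raw/e0014/r00071/sub00/physics.0014.00071.HLT1.f00000.root")
# IoV(exp_low=14, run_low=71, exp_high=14, run_high=71)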


def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is (including any contents),
    or delete it and re-create it fresh. It will only delete the end point, not any intermediate directories created.
    """
    # Delete if overwriting and it exists
    if (path.exists() and overwrite):
        shutil.rmtree(path)
    # If it never existed or we just deleted it, make it now
    if not path.exists():
        os.makedirs(path)


def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have iteration directories
    from iterations above your current one. This function will find directories whose names match an integer.

    Parameters:
        dir_path (`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    paths = []
    all_dirs = [sub_dir for sub_dir in dir_path.glob("*") if sub_dir.is_dir()]
    for directory in all_dirs:
        try:
            int(directory.name)
            paths.append(directory)
        except ValueError:
            pass
    return paths


def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it in one place later.

    Parameters:
        file_uri (str)

    Returns:
        urllib.parse.ParseResult
    """
    return urlparse(file_uri, scheme="file", allow_fragments=False)
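# Usage sketch for parse_file_uri() (illustrative, not part of the original source):
# >>> parse_file_uri("/some/local/path.root").scheme  # No scheme defaults to 'file'
# 'file'
# >>> parse_file_uri("root://some.server//remote/path.root").scheme
# 'root'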


# An ExpRun with 'unbounded' values, matching the -1 convention used by IoV
UNBOUND_EXPRUN = ExpRun(-1, -1)

# @endcond