Belle II Software  release-08-01-10
utils.py
#!/usr/bin/env python3

# disable doxygen check for this file
# @cond

"""
This module contains various utility functions for the CAF and Job submission Backends to use.
"""

from basf2 import B2INFO, B2WARNING, B2DEBUG
import os
import glob
from collections import deque
from collections import OrderedDict
from collections import namedtuple
from collections import defaultdict
import pathlib
import json
from functools import singledispatch, update_wrapper
import contextlib
import enum
import shutil
import itertools
from urllib.parse import urlparse

import ROOT
from ROOT.Belle2 import CalibrationAlgorithm, IntervalOfValidity


b2info_newline = "\n" + (7 * " ")

def B2INFO_MULTILINE(lines):
    """
    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO

    Quick little function that creates a string for B2INFO from a list of strings.
    It appends a newline character + the necessary indentation to the following
    line so that the B2INFO output is nicely aligned.
    Then it calls B2INFO on the output.
    """
    log_string = b2info_newline.join(lines)
    B2INFO(log_string)

def grouper(n, iterable):
    """
    Parameters:
        n (int): Maximum size of the groups that get yielded.
        iterable (list): The original list that we want to yield groups of size 'n' from.

    Yields:
        tuple
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk

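# A minimal usage sketch (not part of the original module): grouper yields
# fixed-size chunks, with a shorter final chunk if the iterable doesn't divide evenly.
#
# >>> list(grouper(2, [1, 2, 3, 4, 5]))
# [(1, 2), (3, 4), (5,)]
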
def pairwise(iterable):
    """
    Iterate through a sequence by pairing up the current and next entry.
    Note that when you hit the last one you don't get a (last, None) pair;
    the final iteration gives you (last-1, last) and then finishes. If you only
    have one entry in the sequence this may be important as you will not get any
    looping.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        zip: An iterator of paired entries
    """
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)

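# A minimal usage sketch (not part of the original module): note the overlap
# between consecutive pairs, and that a single-entry sequence yields nothing.
#
# >>> list(pairwise([1, 2, 3, 4]))
# [(1, 2), (2, 3), (3, 4)]
# >>> list(pairwise([1]))
# []
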
def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list. This cannot find missing
    runs which lie between two IoVs that are separated by an experiment e.g. between
    IoV(1,1,1,10) => IoV(2,1,2,5) it is unknown if there were supposed to be more runs than run
    number 10 in experiment 1 before starting experiment 2. Therefore this is not counted as a gap
    and will not be added to the output list of IoVs.

    Parameters:
        iov_list (list[IoV]): A SORTED list of non-overlapping IoVs that you want to check for 'gaps'
            i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
    """
    gaps = []
    previous_iov = None
    for current_iov in iov_list:
        if previous_iov:
            previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
            current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
            iov_gap = previous_highest.find_gap(current_lowest)
            if iov_gap:
                B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
                gaps.append(iov_gap)
        previous_iov = current_iov
    return gaps

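# A minimal usage sketch (not part of the original module): two sorted IoVs in
# the same experiment, with runs 11-14 missing between them.
#
# >>> find_gaps_in_iov_list([IoV(1, 1, 1, 10), IoV(1, 15, 1, 20)])
# [IoV(exp_low=1, run_low=11, exp_high=1, run_high=14)]
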
class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp,Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Keyword Arguments:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns.
        """
        lower, upper = sorted((self, other))
        if lower.exp == upper.exp and lower.run != upper.run:
            if (upper.run - lower.run) > 1:
                return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
            else:
                return None
        else:
            return None

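# A minimal usage sketch (not part of the original module): a gap only exists
# when both runs are in the same experiment and more than one apart.
#
# >>> ExpRun(1, 5).find_gap(ExpRun(1, 9))
# IoV(exp_low=1, run_low=6, exp_high=1, run_high=8)
# >>> ExpRun(1, 5).find_gap(ExpRun(1, 6)) is None  # adjacent runs, no gap
# True
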
class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__.
        """
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)

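# A minimal usage sketch (not part of the original module); the comparisons
# delegate to the C++ IntervalOfValidity, so a ROOT-enabled environment is assumed.
#
# >>> IoV(1, 1, 1, 100).contains(IoV(1, 5, 1, 10))
# True
# >>> IoV(1, 1, 1, 10).overlaps(IoV(1, 10, 1, 20))  # they share run 10
# True
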
@enum.unique
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members but it's nice to have
    something pythonic ready to go.
    """
    #: OK result
    ok = CalibrationAlgorithm.c_OK
    #: Not enough data in the input to perform the calibration
    not_enough_data = CalibrationAlgorithm.c_NotEnoughData
    #: The algorithm wants to iterate over these runs again
    iterate = CalibrationAlgorithm.c_Iterate
    #: The algorithm failed
    failure = CalibrationAlgorithm.c_Failure

IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])

class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb

    Keyword Arguments:
        payload_dir (str): If the payload directory is different to the directory containing the filepath, you can set it here.
    """
    db_type = "local"

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if f.exists():
            self.filepath = f.resolve()
            if not payload_dir:
                self.payload_dir = pathlib.Path(self.filepath.parent)
            else:
                p = pathlib.Path(payload_dir)
                if p.exists():
                    self.payload_dir = p.resolve()
                else:
                    raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
        else:
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")

class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """
    db_type = "central"

    def __init__(self, global_tag):
        self.global_tag = global_tag

def split_runs_by_exp(runs):
    """
    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as original list but sublists are generated for each Exp value
    """
    split_by_runs = []
    current_exp = runs[0].exp
    exp_list = []
    for exprun in runs:
        if exprun.exp != current_exp:
            split_by_runs.append(exp_list)
            exp_list = [exprun]
        else:
            exp_list.append(exprun)
        current_exp = exprun.exp
    else:
        # The for/else clause runs once the loop completes, appending the final experiment's list
        split_by_runs.append(exp_list)
    return split_by_runs

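# A minimal usage sketch (not part of the original module): runs are grouped
# into one sublist per experiment number.
#
# >>> split_runs_by_exp([ExpRun(1, 1), ExpRun(1, 2), ExpRun(2, 1)])
# [[ExpRun(exp=1, run=1), ExpRun(exp=1, run=2)], [ExpRun(exp=2, run=1)]]
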
def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap
    with the IoV.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Returns:
        set
    """
    overlapping_runs = set()
    for run in runs:
        # Construct an IoV of one run
        run_iov = run.make_iov()
        if run_iov.overlaps(iov):
            overlapping_runs.add(run)
    return overlapping_runs

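# A minimal usage sketch (not part of the original module); sorted() is used
# only to make the returned set's contents deterministic for display.
#
# >>> sorted(runs_overlapping_iov(IoV(1, 2, 1, 3), [ExpRun(1, n) for n in range(1, 5)]))
# [ExpRun(exp=1, run=2), ExpRun(exp=1, run=3)]
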
def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.
    """
    if len(runs) > 1:
        exprun_low, exprun_high = runs[0], runs[-1]
    else:
        exprun_low, exprun_high = runs[0], runs[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)

def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.
    """
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    if len(exprun_list) > 1:
        exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    else:
        # ExpRun is an immutable namedtuple, so the single entry can be safely reused
        exprun_low, exprun_high = exprun_list[0], exprun_list[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)

def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std vector<pair<int,int>>.

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int))
    """
    exprun_type = ROOT.pair(int, int)
    run_vec = ROOT.vector(exprun_type)()
    run_vec.reserve(len(runs))
    for run in runs:
        run_vec.push_back(exprun_type(run.exp, run.run))
    return run_vec

def runs_from_vector(exprun_vector):
    """
    Takes a vector of `ExpRun` from CalibrationAlgorithm and returns
    a Python list of (exp,run) tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Returns:
        list[ExpRun]
    """
    return [ExpRun(exprun.first, exprun.second) for exprun in exprun_vector]

def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different than the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Returns:
        dict[IoV,list[ExpRun]]
    """
    boundary_iov_to_runs = {}
    # Find the boundary IoVs
    for start_current, start_next in pairwise(boundaries):
        # We can safely assume the run-1 because we aren't doing this across multiple experiment numbers
        boundary_iov = IoV(*start_current, start_next.exp, start_next.run - 1)
        boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
        boundary_iov_to_runs[boundary_iov] = boundary_runs
    # The final boundary start won't get iterated above because there's no 'next' boundary. So we add the remaining runs here
    boundary_iov = IoV(*boundaries[-1], boundaries[-1].exp, -1)
    boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
    boundary_iov_to_runs[boundary_iov] = boundary_runs
    return boundary_iov_to_runs

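# A minimal usage sketch (not part of the original module): two boundaries in
# experiment 1 give a closed interval plus an open-ended final interval
# (run_high=-1 meaning 'until the end of the experiment').
#
# >>> boundaries = [ExpRun(1, 1), ExpRun(1, 4)]
# >>> available = [ExpRun(1, 1), ExpRun(1, 2), ExpRun(1, 4), ExpRun(1, 5)]
# >>> result = find_run_lists_from_boundaries(boundaries, available)
# >>> result[IoV(1, 1, 1, 3)]
# [ExpRun(exp=1, run=1), ExpRun(exp=1, run=2)]
# >>> result[IoV(1, 4, 1, -1)]
# [ExpRun(exp=1, run=4), ExpRun(exp=1, run=5)]
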
def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.
    """
    # Create an OrderedDict to make sure that our sources are
    # in the same order that we started with
    in_degrees = OrderedDict((k, 0) for k in dependencies)
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # We build a deque of nodes with no dependencies
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if in_degree == 0:
            sources.appendleft(name)

    return sources

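# A minimal usage sketch (not part of the original module): 'c' has no incoming
# edges, while 'b' and 'a' each have one, so only 'c' is a source.
#
# >>> find_sources({'c': ['b'], 'b': ['a'], 'a': []})
# deque(['c'])
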
def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted
    list of nodes.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a', 'b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> sorted = topological_sort(dependencies)
    >>> print(sorted)
    ['c', 'b', 'a']
    """
    # We find the in-degree (number of dependencies) for each node
    # and store it.
    in_degrees = {k: 0 for k in dependencies}
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # We build a deque of nodes with no dependencies
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if in_degree == 0:
            sources.appendleft(name)

    order = []
    while sources:  # Keep adding and removing from this until solved
        source = sources.pop()  # Pick a node with no dependencies
        order.append(source)  # Add it to our ordered nodes
        for node in dependencies[source]:  # Remove vertices from adjacent nodes
            in_degrees[node] -= 1
            if in_degrees[node] == 0:  # If we've created a new source, add it.
                sources.appendleft(node)

    if len(order) == len(dependencies):  # Check if all nodes were ordered
        return order
    else:  # If not, then there was a cyclic dependency
        B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
        return []

def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on it.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
    """
    full_dependencies = OrderedDict()

    def add_out_nodes(node, node_set):
        """
        This is a recursive function that follows the tree of adjacent future nodes
        and adds all of them to a set (so that we have unique items)
        """
        for out_node in dependencies[node]:
            node_set.add(out_node)
            add_out_nodes(out_node, node_set)

    if not order:
        order = dependencies.keys()
    # Loop over the nodes in the order and recursively head upwards through explicit
    # adjacent nodes.
    for node in order:
        node_dependencies = set()
        add_out_nodes(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)

    return full_dependencies

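# A minimal usage sketch (not part of the original module): 'c' -> 'b' -> 'a'
# means 'a' is an implicit future node of 'c' even though it isn't adjacent.
# sorted() is used because the internal set gives no ordering guarantee.
#
# >>> full = all_dependencies({'c': ['b'], 'b': ['a'], 'a': []})
# >>> sorted(full['c'])
# ['a', 'b']
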
def past_from_future_dependencies(future_dependencies):
    """
    Inverts a node -> future nodes mapping to give, for each node, the list of
    nodes that must come before it (its past dependencies).
    """
    past_dependencies = defaultdict(list)
    for node, deps in future_dependencies.items():
        for dep in deps:
            past_dependencies[dep].append(node)
    return past_dependencies

def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (saves importing json everywhere).
    """
    return json.loads(object_string)

def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch  # Default method
    >>> def my_method(self, default_type, ...):
    >>>     pass

    >>> @my_method.register(list)  # Registers list method for dispatch
    >>> def _(self, list_type, ...):
    >>>     pass

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property
    >>> def my_property(self):
    >>>     return self._my_property

    >>> @my_property.setter
    >>> @method_dispatch
    >>> def my_property(self, input_property):
    >>>     pass

    >>> @my_property.fset.register(list)
    >>> def _(self, input_list_properties):
    >>>     pass
    """
    dispatcher = singledispatch(func)

    def wrapper(*args, **kw):
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)
    wrapper.register = dispatcher.register
    update_wrapper(wrapper, func)
    return wrapper

@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that changes the working directory to the given
    path and then changes it back to its previous value on exit.
    """
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(prev_cwd)

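# A minimal usage sketch (not part of the original module); "example_dir" is a
# hypothetical directory. The previous cwd is restored even if the body raises.
#
# >>> os.makedirs("example_dir", exist_ok=True)
# >>> with temporary_workdir("example_dir"):
# ...     pass  # do work inside example_dir
# >>> # back in the previous working directory here
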
class PathExtras():
    """
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with the 'in' keyword.
    """

    def __init__(self, path=None):
        """
        Initialising with a path.
        """
        if path:
            #: The wrapped basf2 path object
            self.path = path
        else:
            self.path = None

        #: Holds the list of module names for the path in self.path
        self._module_names = []
        self._update_names()

    def _update_names(self):
        """
        Takes the self.path attribute and uses the current state to recreate the
        self.module_names list.
        """
        if not self.path:
            self._module_names = []
            return
        # Rebuild the list from scratch so repeated calls don't accumulate duplicates
        self._module_names = [module.name() for module in self.path.modules()]

    def __contains__(self, module_name):
        """
        Special method to allow 'module_name in path' type comparisons. Returns
        a boolean and compares by module name.
        """
        self._update_names()
        return module_name in self._module_names

    def index(self, module_name):
        """
        Returns the index of the first instance of a module in the contained path.
        """
        return self._module_names.index(module_name)

def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            else:
                # Get only the files, not directories
                listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
                file_names = [file_name for file_name in listdir(directory) if isfile(join(directory, file_name))]
                file_names.remove('database.txt')
                # Now we need the absolute paths to all of the payload files so we can copy them across
                file_names = [os.path.join(directory, file_name) for file_name in file_names[:]]
                for file_name in file_names:
                    shutil.copy(file_name, output_database_dir)
                # Now grab all the IoV stuff from each database.txt file and merge it.
                with open(os.path.join(directory, 'database.txt')) as f:
                    for line in f.readlines():
                        db_file.write(line)

def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    import subprocess
    metadata_output = subprocess.check_output(['b2file-metadata-show', '--json', file_path])
    m = json.loads(metadata_output.decode('utf-8'))
    return IoV(m['experimentLow'], m['runLow'], m['experimentHigh'], m['runHigh'])

def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV, instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    return (file_path, get_iov_from_file(file_path))

def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs b2file-metadata-show over them to
    extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.

    Keyword Arguments:
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
            We don't close or join the Pool as you might want to use it yourself, we just wait until the results are ready.

        filterfalse (`function`): An optional function object that will be called on each absolute filepath found from your
            patterns. Following `itertools.filterfalse` semantics, files for which it returns False will have their metadata
            returned, while files for which it returns True will be skipped. The filter function should take the filepath
            string as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    # Optionally filter out files matching our filter function
    if filterfalse:
        import itertools
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))

    file_to_iov = {}
    if not pool:
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        import time
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))

        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)

        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]

    return file_to_iov

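# A minimal usage sketch (not part of the original module); the glob pattern
# and the Pool size are hypothetical. get_file_iov_tuple is a module-level
# function, so it can be dispatched through multiprocessing's apply_async.
#
# >>> import multiprocessing
# >>> with multiprocessing.Pool(4) as pool:
# ...     file_to_iov = make_file_to_iov_dictionary(["/data/files/*.root"], pool=pool)
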
def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type URLs are taken as absolute file paths already and are simply
    passed through.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        file_pattern_uri = parse_file_uri(file_pattern)
        if file_pattern_uri.scheme == "file":
            input_files = glob.glob(file_pattern_uri.path)
            if not input_files:
                B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            else:
                for file_path in input_files:
                    file_path = os.path.abspath(file_path)
                    if os.path.isfile(file_path):
                        existing_file_paths.add(file_path)
        else:
            B2INFO(f"Found a non-local file pattern {file_pattern}, it will not be checked for validity.")
            existing_file_paths.add(file_pattern)

    abs_file_paths = list(existing_file_paths)
    return abs_file_paths

def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The single Exp,Run IoV that the Raw data file corresponds to.
    """
    Path = pathlib.Path
    file_path = Path(file_path)

    # We'll try and extract the exp and run from both the directory and filename.
    # That will let us check that everything is as we expect.

    try:
        reduced_path = file_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    # Second try for the calibration data path
    except ValueError:
        reduced_path = file_path.relative_to("/group/belle2/dataprod/Data/Raw")

    try:
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])

        split_filename = reduced_path.name.split(".")
        filename_exp = int(split_filename[1])
        filename_run = int(split_filename[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e

    if path_exp == filename_exp and path_run == filename_run:
        return IoV(path_exp, path_run, path_exp, path_run)
    else:
        raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")

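# A minimal usage sketch (not part of the original module); the file name below
# is hypothetical but follows the e<exp>/r<run> directory layout and the
# <label>.<exp>.<run>... filename layout that the parser expects.
#
# >>> parse_raw_data_iov("/hsm/belle2/bdata/Data/Raw/e0008/r01234/sub00/physics.0008.01234.HLT1.f00000.root")
# IoV(exp_low=8, run_low=1234, exp_high=8, run_high=1234)
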
def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is (including any contents),
    or delete it and re-create it fresh. It will only delete the end point, not any intermediate directories created.
    """
    # Delete if overwriting and it exists
    if (path.exists() and overwrite):
        shutil.rmtree(path)
    # If it never existed or we just deleted it, make it now
    if not path.exists():
        os.makedirs(path)

def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have iteration directories
    from iterations above your current one. This function will find directories whose names match an integer.

    Parameters:
        dir_path (`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    paths = []
    all_dirs = [sub_dir for sub_dir in dir_path.glob("*") if sub_dir.is_dir()]
    for directory in all_dirs:
        try:
            int(directory.name)
            paths.append(directory)
        except ValueError:
            pass
    return paths

def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it in one place later.

    Parameters:
        file_uri (str)

    Returns:
        urllib.parse.ParseResult
    """
    return urlparse(file_uri, scheme="file", allow_fragments=False)

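# A minimal usage sketch (not part of the original module): plain paths fall
# back to the "file" scheme, while remote URIs keep their own scheme.
#
# >>> parse_file_uri("/data/files/file.root").scheme
# 'file'
# >>> parse_file_uri("root://someserver:1094//path/file.root").scheme
# 'root'
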
#: A convenient ExpRun of (-1, -1) to mark an unbounded exp/run value
UNBOUND_EXPRUN = ExpRun(-1, -1)

# @endcond