Belle II Software  release-05-01-25
utils.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
This module contains various utility functions for the CAF and Job submission Backends to use.
"""

from basf2 import B2INFO, B2WARNING, B2DEBUG
import os
import glob
from collections import deque
from collections import OrderedDict
from collections import namedtuple
from collections import defaultdict
import pathlib
import json
from functools import singledispatch, update_wrapper
import contextlib
import enum
import shutil
import itertools
from urllib.parse import urlparse

import ROOT
from ROOT.Belle2 import CalibrationAlgorithm, IntervalOfValidity


b2info_newline = "\n" + (7 * " ")


def B2INFO_MULTILINE(lines):
    """
    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO

    Quick little function that creates a string for B2INFO from a list of strings.
    It joins them with a newline character plus the necessary indentation so that
    the following lines of the B2INFO output are nicely aligned.
    Then it calls B2INFO on the output.
    """
    log_string = b2info_newline.join(lines)
    B2INFO(log_string)


def grouper(n, iterable):
    """
    Parameters:
        n (int): Maximum size of each group that gets yielded.
        iterable (list): The original iterable that we want to yield groups of size 'n' from.

    Yields:
        tuple: The next group of up to 'n' items.
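
    Example (illustrative doctest; the final group may be shorter than 'n'):

    >>> list(grouper(2, [1, 2, 3, 4, 5]))
    [(1, 2), (3, 4), (5,)]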
    """
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk


def pairwise(iterable):
    """
    Iterate through a sequence by pairing up the current and next entry.
    Note that when you hit the last one you don't get a (last, null); the
    final iteration gives you (last-1, last) and then finishes. If you only
    have one entry in the sequence this may be important as you will not get
    any looping.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        zip: An iterator over the neighbouring pairs.
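
    Example (illustrative doctest):

    >>> list(pairwise([1, 2, 3, 4]))
    [(1, 2), (2, 3), (3, 4)]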
    """
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)


def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list. This cannot find missing
    runs which lie between two IoVs that are separated by an experiment e.g. between
    IoV(1,1,1,10) => IoV(2,1,2,5) it is unknown if there were supposed to be more runs than run
    number 10 in experiment 1 before starting experiment 2. Therefore this is not counted as a gap
    and will not be added to the output list of IoVs.

    Parameters:
        iov_list (list[IoV]): A SORTED list of non-overlapping IoVs that you want to check for 'gaps'
            i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
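
    Example (illustrative doctest; runs 11-13 of experiment 1 are uncovered):

    >>> find_gaps_in_iov_list([IoV(1, 1, 1, 10), IoV(1, 14, 1, 20)])
    [IoV(exp_low=1, run_low=11, exp_high=1, run_high=13)]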
    """
    gaps = []
    previous_iov = None
    for current_iov in iov_list:
        if previous_iov:
            previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
            current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
            iov_gap = previous_highest.find_gap(current_lowest)
            if iov_gap:
                B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
                gaps.append(iov_gap)
        previous_iov = current_iov
    return gaps


class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp, Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Keyword Arguments:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns.
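
        Example (illustrative doctest; adjacent runs have no gap between them):

        >>> ExpRun(1, 10).find_gap(ExpRun(1, 14))
        IoV(exp_low=1, run_low=11, exp_high=1, run_high=13)
        >>> ExpRun(1, 5).find_gap(ExpRun(1, 6)) is None
        True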
        """
        lower, upper = sorted((self, other))
        if lower.exp == upper.exp and lower.run != upper.run:
            if (upper.run - lower.run) > 1:
                return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
            else:
                return None
        else:
            return None


class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names, which is probably fine
    but feels wrong.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
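
    Example (illustrative doctest; the exact behaviour is delegated to the C++
    IntervalOfValidity class):

    >>> IoV(1, 1, 1, 100).contains(IoV(1, 20, 1, 30))
    True
    >>> IoV(1, 1, 1, 10).overlaps(IoV(1, 10, 1, 20))
    True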
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__.
        """
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)


@enum.unique
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members, but it's nice to have
    something pythonic ready to go.
    """
    ok = CalibrationAlgorithm.c_OK
    not_enough_data = CalibrationAlgorithm.c_NotEnoughData
    iterate = CalibrationAlgorithm.c_Iterate
    failure = CalibrationAlgorithm.c_Failure


IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])


class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb

    Keyword Arguments:
        payload_dir (str): If the payload directory is different to the directory containing the filepath, you can set it here.
    """
    db_type = "local"

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if f.exists():
            #: The resolved path to the database.txt file
            self.filepath = f.resolve()
            if not payload_dir:
                #: The directory containing the payload files
                self.payload_dir = pathlib.Path(self.filepath.parent)
            else:
                p = pathlib.Path(payload_dir)
                if p.exists():
                    self.payload_dir = p.resolve()
                else:
                    raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
        else:
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")


class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """
    db_type = "central"

    def __init__(self, global_tag):
        #: The global tag string this database refers to
        self.global_tag = global_tag


def split_runs_by_exp(runs):
    """
    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as original list but sublists are generated for each Exp value
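
    Example (illustrative doctest):

    >>> split_runs_by_exp([ExpRun(1, 1), ExpRun(1, 2), ExpRun(2, 1)])
    [[ExpRun(exp=1, run=1), ExpRun(exp=1, run=2)], [ExpRun(exp=2, run=1)]]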
    """
    split_by_runs = []
    current_exp = runs[0].exp
    exp_list = []
    for exprun in runs:
        if exprun.exp != current_exp:
            split_by_runs.append(exp_list)
            exp_list = [exprun]
        else:
            exp_list.append(exprun)
        current_exp = exprun.exp
    split_by_runs.append(exp_list)
    return split_by_runs


def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap
    with the IoV.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Return:
        set
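
    Example (illustrative doctest; the result is sorted here only to get stable output):

    >>> sorted(runs_overlapping_iov(IoV(1, 2, 1, 3), [ExpRun(1, 1), ExpRun(1, 2), ExpRun(1, 3), ExpRun(1, 4)]))
    [ExpRun(exp=1, run=2), ExpRun(exp=1, run=3)]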
    """
    overlapping_runs = set()
    for run in runs:
        # Construct an IoV of one run
        run_iov = run.make_iov()
        if run_iov.overlaps(iov):
            overlapping_runs.add(run)
    return overlapping_runs


def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.
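
    Example (illustrative doctest):

    >>> iov_from_runs([ExpRun(1, 5), ExpRun(1, 6), ExpRun(1, 7)])
    IoV(exp_low=1, run_low=5, exp_high=1, run_high=7)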
    """
    if len(runs) > 1:
        exprun_low, exprun_high = runs[0], runs[-1]
    else:
        exprun_low, exprun_high = runs[0], runs[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)


def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.
    """
    # Don't wrap the ExpRun in list() here, otherwise the .exp/.run attribute access below fails
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    if len(exprun_list) > 1:
        exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    else:
        # ExpRun is an immutable namedtuple, so it is safe to reuse the same object here
        exprun_low, exprun_high = exprun_list[0], exprun_list[0]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)


def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std::vector<std::pair<int,int>>.

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int))
    """
    exprun_type = ROOT.pair(int, int)
    run_vec = ROOT.vector(exprun_type)()
    run_vec.reserve(len(runs))
    for run in runs:
        run_vec.push_back(exprun_type(run.exp, run.run))
    return run_vec


def runs_from_vector(exprun_vector):
    """
    Takes a vector of exp/run pairs from CalibrationAlgorithm and returns
    a Python list of `ExpRun` tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Return:
        list[ExpRun]
    """
    return [ExpRun(exprun.first, exprun.second) for exprun in exprun_vector]


def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different from the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Return:
        dict[IoV,list[ExpRun]]
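
    Example (illustrative doctest; two boundaries split five runs of one experiment
    into two IoVs, the last one open-ended via run_high=-1):

    >>> boundaries = [ExpRun(1, 1), ExpRun(1, 4)]
    >>> runs = [ExpRun(1, n) for n in range(1, 6)]
    >>> iov_to_runs = find_run_lists_from_boundaries(boundaries, runs)
    >>> iov_to_runs[IoV(1, 1, 1, 3)]
    [ExpRun(exp=1, run=1), ExpRun(exp=1, run=2), ExpRun(exp=1, run=3)]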
    """
    boundary_iov_to_runs = {}
    # Find the boundary IoVs
    for start_current, start_next in pairwise(boundaries):
        # We can safely assume the run-1 because we aren't doing this across multiple experiment numbers
        boundary_iov = IoV(*start_current, start_next.exp, start_next.run - 1)
        boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
        boundary_iov_to_runs[boundary_iov] = boundary_runs
    # The final boundary start won't get iterated above because there's no 'next' boundary. So we add the remaining runs here
    boundary_iov = IoV(*boundaries[-1], boundaries[-1].exp, -1)
    boundary_runs = sorted(runs_overlapping_iov(boundary_iov, runs))
    boundary_iov_to_runs[boundary_iov] = boundary_runs
    return boundary_iov_to_runs


def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.
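
    Example (illustrative doctest; only 'a' does not appear in any dependency list):

    >>> find_sources({"a": ["b"], "b": ["c"], "c": []})
    deque(['a'])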
    """
    # Create an OrderedDict to make sure that our sources are
    # in the same order that we started with
    in_degrees = OrderedDict((k, 0) for k in dependencies)
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # We build a deque of nodes with no dependencies
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if in_degree == 0:
            sources.appendleft(name)

    return sources


def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted
    list of nodes.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a', 'b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> order = topological_sort(dependencies)
    >>> print(order)
    ['c', 'b', 'a']
    """
    # We find the in-degree (number of dependencies) for each node
    # and store it.
    in_degrees = {k: 0 for k in dependencies}
    for node, adjacency_list in dependencies.items():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # We build a deque of nodes with no dependencies
    sources = deque([])
    for name, in_degree in in_degrees.items():
        if in_degree == 0:
            sources.appendleft(name)

    order = []
    while sources:  # Keep adding and removing from this until solved
        source = sources.pop()  # Pick a node with no dependencies
        order.append(source)  # Add it to our ordered nodes
        for node in dependencies[source]:  # Remove vertices from adjacent nodes
            in_degrees[node] -= 1
            if in_degrees[node] == 0:  # If we've created a new source, add it.
                sources.appendleft(node)

    if len(order) == len(dependencies):  # Check that all nodes were ordered
        return order
    else:
        # If not, there must have been a cyclic dependency
        B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
        return []


def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on it.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
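
    Example (illustrative doctest; 'c' depends on 'a' both directly and implicitly
    through 'b', and sorted() is used only for stable output):

    >>> deps = {'c': ['a', 'b'], 'b': ['a'], 'a': []}
    >>> sorted(all_dependencies(deps)['c'])
    ['a', 'b']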
    """
    full_dependencies = OrderedDict()

    def add_out_nodes(node, node_set):
        """
        This is a recursive function that follows the tree of adjacent future nodes
        and adds all of them to a set (so that we have unique items)
        """
        for out_node in dependencies[node]:
            node_set.add(out_node)
            add_out_nodes(out_node, node_set)

    if not order:
        order = dependencies.keys()
    # Loop over the nodes in the order and recursively head upwards through explicit
    # adjacent nodes.
    for node in order:
        node_dependencies = set()
        add_out_nodes(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)

    return full_dependencies


def past_from_future_dependencies(future_dependencies):
    """
    Takes a dictionary mapping each node to the list of future nodes that depend on it,
    and inverts it into a defaultdict mapping each node to the list of past nodes it depends on.
    """
    past_dependencies = defaultdict(list)
    for node, deps in future_dependencies.items():
        for dep in deps:
            past_dependencies[dep].append(node)
    return past_dependencies


def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (saves importing json everywhere).
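
    Example (illustrative doctest):

    >>> decode_json_string('{"key": [1, 2, 3]}')
    {'key': [1, 2, 3]}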
    """
    return json.loads(object_string)


def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch  # Default method
    >>> def my_method(self, default_type, ...):
    >>>     pass

    >>> @my_method.register(list)  # Registers list method for dispatch
    >>> def _(self, list_type, ...):
    >>>     pass

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property
    >>> def my_property(self):
    >>>     return self._my_property

    >>> @my_property.setter
    >>> @method_dispatch
    >>> def my_property(self, input_property):
    >>>     pass

    >>> @my_property.fset.register(list)
    >>> def _(self, input_list_properties):
    >>>     pass
    """
    dispatcher = singledispatch(func)

    def wrapper(*args, **kw):
        return dispatcher.dispatch(args[1].__class__)(*args, **kw)
    wrapper.register = dispatcher.register
    update_wrapper(wrapper, func)
    return wrapper


@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that changes the working directory to the given
    path and then changes it back to its previous value on exit.
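
    Usage sketch (hypothetical path, for illustration only):

    >>> with temporary_workdir("/tmp/scratch"):  # doctest: +SKIP
    ...     pass  # any relative paths used here resolve inside /tmp/scratch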
    """
    prev_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(prev_cwd)


class PathExtras():
    """
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with the 'in' keyword.
    """

    def __init__(self, path=None):
        """
        Initialising with a path.
        """
        if path:
            #: Attribute to hold path object that this class wraps
            self.path = path
        else:
            # No path given; leave the wrapper empty until one is assigned
            self.path = None
        #: Holds a list of module names for the path in self.path
        self._module_names = []
        self._update_names()

    def _update_names(self):
        """
        Takes the self.path attribute and uses the current state to recreate the
        self._module_names list.
        """
        if not self.path:
            return
        # Rebuild the list each time so repeated calls don't accumulate duplicate names
        self._module_names = [module.name() for module in self.path.modules()]

    def __contains__(self, module_name):
        """
        Special method to allow 'module_name in path' type comparisons. Returns
        a boolean and compares by module name.
        """
        self._update_names()
        return module_name in self._module_names

    def index(self, module_name):
        """
        Returns the index of the first instance of a module in the contained path.
        """
        return self._module_names.index(module_name)


def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            else:
                # Get only the files, not directories
                listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
                file_names = [file_name for file_name in listdir(directory) if isfile(join(directory, file_name))]
                file_names.remove('database.txt')
                # Now we need the absolute paths to all of the payload files so we can copy them across
                file_names = [os.path.join(directory, file_name) for file_name in file_names[:]]
                for file_name in file_names:
                    shutil.copy(file_name, output_database_dir)
                # Now grab all the IoV stuff from each database.txt file and merge it.
                with open(os.path.join(directory, 'database.txt'), 'r') as f:
                    for line in f.readlines():
                        db_file.write(line)


def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    import subprocess
    metadata_output = subprocess.check_output(['b2file-metadata-show', '--json', file_path])
    m = json.loads(metadata_output.decode('utf-8'))
    return IoV(m['experimentLow'], m['runLow'], m['experimentHigh'], m['runHigh'])


def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV, instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    return (file_path, get_iov_from_file(file_path))


def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs b2file-metadata-show over them to
    extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.

    Keyword Arguments:
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
            We don't close or join the Pool as you might want to use it yourself, we just wait until the results are ready.
        filterfalse (function): An optional function object that will be called on each absolute filepath found from your
            patterns. Like `itertools.filterfalse`, if the function returns True for a filepath that file is skipped,
            and if it returns False the file's metadata is returned.
            The filter function should take the filepath string as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    # Optionally filter out files matching our filter function
    if filterfalse:
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))

    file_to_iov = {}
    if not pool:
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        import time
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))

        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)

        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]

    return file_to_iov
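
# Usage sketch for make_file_to_iov_dictionary (hypothetical file pattern; the Pool is
# optional and only parallelises the b2file-metadata-show calls):
#
#     from multiprocessing import Pool
#     with Pool(4) as pool:
#         file_to_iov = make_file_to_iov_dictionary(["/data/exp10/*.root"], pool=pool)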


def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type URIs are taken as absolute file paths already and are simply
    passed through.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        file_pattern_uri = parse_file_uri(file_pattern)
        if file_pattern_uri.scheme == "file":
            input_files = glob.glob(file_pattern_uri.path)
            if not input_files:
                B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            else:
                for file_path in input_files:
                    file_path = os.path.abspath(file_path)
                    if os.path.isfile(file_path):
                        existing_file_paths.add(file_path)
        else:
            B2INFO(f"Found a non-local file pattern {file_pattern}, it will not be checked for validity.")
            existing_file_paths.add(file_pattern)

    abs_file_paths = list(existing_file_paths)
    return abs_file_paths


def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The single Exp,Run IoV that the Raw data file corresponds to.
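
    Example (illustrative doctest; the path below is made up but follows the standard
    Raw data layout that this function parses):

    >>> parse_raw_data_iov("/hsm/belle2/bdata/Data/Raw/e0010/r04000/sub00/physics.0010.04000.HLT1.f00000.root")
    IoV(exp_low=10, run_low=4000, exp_high=10, run_high=4000)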
    """
    Path = pathlib.Path
    file_path = Path(file_path)

    # We'll try and extract the exp and run from both the directory and filename.
    # That will let us check that everything is as we expect.

    try:
        reduced_path = file_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    # Second try for the calibration data path
    except ValueError:
        reduced_path = file_path.relative_to("/group/belle2/dataprod/Data/Raw")

    try:
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])

        split_filename = reduced_path.name.split(".")
        filename_exp = int(split_filename[1])
        filename_run = int(split_filename[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e

    if path_exp == filename_exp and path_run == filename_run:
        return IoV(path_exp, path_run, path_exp, path_run)
    else:
        raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")


def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is (including any contents),
    or delete it and re-create it fresh. It will only delete the end point, not any intermediate directories created.
    """
    # Delete if overwriting and it exists
    if (path.exists() and overwrite):
        shutil.rmtree(path)
    # If it never existed or we just deleted it, make it now
    if not path.exists():
        os.makedirs(path)


def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have iteration directories
    from iterations above your current one. This function will find directories whose names match an integer.

    Parameters:
        dir_path (`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    paths = []
    all_dirs = [sub_dir for sub_dir in dir_path.glob("*") if sub_dir.is_dir()]
    for directory in all_dirs:
        try:
            int(directory.name)
            paths.append(directory)
        except ValueError:
            pass
    return paths


def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it in one place later.

    Parameters:
        file_uri (str)

    Returns:
        urllib.parse.ParseResult
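
    Example (illustrative doctest with a hypothetical server name; plain paths default
    to the 'file' scheme):

    >>> parse_file_uri("/path/to/file.root").scheme
    'file'
    >>> parse_file_uri("root://server.example.com//path/file.root").scheme
    'root'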
    """
    return urlparse(file_uri, scheme="file", allow_fragments=False)


UNBOUND_EXPRUN = ExpRun(-1, -1)