# Belle II Software release-06-01-15
# utils.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 
11 
12 """
13 This module contains various utility functions for the CAF and Job submission Backends to use.
14 """
15 
16 from basf2 import B2INFO, B2WARNING, B2DEBUG
17 import os
18 import glob
19 from collections import deque
20 from collections import OrderedDict
21 from collections import namedtuple
22 from collections import defaultdict
23 import pathlib
24 import json
25 from functools import singledispatch, update_wrapper
26 import contextlib
27 import enum
28 import shutil
29 import itertools
30 from urllib.parse import urlparse
31 
32 import ROOT
33 from ROOT.Belle2 import CalibrationAlgorithm, IntervalOfValidity
34 
35 
# Continuation prefix for multi-line B2INFO output: newline plus 7 spaces so
# continuation lines align under basf2's log-level prefix.
b2info_newline = "\n" + (7 * " ")
37 
38 
def B2INFO_MULTILINE(lines):
    """
    Print several lines with a single, nicely aligned B2INFO call.

    Parameters:
        lines (list[str]): Lines to be printed in a single call to B2INFO

    The lines are joined with a newline plus the indentation needed to line
    up with basf2's log prefix, and the combined string is passed to B2INFO.
    """
    B2INFO(b2info_newline.join(lines))
51 
52 
def grouper(n, iterable):
    """
    Yield successive chunks of at most ``n`` items from ``iterable``.

    Parameters:
        n (int): Maximum size of the tuples that get yielded.
        iterable (list): The original sequence we want groups of size 'n' from.

    Yields:
        tuple
    """
    iterator = iter(iterable)
    chunk = tuple(itertools.islice(iterator, n))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(iterator, n))
68 
69 
def pairwise(iterable):
    """
    Iterate through a sequence, pairing each entry with the one after it.
    Note that when you hit the last one you don't get a (last, null); the
    final iteration gives you (last-1, last) and then finishes. If you only
    have one entry in the sequence this may be important as you will not get
    any looping.

    Parameters:
        iterable (list): The iterable object we will loop over

    Returns:
        list[tuple]
    """
    # Two independent iterators over the same data, the second advanced by one
    first_it, second_it = itertools.tee(iterable)
    next(second_it, None)
    return zip(first_it, second_it)
87 
88 
def find_gaps_in_iov_list(iov_list):
    """
    Finds the runs that aren't covered by the input IoVs in the list. This cannot find missing
    runs which lie between two IoVs that are separated by an experiment e.g. between
    IoV(1,1,1,10) => IoV(2,1,2,5) it is unknown if there were supposed to be more runs than run
    number 10 in experiment 1 before starting experiment 2. Therefore this is not counted as a gap
    and will not be added to the output list of IoVs

    Parameters:
        iov_list (list[IoV]): A SORTED list of Non-overlapping IoVs that you want to check for 'gaps'
            i.e. runs that aren't covered.

    Returns:
        list[IoV]: The IoVs corresponding to gaps in the input list of IoVs
    """
    gaps = []
    # Walk consecutive pairs and ask the boundary ExpRuns for the gap between them
    for previous_iov, current_iov in zip(iov_list, iov_list[1:]):
        previous_highest = ExpRun(previous_iov.exp_high, previous_iov.run_high)
        current_lowest = ExpRun(current_iov.exp_low, current_iov.run_low)
        iov_gap = previous_highest.find_gap(current_lowest)
        if iov_gap:
            B2DEBUG(29, f"Gap found between {previous_iov} and {current_iov} = {iov_gap}.")
            gaps.append(iov_gap)
    return gaps
116 
117 
class ExpRun(namedtuple('ExpRun_Factory', ['exp', 'run'])):
    """
    Class to define a single (Exp,Run) number i.e. not an IoV.
    It is derived from a namedtuple created class.

    We use the name 'ExpRun_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names which is probably fine
    but feels wrong.

    KeyWord Arguments:
        exp (int): The experiment number
        run (int): The run number
    """

    def make_iov(self):
        """
        Returns:
            IoV: A simple IoV corresponding to this single ExpRun
        """
        return IoV(self.exp, self.run, self.exp, self.run)

    def find_gap(self, other):
        """
        Finds the IoV gap between these two ExpRuns, or None when they are in
        different experiments or there are no whole runs strictly between them.
        """
        lower, upper = sorted((self, other))
        # Gaps across experiment boundaries are undefined
        if lower.exp != upper.exp:
            return None
        # Equal or adjacent runs leave no room for a gap
        if upper.run - lower.run <= 1:
            return None
        return IoV(lower.exp, lower.run + 1, lower.exp, upper.run - 1)
151 
152 
class IoV(namedtuple('IoV_Factory', ['exp_low', 'run_low', 'exp_high', 'run_high'])):
    """
    Python class to more easily manipulate an IoV and compare against others.
    Uses the C++ framework IntervalOfValidity internally to do various comparisons.
    It is derived from a namedtuple created class.

    We use the name 'IoV_Factory' in the factory creation so that
    the MRO doesn't contain two of the same class names which is probably fine
    but feels wrong.

    Default construction is an 'empty' IoV of -1,-1,-1,-1
    e.g. i = IoV() => IoV(exp_low=-1, run_low=-1, exp_high=-1, run_high=-1)

    For an IoV that encompasses all experiments and runs use 0,0,-1,-1.
    """

    def __new__(cls, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        The special method to create the tuple instance. Returning the instance
        calls the __init__ method.
        """
        return super().__new__(cls, exp_low, run_low, exp_high, run_high)

    def __init__(self, exp_low=-1, run_low=-1, exp_high=-1, run_high=-1):
        """
        Called after __new__. Caches the equivalent C++ IntervalOfValidity
        used by the comparison methods below.
        """
        # Must be named '_cpp_iov' exactly: contains()/overlaps() read this
        # attribute on the *other* IoV instance passed in as well.
        self._cpp_iov = IntervalOfValidity(self.exp_low, self.run_low, self.exp_high, self.run_high)

    def contains(self, iov):
        """
        Check if this IoV contains another one that is passed in.
        """
        return self._cpp_iov.contains(iov._cpp_iov)

    def overlaps(self, iov):
        """
        Check if this IoV overlaps another one that is passed in.
        """
        return self._cpp_iov.overlaps(iov._cpp_iov)
193 
194 
@enum.unique
class AlgResult(enum.Enum):
    """
    Enum of Calibration results. Shouldn't be very necessary to use this
    over the direct CalibrationAlgorithm members but it's nice to have
    something pythonic ready to go.
    """

    # Algorithm completed successfully
    ok = CalibrationAlgorithm.c_OK

    # Algorithm needs more data before it can produce a result
    not_enough_data = CalibrationAlgorithm.c_NotEnoughData

    # Algorithm requests another iteration over the same data
    iterate = CalibrationAlgorithm.c_Iterate

    # Algorithm failed
    failure = CalibrationAlgorithm.c_Failure
210 
211 
# Simple container pairing an IoV with the algorithm result obtained for it
IoV_Result = namedtuple('IoV_Result', ['iov', 'result'])
213 
214 
class LocalDatabase():
    """
    Simple class to hold the information about a basf2 Local database.
    Does a bit of checking that the file path entered is valid etc.

    Parameters:
        filepath (str): The file path of the database.txt file of the localdb

    Keyword Arguments:
        payload_dir (str): If the payload directory is different to the directory containing the filepath, you can set it here.
    """
    # Tag used to distinguish database types (cf. CentralDatabase.db_type)
    db_type = "local"

    def __init__(self, filepath, payload_dir=''):
        f = pathlib.Path(filepath)
        if not f.exists():
            raise ValueError(f"The LocalDatabase filepath: {f} does not exist.")
        # Absolute, symlink-resolved path to the database.txt file
        self.filepath = f.resolve()
        if not payload_dir:
            # Default: payloads live next to database.txt
            self.payload_dir = pathlib.Path(self.filepath.parent)
        else:
            p = pathlib.Path(payload_dir)
            if not p.exists():
                raise ValueError(f"The LocalDatabase payload_dir: {p} does not exist.")
            self.payload_dir = p.resolve()
243 
class CentralDatabase():
    """
    Simple class to hold the information about a basf2 Central database.
    Does no checking that a global tag exists.
    This class could be made much simpler, but it's made to be similar to LocalDatabase.

    Parameters:
        global_tag (str): The Global Tag of the central database
    """
    # Tag used to distinguish database types (cf. LocalDatabase.db_type)
    db_type = "central"

    def __init__(self, global_tag):
        # The global tag string, stored as given; no validation is performed
        self.global_tag = global_tag
257 
258 
def split_runs_by_exp(runs):
    """
    Parameters:
        runs (list[ExpRun]): Ordered list of ExpRuns we want to split by Exp value

    Returns:
        list[list[ExpRun]]: Same as original list but sublists are generated for each Exp value
    """
    split_by_runs = []
    # Grow a chunk while the experiment number stays the same; start a new
    # chunk whenever it changes.
    current_chunk = [runs[0]]
    for exprun in runs[1:]:
        if exprun.exp == current_chunk[-1].exp:
            current_chunk.append(exprun)
        else:
            split_by_runs.append(current_chunk)
            current_chunk = [exprun]
    split_by_runs.append(current_chunk)
    return split_by_runs
280 
281 
def runs_overlapping_iov(iov, runs):
    """
    Takes an overall IoV() object and a list of ExpRun
    and returns the set of ExpRun containing only those runs that overlap
    with the IoV.

    Parameters:
        iov (IoV): IoV to compare overlaps with
        runs (list[ExpRun]): The available runs to check if they overlap with the IoV

    Return:
        set
    """
    # Build a single-run IoV per run and keep those that overlap the target IoV
    return {run for run in runs if run.make_iov().overlaps(iov)}
302 
303 
def iov_from_runs(runs):
    """
    Takes a list of (Exp,Run) and returns the overall IoV from the lowest ExpRun to the highest.
    It returns an IoV() object and assumes that the list was in order to begin with.
    """
    # For a single-entry list runs[0] and runs[-1] are the same ExpRun,
    # which gives the correct one-run IoV.
    first, last = runs[0], runs[-1]
    return IoV(first.exp, first.run, last.exp, last.run)
314 
315 
def iov_from_runvector(iov_vector):
    """
    Takes a vector of ExpRun from CalibrationAlgorithm and returns
    the overall IoV from the lowest ExpRun to the highest. It returns
    an IoV() object. It assumes that the vector was in order to begin with.
    """
    # Keep the entries as ExpRun namedtuples: the previous implementation
    # wrapped each one in list(), which destroyed the .exp/.run attributes
    # accessed below and raised AttributeError. ExpRun is immutable, so no
    # copying is needed for the single-entry case either.
    exprun_list = [ExpRun(iov.first, iov.second) for iov in iov_vector]
    exprun_low, exprun_high = exprun_list[0], exprun_list[-1]
    return IoV(exprun_low.exp, exprun_low.run, exprun_high.exp, exprun_high.run)
329 
330 
def vector_from_runs(runs):
    """
    Convert a sequence of `ExpRun` to a std vector<pair<int,int>>

    Parameters:
        runs (list[ExpRun]): The runs to convert

    Returns:
        ROOT.vector(ROOT.pair(int,int))
    """
    pair_type = ROOT.pair(int, int)
    exprun_vector = ROOT.vector(pair_type)()
    # Reserve up front to avoid repeated reallocation while filling
    exprun_vector.reserve(len(runs))
    for exprun in runs:
        exprun_vector.push_back(pair_type(exprun.exp, exprun.run))
    return exprun_vector
347 
348 
def runs_from_vector(exprun_vector):
    """
    Takes a vector of `ExpRun` from CalibrationAlgorithm and returns
    a Python list of (exp,run) tuples in the same order.

    Parameters:
        exprun_vector (``ROOT.vector[ROOT.pair(int,int)]``): Vector of expruns for conversion

    Return:
        list[ExpRun]
    """
    runs = []
    for exprun in exprun_vector:
        runs.append(ExpRun(exprun.first, exprun.second))
    return runs
361 
362 
def find_run_lists_from_boundaries(boundaries, runs):
    """
    Takes a list of starting ExpRun boundaries and a list of available ExpRuns and finds
    the runs that are contained in the IoV of each boundary interval. We assume that this
    is occurring in only one Experiment! We also assume that after the last boundary start
    you want to include all runs that are higher than this starting ExpRun.
    Note that the output ExpRuns in their lists will be sorted. So the ordering may be
    different than the overall input order.

    Parameters:
        boundaries (list[ExpRun]): Starting boundary ExpRuns to tell us where to start an IoV
        runs (list[ExpRun]): The available runs to chunk into boundaries

    Return:
        dict[IoV,list[ExpRun]]
    """
    boundary_iov_to_runs = {}

    def assign_runs(boundary_iov):
        # Runs overlapping this boundary IoV, sorted for a stable output order
        boundary_iov_to_runs[boundary_iov] = sorted(runs_overlapping_iov(boundary_iov, runs))

    # Bounded intervals: each boundary start up to the run before the next start.
    # Using run-1 is safe because everything is within one experiment.
    for start_current, start_next in pairwise(boundaries):
        assign_runs(IoV(*start_current, start_next.exp, start_next.run - 1))
    # The final boundary has no 'next' start, so it is open-ended (-1) and
    # collects all remaining higher runs.
    assign_runs(IoV(*boundaries[-1], boundaries[-1].exp, -1))
    return boundary_iov_to_runs
391 
392 
def find_sources(dependencies):
    """
    Returns a deque of node names that have no input dependencies.
    """
    # Count incoming edges per node; OrderedDict preserves caller ordering
    in_degrees = OrderedDict((node, 0) for node in dependencies)
    for adjacency_list in dependencies.values():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # Collect zero in-degree nodes; appendleft mirrors the established ordering
    sources = deque()
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)

    return sources
411 
412 
def topological_sort(dependencies):
    """
    Does a topological sort of a graph (dictionary) where the keys are the
    node names, and the values are lists of node names that depend on the
    key (including zero dependencies). It should return the sorted
    list of nodes.

    >>> dependencies = {}
    >>> dependencies['c'] = ['a','b']
    >>> dependencies['b'] = ['a']
    >>> dependencies['a'] = []
    >>> sorted = topological_sort(dependencies)
    >>> print(sorted)
    ['c', 'b', 'a']
    """
    # Kahn's algorithm: track the number of unmet dependencies per node
    in_degrees = dict.fromkeys(dependencies, 0)
    for adjacency_list in dependencies.values():
        for future_node in adjacency_list:
            in_degrees[future_node] += 1

    # Seed the worklist with nodes that have no dependencies
    sources = deque()
    for name, in_degree in in_degrees.items():
        if not in_degree:
            sources.appendleft(name)

    order = []
    while sources:
        source = sources.pop()              # take a dependency-free node
        order.append(source)
        for node in dependencies[source]:
            in_degrees[node] -= 1           # one fewer unmet dependency
            if not in_degrees[node]:
                sources.appendleft(node)    # node just became a source

    if len(order) != len(dependencies):
        # Some nodes were never freed up => the graph contains a cycle
        B2WARNING("Cyclic dependency detected, check CAF.add_dependency() calls.")
        return []
    return order
455 
456 
def all_dependencies(dependencies, order=None):
    """
    Here we pass in a dictionary of the form that is used in topological sort
    where the keys are nodes, and the values are a list of the nodes that depend
    on it.

    However, the value (list) does not necessarily contain all of the future nodes
    that depend on each one, only those that are directly adjacent in the graph.
    So there are implicit dependencies not shown in the list.

    This function calculates the implicit future nodes and returns an OrderedDict
    with a full list for each node. This may be expensive in memory for
    complex graphs so be careful.

    If you care about the ordering of the final OrderedDict you can pass in a list
    of the nodes. The final OrderedDict then has the same order as the order parameter.
    """
    full_dependencies = OrderedDict()

    def collect(node, node_set):
        """Recursively add every direct and transitive future node to node_set."""
        for out_node in dependencies[node]:
            node_set.add(out_node)
            collect(out_node, node_set)

    # Use the caller's ordering when given, otherwise the dict's own ordering
    for node in (order if order else dependencies.keys()):
        node_dependencies = set()
        collect(node, node_dependencies)
        full_dependencies[node] = list(node_dependencies)

    return full_dependencies
495 
496 
def past_from_future_dependencies(future_dependencies):
    """
    Invert a 'future' dependency mapping into a 'past' one.

    Parameters:
        future_dependencies (dict): Maps each node to the list of nodes that
            depend on it (its future nodes).

    Returns:
        defaultdict(list): Maps each node to the list of nodes it depends on.
    """
    past_dependencies = defaultdict(list)
    for node, future_nodes in future_dependencies.items():
        for future_node in future_nodes:
            past_dependencies[future_node].append(node)
    return past_dependencies
503 
504 
def decode_json_string(object_string):
    """
    Simple function to call json.loads() on a string to return the
    Python object constructed (saves importing json everywhere).
    """
    decoded_object = json.loads(object_string)
    return decoded_object
511 
512 
def method_dispatch(func):
    """
    Decorator that behaves exactly like functools.singledispatch
    but which takes the second argument to be the important one
    that we want to check the type of and dispatch to the correct function.

    This is needed when trying to dispatch a method in a class, since the
    first argument of the method is always 'self'.
    Just decorate around class methods and their alternate functions:

    >>> @method_dispatch # Default method
    >>> def my_method(self, default_type, ...):
    >>>     pass

    >>> @my_method.register(list) # Registers list method for dispatch
    >>> def _(self, list_type, ...):
    >>>     pass

    Doesn't work the same for property decorated class methods, as these
    return a property builtin not a function and change the method naming.
    Do this type of decoration to get them to work:

    >>> @property
    >>> def my_property(self):
    >>>     return self._my_property

    >>> @my_property.setter
    >>> @method_dispatch
    >>> def my_property(self, input_property):
    >>>     pass

    >>> @my_property.fset.register(list)
    >>> def _(self, input_list_properties):
    >>>     pass
    """
    registry = singledispatch(func)

    def dispatch_on_second_arg(*args, **kwargs):
        # args[0] is 'self'; dispatch on the type of the second positional arg
        implementation = registry.dispatch(args[1].__class__)
        return implementation(*args, **kwargs)

    dispatch_on_second_arg.register = registry.register
    update_wrapper(dispatch_on_second_arg, func)
    return dispatch_on_second_arg
555 
556 
@contextlib.contextmanager
def temporary_workdir(path):
    """Context manager that switches the working directory to the given
    path for the duration of the with-block, restoring the previous
    working directory afterwards (even if an exception is raised).
    """
    original_cwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(original_cwd)
568 
569 
class PathExtras():
    """
    Simple wrapper for basf2 paths to allow some extra python functionality directly on
    them e.g. comparing whether or not a module is contained within a path with 'in' keyword.
    """

    def __init__(self, path=None):
        """
        Initialising with a path.
        """
        # Attribute to hold the basf2 path object that this class wraps.
        # May be None when constructed without a path.
        self.path = path
        # Holds the list of module names for the path in self.path
        self._module_names = []
        self._update_names()

    def _update_names(self):
        """
        Takes the self.path attribute and uses the current state to recreate the
        self._module_names list.
        """
        # Rebuild from scratch so repeated calls don't accumulate duplicates
        self._module_names = []
        if self.path is None:
            return
        for module in self.path.modules():
            self._module_names.append(module.name())

    def __contains__(self, module_name):
        """
        Special method to allow 'module_name in path' type comparisons. Returns
        a boolean and compares by module name.
        """
        self._update_names()
        return module_name in self._module_names

    def index(self, module_name):
        """
        Returns the index of the first instance of a module in the contained path.
        """
        return self._module_names.index(module_name)
610 
611 
def merge_local_databases(list_database_dirs, output_database_dir):
    """
    Takes a list of database directories and merges them into one new directory,
    defined by the output_database_dir.
    It assumes that each of the database directories is of the standard form:

    directory_name
        -> database.txt
        -> <payload file name>
        -> <payload file name>
        -> ...

    Note: os.mkdir raises if output_database_dir already exists, and payload
    files from later directories overwrite identically named files copied from
    earlier ones (shutil.copy).
    """
    os.mkdir(output_database_dir)
    database_file_path = os.path.join(output_database_dir, 'database.txt')
    with open(database_file_path, 'w') as db_file:
        for directory in list_database_dirs:
            if not os.path.exists(directory):
                B2WARNING(f"Database directory {directory} requested by collector but it doesn't exist!")
                continue
            else:
                # Get only the files, not directories
                listdir, isfile, join = os.listdir, os.path.isfile, os.path.join
                file_names = [file_name for file_name in listdir(directory) if isfile(join(directory, file_name))]
                # database.txt is merged separately below, not copied as a payload
                file_names.remove('database.txt')
                # Now we need the absolute paths to all of the payload files so we can copy them across
                file_names = [os.path.join(directory, file_name) for file_name in file_names[:]]
                for file_name in file_names:
                    shutil.copy(file_name, output_database_dir)
                # Now grab all the IoV stuff from each database.txt files and merge it.
                with open(os.path.join(directory, 'database.txt'), 'r') as f:
                    for line in f.readlines():
                        db_file.write(line)
644 
645 
def get_iov_from_file(file_path):
    """
    Returns an IoV of the exp/run contained within the given file.
    Uses the b2file-metadata-show basf2 tool.
    """
    import subprocess
    # Ask the basf2 metadata tool for a JSON description of the file
    raw_output = subprocess.check_output(['b2file-metadata-show', '--json', file_path])
    metadata = json.loads(raw_output.decode('utf-8'))
    return IoV(metadata['experimentLow'], metadata['runLow'], metadata['experimentHigh'], metadata['runHigh'])
655 
656 
def get_file_iov_tuple(file_path):
    """
    Simple little function to return both the input file path and the relevant IoV, instead of just the IoV.
    """
    B2INFO(f"Finding IoV for {file_path}.")
    file_iov = get_iov_from_file(file_path)
    return (file_path, file_iov)
663 
664 
def make_file_to_iov_dictionary(file_path_patterns, polling_time=10, pool=None, filterfalse=None):
    """
    Takes a list of file path patterns (things that glob would understand) and runs b2file-metadata-show over them to
    extract the IoV.

    Parameters:
        file_path_patterns (list[str]): The list of file path patterns you want to get IoVs for.

    Keyword Arguments:
        polling_time (int): Time between checking if our results are ready.
        pool: Optional Pool object used to multiprocess the b2file-metadata-show subprocesses.
          We don't close or join the Pool as you might want to use it yourself, we just wait until the results are ready.

        filterfalse (`function`): An optional function object that will be called on each absolute filepath found from your
          patterns. If True is returned the file will have its metadata returned. If False it will be skipped. The filter
          function should take the filepath string as its only argument.

    Returns:
        dict: Mapping of matching input file paths (Key) to their IoV (Value)
    """
    absolute_file_paths = find_absolute_file_paths(file_path_patterns)
    # Optionally filter out files matching our filter function
    if filterfalse:
        import itertools
        absolute_file_paths = list(itertools.filterfalse(filterfalse, absolute_file_paths))

    file_to_iov = {}
    if not pool:
        # Sequential fallback: query each file's metadata one at a time
        for file_path in absolute_file_paths:
            B2INFO(f"Finding IoV for {file_path}.")
            file_to_iov[file_path] = get_iov_from_file(file_path)
    else:
        import time
        # Fan the metadata lookups out over the pool's worker processes
        results = []
        for file_path in absolute_file_paths:
            results.append(pool.apply_async(get_file_iov_tuple, (file_path,)))

        # Poll until every async result is ready; the pool is deliberately
        # left open for the caller to reuse.
        while True:
            if all(map(lambda result: result.ready(), results)):
                break
            B2INFO("Still waiting for IoVs to be calculated.")
            time.sleep(polling_time)

        # Collect the (file_path, iov) tuples into the output mapping
        for result in results:
            file_iov = result.get()
            file_to_iov[file_iov[0]] = file_iov[1]

    return file_to_iov
713 
714 
def find_absolute_file_paths(file_path_patterns):
    """
    Takes a file path list (including wildcards) and performs glob.glob()
    to extract the absolute file paths to all matching files.

    Also uses set() to prevent multiple instances of the same file path
    but returns a list of file paths.

    Any non "file" type urls are taken as absolute file paths already and are simply
    passed through.
    """
    existing_file_paths = set()
    for file_pattern in file_path_patterns:
        pattern_uri = parse_file_uri(file_pattern)
        if pattern_uri.scheme != "file":
            # Remote URI: trust it as-is, we can't validate it from here
            B2INFO(f"Found a non-local file pattern {file_pattern} it will not be checked for validity.")
            existing_file_paths.add(file_pattern)
            continue
        matches = glob.glob(pattern_uri.path)
        if not matches:
            B2WARNING(f"No files matching {file_pattern} can be found, it will be skipped!")
            continue
        for match in matches:
            absolute_path = os.path.abspath(match)
            # Keep only real files; directories that matched the glob are dropped
            if os.path.isfile(absolute_path):
                existing_file_paths.add(absolute_path)

    return list(existing_file_paths)
744 
745 
def parse_raw_data_iov(file_path):
    """
    For as long as the Raw data is stored using a predictable directory/filename structure
    we can take advantage of it to more quickly infer the IoV of the files.

    Parameters:
        file_path (str): The absolute file path of a Raw data file on KEKCC

    Returns:
        `IoV`: The Single Exp,Run IoV that the Raw data file corresponds to.
    """
    raw_path = pathlib.Path(file_path)

    # Strip the known Raw data prefix: try the main data path first, then
    # fall back to the calibration data path.
    try:
        reduced_path = raw_path.relative_to("/hsm/belle2/bdata/Data/Raw")
    except ValueError:
        reduced_path = raw_path.relative_to("/group/belle2/dataprod/Data/Raw")

    # Extract exp/run from both the directory components and the filename so
    # the two sources can be cross-checked below.
    try:
        path_exp = int(reduced_path.parts[0][1:])
        path_run = int(reduced_path.parts[1][1:])
        name_parts = reduced_path.name.split(".")
        filename_exp = int(name_parts[1])
        filename_run = int(name_parts[2])
    except ValueError as e:
        raise ValueError(f"Wrong file path: {file_path}.") from e

    if (path_exp, path_run) != (filename_exp, filename_run):
        raise ValueError(f"Filename and directory gave different IoV after parsing for: {file_path}.")
    return IoV(path_exp, path_run, path_exp, path_run)
783 
784 
def create_directories(path, overwrite=True):
    """
    Creates a new directory path. If it already exists it will either leave it as is (including any contents),
    or delete it and re-create it fresh. It will only delete the end point, not any intermediate directories created.
    """
    if overwrite and path.exists():
        # Remove the leaf directory and everything inside it
        shutil.rmtree(path)
    if not path.exists():
        os.makedirs(path)
796 
797 
def find_int_dirs(dir_path):
    """
    If you previously ran a Calibration and are now re-running after failure, you may have iteration directories
    from iterations above your current one. This function will find directories that match an integer.

    Parameters:
        dir_path (`pathlib.Path`): The directory to search inside.

    Returns:
        list[`pathlib.Path`]: The matching Path objects to the directories that are valid ints
    """
    int_dirs = []
    for sub_dir in dir_path.glob("*"):
        if not sub_dir.is_dir():
            continue
        # Keep only directories whose name parses as an integer
        try:
            int(sub_dir.name)
        except ValueError:
            continue
        int_dirs.append(sub_dir)
    return int_dirs
818 
819 
def parse_file_uri(file_uri):
    """
    A central function for parsing file URI strings. Just so we only have to change it in one place later.

    Parameters:
        file_uri (str)

    Returns:
        urllib.parse.ParseResult
    """
    # Plain paths default to the "file" scheme; fragments are not split off
    parsed_uri = urlparse(file_uri, scheme="file", allow_fragments=False)
    return parsed_uri
831 
832 
# Sentinel ExpRun meaning "no bound", mirroring the -1,-1 convention used by IoV
UNBOUND_EXPRUN = ExpRun(-1, -1)
# NOTE: The duplicated signatures and "Definition: utils.py:NNN" lines that
# previously followed here were Doxygen cross-reference residue appended by
# documentation extraction; they were not part of this module and have been
# removed.