Belle II Software  release-08-01-10
cli_download.py
#!/usr/bin/env python3

##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################

"""
Script to download the contents of a globaltag of the central database.

This allows using the payloads as a local payload directory or as a local
database when running basf2.
"""
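# Minimal usage sketch, assuming the standard b2conditionsdb command-line
# wrapper around the command_* functions below (tag names are hypothetical):
#   b2conditionsdb download my_globaltag -o conditions/metadata.sqlite
#   b2conditionsdb legacydownload my_globaltag ./centraldb -c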
import os
import requests
import shutil
import fnmatch
import re
import functools
import textwrap
from urllib.parse import urljoin
from . import ConditionsDB, encode_name, file_checksum
from .cli_utils import ItemFilter
from .iov import IntervalOfValidity
from .local_metadata import LocalMetadataProvider
from basf2 import B2ERROR, B2WARNING, B2INFO, LogLevel, LogInfo, logging
from basf2.utils import get_terminal_width
from concurrent.futures import ThreadPoolExecutor


def check_payload(destination, payloadinfo, run_range=None):
    """Return a list of all iovs for a given payload together with the file checksum and filenames.

    Args:
        destination (str): local folder where to download the payload
        payloadinfo (dict): payload information as returned by the REST API
        run_range (b2conditions_db.iov.IntervalOfValidity, optional): Interval of validity. Defaults to None.

    Returns:
        tuple: local file name, remote file name, checksum, list of iovs
    """
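    # The payloadinfo dict is expected to look roughly like this; the values
    # are hypothetical, only the keys accessed below matter:
    #   {"payloadId": {"basf2Module": {"name": "BeamParameters"},
    #                  "revision": "3", "checksum": "a1b2c3...",
    #                  "payloadUrl": "dbstore_BeamParameters_rev_3.root",
    #                  "baseUrl": "http://..."},
    #    "payloadIovs": [{"expStart": 0, "runStart": 0, "expEnd": -1, "runEnd": -1}]}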
    payload = payloadinfo["payloadId"]
    module = payload["basf2Module"]["name"]
    revision = int(payload["revision"])
    checksum = payload["checksum"]

    url = payload["payloadUrl"]
    base = payload["baseUrl"]
    local_file = os.path.join(destination, os.path.basename(url))
    remote_file = urljoin(base + "/", url)

    iovlist = []
    for iov in payloadinfo["payloadIovs"]:
        if run_range is not None:
            if (
                IntervalOfValidity(
                    iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]
                ).intersect(run_range)
                is None
            ):
                continue
        iovlist.append([module, revision, iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]])

    return (local_file, remote_file, checksum, iovlist)


def download_file(db, local_file, remote_file, checksum, iovlist=None):
    """Actually download remote_file to local_file and verify its checksum"""
    # check if existing
    if os.path.exists(local_file):
        if file_checksum(local_file) == checksum:
            # done, nothing else to do
            return iovlist
        else:
            B2WARNING(f"Checksum mismatch for {local_file}, downloading again")

    # download the file; request it before opening the local file so a failed
    # request doesn't truncate an already existing payload file
    B2INFO(f"download {local_file}")
    file_req = db._session.get(remote_file, stream=True)
    if file_req.status_code != requests.codes.ok:
        B2ERROR(f"Error downloading {file_req.url}: {file_req.status_code}")
        return None
    with open(local_file, "wb") as out:
        shutil.copyfileobj(file_req.raw, out)

    # and check it
    if file_checksum(local_file) != checksum:
        B2ERROR(f"Checksum mismatch after download: {local_file}")
        return None

    return iovlist


def download_payload(db, payload, directory):
    """Download a payload given a PayloadInformation object"""
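    # Payloads are stored in a sub directory named after the first two hex
    # characters of their checksum; e.g. a hypothetical payload "BeamParameters"
    # with revision 3 and checksum "a1..." ends up in
    #   <directory>/a1/BeamParameters_r3.root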
    remote = urljoin(payload.base_url, payload.payload_url)
    local = os.path.join(directory, payload.checksum[:2], f"{payload.name}_r{payload.revision}.root")
    try:
        os.makedirs(os.path.dirname(local), exist_ok=True)
    except OSError as e:
        B2ERROR(f"Cannot download payload: {e}")
        return None
    # pass the local file name as "iovlist" so download_file() returns it on success
    return download_file(db, local, remote, payload.checksum, iovlist=local)


def get_tagnames(db, patterns, use_regex=False):
    """Return a list of tags matching any of the given patterns"""
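    # Usage sketch (hypothetical tag names):
    #   get_tagnames(db, ["main_*"])         -> tags matching the shell pattern
    #   get_tagnames(db, ["main_.*"], True)  -> tags matching the regular expression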
    all_tags = db.get_globalTags()
    final = set()
    for tag in patterns:
        if not use_regex:
            tagnames = fnmatch.filter(all_tags, tag)
        else:
            try:
                tagname_regex = re.compile(tag, re.IGNORECASE)
            except Exception as e:
                B2ERROR(f"--tag-regex: '{tag}' is not a valid regular expression: {e}")
                # return an empty list so callers can safely sort or iterate the result
                return []
            tagnames = (e for e in all_tags if tagname_regex.search(e))

        final |= set(tagnames)
    return list(final)


def command_legacydownload(args, db=None):
    """
    Download a globaltag from the database

    This command allows downloading a globaltag from the central database to be
    used locally, either as a lookup directory for payloads or as a standalone
    local database if --create-dbfile is specified.

    The command requires the TAGNAME to download and optionally an output
    directory which defaults to centraldb in the local working directory. It
    will check for existing payloads in the output directory and only download
    payloads which are not present or don't have the expected checksum.

    One can filter the payloads to be downloaded by payload name using the
    --filter, --exclude and --regex options.

    .. versionadded:: release-04-00-00

       This has been renamed from ``download`` and is kept for compatibility.

    .. warning::

       Downloading a globaltag should be done in the new format creating sqlite
       database files. Please use this legacy tool only for downloading "small"
       globaltags or very few payloads.
    """
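    # Hypothetical example: the following invocation would download all payloads
    # of the tag "my_tag" into ./centraldb and, because of -c, also create
    # ./centraldb/database.txt for use as a local database:
    #   b2conditionsdb legacydownload -c my_tag ./centraldb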
    payloadfilter = ItemFilter(args)

    if db is None:
        args.add_argument("tag", metavar="TAGNAME", default="production",
                          help="globaltag to download")
        args.add_argument("destination", nargs='?', metavar="DIR", default="centraldb",
                          help="directory to put the payloads into (default: %(default)s)")
        args.add_argument("-c", "--create-dbfile", default=False, action="store_true",
                          help="if given, save information about all payloads in DIR/database.txt")
        payloadfilter.add_arguments("payloads")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                               "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                               "%(default)s)")
        args.add_argument("--run-range", nargs=4, default=None, type=int,
                          metavar=("FIRST_EXP", "FIRST_RUN", "FINAL_EXP", "FINAL_RUN"),
                          help="Can be four numbers to limit the run range to be downloaded. "
                               "Only iovs overlapping, even partially, with this range will be downloaded.")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                                "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                                "``?`` stands for a single character. "
                                "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                                "expression given by TAGNAME will be downloaded (see "
                                "https://docs.python.org/3/library/re.html). "
                                "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        return

    try:
        os.makedirs(args.destination, exist_ok=True)
    except OSError as e:
        B2ERROR(f"cannot create destination directory: {e}")
        return 1

    if not payloadfilter.check_arguments():
        return 1

    run_range_str = f' valid in {tuple(args.run_range)}' if args.run_range else ''
    args.run_range = IntervalOfValidity(args.run_range) if args.run_range else None

    # modify logging to remove the useless module: lines
    for level in LogLevel.values.values():
        logging.set_info(level, LogInfo.LEVEL | LogInfo.MESSAGE | LogInfo.TIMESTAMP)

    tagnames = [args.tag]

    if args.tag_pattern or args.tag_regex:
        tagnames = get_tagnames(db, tagnames, args.tag_regex)

    failed = 0
    for tagname in sorted(tagnames):
        try:
            req = db.request("GET", f"/globalTag/{encode_name(tagname)}/globalTagPayloads",
                             f"Downloading list of payloads for {tagname} tag{payloadfilter}{run_range_str}")
        except ConditionsDB.RequestError as e:
            B2ERROR(str(e))
            continue

        download_list = {}
        for payload in req.json():
            name = payload["payloadId"]["basf2Module"]["name"]
            if payloadfilter.check(name):
                local_file, remote_file, checksum, iovlist = check_payload(args.destination, payload, args.run_range)
                if iovlist:
                    if local_file in download_list:
                        download_list[local_file][-1] += iovlist
                    else:
                        download_list[local_file] = [local_file, remote_file, checksum, iovlist]

        # do the downloading
        full_iovlist = []
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            for iovlist in pool.map(lambda x: download_file(db, *x), download_list.values()):
                if iovlist is None:
                    failed += 1
                    continue

                full_iovlist += iovlist

        if args.create_dbfile:
            dbfile = []
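            # each line of the resulting text-file database has the form
            #   dbstore/<payload name> <revision> <expStart>,<runStart>,<expEnd>,<runEnd>
            # e.g. "dbstore/BeamParameters 3 0,0,-1,-1" (hypothetical values)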
            for iov in sorted(full_iovlist):
                dbfile.append("dbstore/{} {} {},{},{},{}\n".format(*iov))
            dbfilename = tagname if (args.tag_pattern or args.tag_regex) else "database"
            with open(os.path.join(args.destination, dbfilename + ".txt"), "w") as txtfile:
                txtfile.writelines(dbfile)

    if failed > 0:
        B2ERROR(f"{failed} out of {len(download_list)} payloads could not be downloaded")
        return 1


def command_download(args, db=None):
    """
    Download one or more payloads into a sqlite database for local use

    This command allows downloading the information from one or more globaltags
    from the central database to be used locally.

    The command requires at least one tag name to download. It will check for
    existing payloads in the output directory and only download payloads which
    are not present or don't have the expected checksum.

    By default this script will create a local directory called ``conditions/``
    which contains a ``metadata.sqlite`` with all the payload information of all
    selected globaltags and sub directories containing all the payload files.

    This can be changed by specifying a different name for the metadata file
    using the ``-o`` argument, but the payloads will always be saved in sub
    directories in the same directory as the sqlite file.

    .. versionchanged:: release-04-00-00

       Previously this command was primarily intended to download payloads for
       one globaltag and optionally create a text file with payload information
       as well as download all necessary files. This has been changed and will
       now create a sqlite file containing the payload metadata. If you need the
       old behavior please use the command ``b2conditionsdb-legacydownload``.
    """
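    # Hypothetical example: fetch the metadata of two tags into one sqlite file
    # and download all referenced payloads next to it using 4 connections:
    #   b2conditionsdb download tagA tagB -o conditions/metadata.sqlite -j4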
    if db is None:
        args.add_argument("tag", nargs="*", metavar="TAGNAME", help="globaltag to download")
        args.add_argument("-o", "--dbfile", metavar="DATABASEFILE", default="conditions/metadata.sqlite",
                          help="Name of the database file to create (default: %(default)s)")
        args.add_argument("-f", "--force", action="store_true", default=False,
                          help="Don't ask permission if the output database file exists")
        args.add_argument("--append", action="store_true", default=False,
                          help="Append to the existing database file if possible. "
                               "Otherwise the content in the database file will be overwritten")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--no-download", action="store_true", default=False,
                           help="Don't download any payloads, just fetch the metadata information")
        group.add_argument("--only-download", action="store_true", default=False,
                           help="Assume the metadata file is already filled, just make sure all payloads are downloaded")
        args.add_argument("--delete-extra-payloads", default=False, action="store_true",
                          help="if given this script will delete all extra files "
                               "that follow the payload naming convention ``AB/{name}_r{revision}.root`` "
                               "if they are not referenced in the database file.")
        args.add_argument("--ignore-missing", action="store_true", default=False,
                          help="Ignore missing globaltags and download all other tags")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                               "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                               "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                                "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                                "``?`` stands for a single character.")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                                "expression given by TAGNAME will be downloaded (see "
                                "https://docs.python.org/3/library/re.html).")
        return

    # if we only download we need no tags, but otherwise check the tag list
    if not args.only_download:
        if args.tag_regex or args.tag_pattern:
            args.tag = get_tagnames(db, args.tag, args.tag_regex)

        if not args.tag:
            B2ERROR("No tags selected, cannot continue")
            return 1

    def get_taginfo(tagname):
        """Return the id, name and status of a single globaltag"""
        tag_info = db.get_globalTagInfo(tagname)
        if not tag_info:
            B2ERROR(f"Cannot find globaltag {tagname}")
            return None
        return tag_info['globalTagId'], tag_info['name'], tag_info['globalTagStatus']['name']

    # so let's get info on all our tags and check if some are missing ...
    with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
        tags = list(pool.map(get_taginfo, args.tag))

    if not args.ignore_missing and None in tags:
        return 1
    # ok, remove tags that didn't exist ... and print the final list
    tags = sorted((e for e in tags if e is not None), key=lambda tag: tag[1])
    taglist = ["Selected globaltags:"]
    taglist += textwrap.wrap(", ".join(tag[1] for tag in tags), width=get_terminal_width(),
                             initial_indent=" "*4, subsequent_indent=" "*4)
    B2INFO('\n'.join(taglist))

    # ok, we either download something or need to modify the db file, make sure
    # the output directory exists ...
    destination = os.path.relpath(os.path.dirname(os.path.abspath(args.dbfile)))
    try:
        os.makedirs(destination, exist_ok=True)
    except OSError as e:
        B2ERROR(f"cannot create output directory, {e}")
        return 1

    if not os.path.exists(args.dbfile):
        # no file? no append!
        args.append = False
    elif not args.force and not args.only_download:
        # but if it exists ask the user ...
        query = input(f"Database file {args.dbfile} exists, " + ("overwrite" if not args.append else "append") + " (y/n) [n]? ")
        if query.lower().strip() not in ['y', 'yes']:
            B2ERROR("Output file exists, cannot continue")
            return 1

    try:
        # if we only download we can open readonly
        mode = "read" if args.only_download else ("append" if args.append else "overwrite")
        database = LocalMetadataProvider(args.dbfile, mode)
        # if we only download we don't need to fix the schema but we should
        # make sure there's actually something in it
        if args.only_download:
            if database.get_payload_count() == 0:
                return 0

    except Exception as e:
        B2ERROR(f"Cannot open output file {args.dbfile}: {e}")
        return 1

    # we know the tags, we have a database file ... let's get the metadata
    with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
        if not args.only_download:
            # loop over all tags with their iovs being downloaded in parallel
            for tag_id, tag_name, tag_state, iovs in pool.map(lambda x: x + (db.get_all_iovs(x[1]),), tags):
                B2INFO(f"Adding metadata for {tag_name} to {args.dbfile}")
                database.add_globaltag(tag_id, tag_name, tag_state, iovs)

        # and finally download all necessary payloads for this file
        if args.no_download:
            return 0

        # make sure all the payloads referenced in the file are present
        downloader = functools.partial(download_payload, db, directory=destination)
        all_payloads = set(pool.map(downloader, database.get_payloads()))

        if args.delete_extra_payloads:
            existing_files = set()
            for dirname, subdirs, filenames in os.walk(destination):
                # only descend into the two-hex-character payload sub directories
                subdirs[:] = (e for e in subdirs if re.fullmatch('[0-9a-f]{2}', e))
                # and don't check files in top dir
                if dirname == destination:
                    continue
                # and add all others
                for filename in filenames:
                    if not re.fullmatch(r"(.+)_r(\d+)\.root", filename):
                        continue
                    existing_files.add(os.path.join(dirname, filename))

            extra_files = existing_files - all_payloads
            B2INFO(f"Deleting {len(extra_files)} additional payload files")
            # delete all the files and consume the results to trigger any errors
            list(pool.map(os.remove, extra_files))

        return 1 if None in all_payloads else 0