#!/usr/bin/env python3

##########################################################################
# basf2 (Belle II Analysis Software Framework)                           #
# Author: The Belle II Collaboration                                     #
#                                                                        #
# See git log for contributors and copyright holders.                    #
# This file is licensed under LGPL-3.0, see LICENSE.md.                  #
##########################################################################
# cli_download.py -- Belle II Software (light-2212-foldex)

11 """
12 Script to download the contents of a globaltag of the central database.
13 
14 This allows to use the payloads as a local payload directory or use it as a
15 local database when running basf2.
16 """
17 
18 import sys
19 import os
20 import requests
21 import shutil
22 import fnmatch
23 import re
24 import functools
25 import textwrap
26 from urllib.parse import urljoin
27 from . import ConditionsDB, encode_name, file_checksum
28 from .cli_utils import ItemFilter
29 from .local_metadata import LocalMetadataProvider
30 from basf2 import B2ERROR, B2WARNING, B2INFO, LogLevel, LogInfo, logging
31 from basf2.utils import get_terminal_width
32 from concurrent.futures import ThreadPoolExecutor
33 
34 
def check_payload(destination, payloadinfo):
    """
    Collect the download information for a single payload.

    Contrary to what the old docstring claimed, this function does not download
    anything; it only extracts the information needed to download and register
    the payload later on.

    Parameters:
        destination (str): local directory the payload file should end up in
        payloadinfo (dict): one entry of the ``globalTagPayloads`` response of
            the conditions database, containing ``payloadId`` and
            ``payloadIovs`` keys

    Returns:
        tuple ``(local_file, remote_file, checksum, iovlist)`` where
        ``iovlist`` is a list of ``[module, revision, expStart, runStart,
        expEnd, runEnd]`` entries, one per iov of the payload.
    """
    payload = payloadinfo["payloadId"]
    module = payload["basf2Module"]["name"]
    revision = int(payload["revision"])
    checksum = payload["checksum"]

    url = payload["payloadUrl"]
    base = payload["baseUrl"]
    # the local name is just the basename of the server side url
    local_file = os.path.join(destination, os.path.basename(url))
    # make sure the base ends in a slash so urljoin keeps its full path
    remote_file = urljoin(base + "/", url)

    iovlist = [[module, revision, iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]]
               for iov in payloadinfo["payloadIovs"]]

    return (local_file, remote_file, checksum, iovlist)
55 
56 
def download_file(db, local_file, remote_file, checksum, iovlist=None):
    """
    Actually download one file and verify its checksum.

    Parameters:
        db: ConditionsDB instance whose requests session is used for the download
        local_file (str): path the payload should be stored at
        remote_file (str): full url of the payload on the server
        checksum (str): expected digest of the file contents
        iovlist: opaque value handed back on success so callers can tell which
            item finished (a list of iovs or a file name)

    Returns:
        ``iovlist`` on success (including the case where the file already
        exists with the correct checksum), ``None`` on any error.
    """
    # check if the file already exists with the correct content: nothing to do
    if os.path.exists(local_file):
        if file_checksum(local_file) == checksum:
            return iovlist
        B2WARNING("Checksum mismatch for %s, downloading again" % local_file)

    B2INFO("download %s" % local_file)
    # request the file first and only open/truncate the local file once the
    # server actually delivers it: previously the file was opened for writing
    # before the status check, leaving an empty file behind on every failed
    # request (and destroying an existing, merely outdated, copy)
    file_req = db._session.get(remote_file, stream=True)
    if file_req.status_code != requests.codes.ok:
        B2ERROR(f"Error downloading {file_req.url}: {file_req.status_code}")
        return None

    with open(local_file, "wb") as out:
        shutil.copyfileobj(file_req.raw, out)

    # and verify the downloaded content
    if file_checksum(local_file) != checksum:
        B2ERROR("Checksum mismatch after download: %s" % local_file)
        return None

    return iovlist
82 
83 
def download_payload(db, payload, directory):
    """Download a payload given a PayloadInformation object"""
    # server side location of the payload file
    source_url = urljoin(payload.base_url, payload.payload_url)
    # payloads are sharded locally into sub directories named after the first
    # two checksum characters
    filename = f"{payload.name}_r{payload.revision}.root"
    target = os.path.join(directory, payload.checksum[:2], filename)
    try:
        os.makedirs(os.path.dirname(target), exist_ok=True)
    except OSError as e:
        B2ERROR(f"Cannot download payload: {e}")
        return None
    # hand the local name through as ``iovlist`` so the caller gets it back on
    # success and can collect the set of downloaded files
    return download_file(db, target, source_url, payload.checksum, iovlist=target)
94 
95 
def get_tagnames(db, patterns, use_regex=False):
    """
    Return a list of tags matching all patterns.

    Parameters:
        db: ConditionsDB instance used to obtain the list of all globaltags
        patterns (list): shell-style patterns (or regular expressions when
            ``use_regex`` is True) to match tag names against
        use_regex (bool): treat the patterns as case-insensitive regular
            expressions instead of fnmatch patterns

    Returns:
        sorted-able list of unique tag names matching any pattern; an empty
        list if one of the regular expressions is invalid (previously ``False``
        was returned, which made callers crash on ``sorted(result)``).
    """
    all_tags = db.get_globalTags()
    final = set()
    for tag in patterns:
        if not use_regex:
            tagnames = fnmatch.filter(all_tags, tag)
        else:
            try:
                tagname_regex = re.compile(tag, re.IGNORECASE)
            except re.error as e:
                B2ERROR(f"--tag-regex: '{tag}' is not a valid regular expression: {e}")
                return []
            tagnames = (e for e in all_tags if tagname_regex.search(e))

        final |= set(tagnames)
    return list(final)
113 
114 
def command_legacydownload(args, db=None):
    """
    Download a globaltag from the database

    This command allows to download a globaltag from the central database to be
    used locally, either as lookup directory for payloads or as a standalone
    local database if --create-dbfile is specified.

    The command requires the TAGNAME to download and optionally an output
    directory which defaults to centraldb in the local working directory. It
    will check for existing payloads in the output directory and only download
    payloads which are not present or don't have the expected checksum.

    One can filter the payloads to be downloaded by payload name using the
    --filter, --exclude and --regex options.

    .. versionadded:: release-04-00-00

       This has been renamed from ``download`` and is kept for compatibility

    .. warning::

       Downloading a globaltag should be done in the new format creating sqlite
       database files. Please use this legacy tool only for downloading "small"
       globaltags or very few payloads.
    """

    payloadfilter = ItemFilter(args)

    # dual-mode entry point: when called without a database instance we only
    # register our command line options on the ``args`` parser and return
    if db is None:
        args.add_argument("tag", metavar="TAGNAME", default="production",
                          help="globaltag to download")
        args.add_argument("destination", nargs='?', metavar="DIR", default="centraldb",
                          help="directory to put the payloads into (default: %(default)s)")
        args.add_argument("-c", "--create-dbfile", default=False, action="store_true",
                          help="if given save information about all payloads in DIR/database.txt")
        payloadfilter.add_arguments("payloads")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character. "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html). "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        return

    try:
        os.makedirs(args.destination, exist_ok=True)
    except OSError:
        # NOTE(review): B2ERROR does not write to a stream; ``file=sys.stderr``
        # is passed through as an extra log variable -- verify this is intended
        B2ERROR("cannot create destination directory", file=sys.stderr)
        return 1

    if not payloadfilter.check_arguments():
        return 1

    # modify logging to remove the useless module: lines
    for level in LogLevel.values.values():
        logging.set_info(level, LogInfo.LEVEL | LogInfo.MESSAGE | LogInfo.TIMESTAMP)

    # default: the single tag given on the command line ...
    tagnames = [args.tag]

    # ... unless it is to be interpreted as a pattern/regex matching many tags
    if args.tag_pattern or args.tag_regex:
        tagnames = get_tagnames(db, tagnames, args.tag_regex)

    failed = 0
    for tagname in sorted(tagnames):
        try:
            req = db.request("GET", "/globalTag/{}/globalTagPayloads".format(encode_name(tagname)),
                             f"Downloading list of payloads for {tagname} tag{payloadfilter}")
        except ConditionsDB.RequestError as e:
            B2ERROR(str(e))
            continue

        # build a map local filename -> [local, remote, checksum, iovs] and
        # merge the iovs of payloads which map to the same local file
        download_list = {}
        for payload in req.json():
            name = payload["payloadId"]["basf2Module"]["name"]
            if payloadfilter.check(name):
                local_file, remote_file, checksum, iovlist = check_payload(args.destination, payload)
                if local_file in download_list:
                    download_list[local_file][-1] += iovlist
                else:
                    download_list[local_file] = [local_file, remote_file, checksum, iovlist]

        # do the downloading (in parallel); download_file returns None on error
        full_iovlist = []
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            for iovlist in pool.map(lambda x: download_file(db, *x), download_list.values()):
                if iovlist is None:
                    failed += 1
                    continue

                full_iovlist += iovlist

        # optionally write a legacy text database file with one line per iov
        if args.create_dbfile:
            dbfile = []
            for iov in sorted(full_iovlist):
                dbfile.append("dbstore/{} {} {},{},{},{}\n".format(*iov))
            # one file per tag when downloading multiple tags, database.txt otherwise
            dbfilename = tagname if (args.tag_pattern or args.tag_regex) else "database"
            with open(os.path.join(args.destination, dbfilename + ".txt"), "w") as txtfile:
                txtfile.writelines(dbfile)

    if failed > 0:
        B2ERROR("{} out of {} payloads could not be downloaded".format(failed, len(download_list)))
        return 1
229 
230 
def command_download(args, db=None):
    """
    Download one or more payloads into a sqlite database for local use

    This command allows to download the information from one or more globaltags
    from the central database to be used locally.

    The command requires at least one tag name to download. It will check for
    existing payloads in the output directory and only download payloads which
    are not present or don't have the expected checksum.

    By default this script will create a local directory called ``conditions/``
    which contains a ``metadata.sqlite`` with all the payload information of all
    selected globaltags and sub directories containing all the payload files.

    This can be changed by specifying a different name for the metadata file
    using the ``-o`` argument but the payloads will always be saved in sub
    directories in the same directory as the sqlite file.

    .. versionchanged:: release-04-00-00

       Previously this command was primarily intended to download payloads for
       one globaltag and optionally create a text file with payload information
       as well as download all necessary file. This has been changed and will
       now create a sqlite file containing the payload metadata. If you need the
       old behavior please use the command ``b2conditionsdb-legacydownload``
    """

    # dual-mode entry point: without a database instance we only register the
    # command line options on the ``args`` parser and return
    if db is None:
        args.add_argument("tag", nargs="*", metavar="TAGNAME", help="globaltag to download")
        args.add_argument("-o", "--dbfile", metavar="DATABASEFILE", default="conditions/metadata.sqlite",
                          help="Name of the database file to create (default: %(default)s)")
        args.add_argument("-f", "--force", action="store_true", default=False,
                          help="Don't ask permission if the output database file exists")
        args.add_argument("--append", action="store_true", default=False,
                          help="Append to the existing database file if possible. "
                          "Otherwise the content in the database file will be overwritten")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--no-download", action="store_true", default=False,
                           help="Don't download any payloads, just fetch the metadata information")
        group.add_argument("--only-download", action="store_true", default=False,
                           help="Assume the metadata file is already filled, just make sure all payloads are downloaded")
        args.add_argument("--delete-extra-payloads", default=False, action="store_true",
                          help="if given this script will delete all extra files "
                          "that follow the payload naming convention ``AB/{name}_r{revision}.root`` "
                          "if they are not referenced in the database file.")
        args.add_argument("--ignore-missing", action="store_true", default=False,
                          help="Ignore missing globaltags and download all other tags")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character. ")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html). ")
        return

    # if we only download we need no tags, but otherwise check the tag list
    if not args.only_download:
        if args.tag_regex or args.tag_pattern:
            args.tag = get_tagnames(db, args.tag, args.tag_regex)

        if not args.tag:
            B2ERROR("No tags selected, cannot continue")
            return 1

        def get_taginfo(tagname):
            """return the important information about all our globaltags"""
            tag_info = db.get_globalTagInfo(tagname)
            if not tag_info:
                B2ERROR(f"Cannot find globaltag {tagname}")
                return None
            return tag_info['globalTagId'], tag_info['name'], tag_info['globalTagStatus']['name']

        # so lets get info on all our tags and check if some are missing ...
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            tags = list(pool.map(get_taginfo, args.tag))

        if not args.ignore_missing and None in tags:
            return 1
        # ok, remove tags that didn't exist ... and print the final list
        tags = sorted((e for e in tags if e is not None), key=lambda tag: tag[1])
        taglist = ["Selected globaltags:"]
        taglist += textwrap.wrap(", ".join(tag[1] for tag in tags), width=get_terminal_width(),
                                 initial_indent=" "*4, subsequent_indent=" "*4)
        B2INFO('\n'.join(taglist))

    # ok, we either download something or need to modify the db file, make sure
    # the output directory exists ... payloads always live next to the sqlite file
    destination = os.path.relpath(os.path.dirname(os.path.abspath(args.dbfile)))
    try:
        os.makedirs(destination, exist_ok=True)
    except OSError as e:
        B2ERROR(f"cannot create output directory, {e}")
        return 1

    if not os.path.exists(args.dbfile):
        # no file? no append!
        args.append = False
    elif not args.force and not args.only_download:
        # but if it exists ask the user ...
        query = input(f"Database file {args.dbfile} exists, " + ("overwrite" if not args.append else "append") + " (y/n) [n]? ")
        if query.lower().strip() not in ['y', 'yes']:
            B2ERROR("Output file exists, cannot continue")
            return 1

    try:
        # if we only download we can open readonly
        mode = "read" if args.only_download else ("append" if args.append else "overwrite")
        database = LocalMetadataProvider(args.dbfile, mode)
        # if we only download we don't need to fix the schema but should make sure there's actually something in it
        if args.only_download:
            if database.get_payload_count() == 0:
                return 0

    except Exception as e:
        B2ERROR(f"Cannot open output file {args.dbfile}: {e}")
        return 1

    # we know the tags, we have a database file ... lets get the metadata
    with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
        if not args.only_download:
            # loop over all tags with their iovs being downloaded in parallel
            for tag_id, tag_name, tag_state, iovs in pool.map(lambda x: x + (db.get_all_iovs(x[1]),), tags):
                B2INFO(f"Adding metadata for {tag_name} to {args.dbfile}")
                database.add_globaltag(tag_id, tag_name, tag_state, iovs)

        # and finally download all necessary payloads for this file
        if args.no_download:
            return 0

        # make sure all the payloads referenced in the file are present;
        # download_payload returns the local file name or None on failure
        downloader = functools.partial(download_payload, db, directory=destination)
        all_payloads = set(pool.map(downloader, database.get_payloads()))

        if args.delete_extra_payloads:
            existing_files = set()
            for dirname, subdirs, filenames in os.walk(destination):
                # only look in sub directories matching a hex substring
                subdirs[:] = (e for e in subdirs if re.match('[0-9a-f]{2}', e))
                # and don't check files in top dir
                if dirname == destination:
                    continue
                # and add all others that follow the payload naming convention
                for filename in filenames:
                    if not re.match(r"(.+)_r(\d+).root", filename):
                        continue
                    existing_files.add(os.path.join(dirname, filename))

            extra_files = existing_files - all_payloads
            B2INFO(f"Deleting {len(extra_files)} additional payload files")
            # delete all the files and consume the results to trigger any errors
            list(pool.map(os.remove, extra_files))

        # None in the set means at least one payload failed to download
        return 1 if None in all_payloads else 0