# Belle II Software release-06-02-00
# cli_download.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 Script to download the contents of a globaltag of the central database.
13 
This allows using the payloads as a local payload directory or as a
local database when running basf2.
16 """
17 
18 import sys
19 import os
20 import requests
21 import shutil
22 import fnmatch
23 import re
24 import functools
25 import textwrap
26 from urllib.parse import urljoin
27 from . import ConditionsDB, encode_name, file_checksum
28 from .cli_utils import ItemFilter
29 from .local_metadata import LocalMetadataProvider
30 from basf2 import B2ERROR, B2WARNING, B2INFO, LogLevel, LogInfo, logging
31 from basf2.utils import get_terminal_width
32 from concurrent.futures import ThreadPoolExecutor
33 
34 
def check_payload(destination, payloadinfo):
    """
    Collect the download information for a single payload.

    Note: despite the historic name this function does not download anything,
    it only extracts the file locations and iov list from the server response.

    Parameters:
        destination (str): directory the payload file should end up in
        payloadinfo (dict): one entry of the ``globalTagPayloads`` JSON
            response, containing the ``payloadId`` and ``payloadIovs`` keys

    Returns:
        tuple: ``(local_file, remote_file, checksum, iovlist)`` where
        ``iovlist`` is a list of ``[module, revision, expStart, runStart,
        expEnd, runEnd]`` entries, one per iov of this payload.
    """
    payload = payloadinfo["payloadId"]
    module = payload["basf2Module"]["name"]
    revision = int(payload["revision"])
    checksum = payload["checksum"]

    # the payload url is relative to the base url of the server; the local
    # file name is just the base name of the payload url
    url = payload["payloadUrl"]
    base = payload["baseUrl"]
    local_file = os.path.join(destination, os.path.basename(url))
    remote_file = urljoin(base + "/", url)

    iovlist = [[module, revision, iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]]
               for iov in payloadinfo["payloadIovs"]]

    return (local_file, remote_file, checksum, iovlist)
55 
56 
def download_file(db, local_file, remote_file, checksum, iovlist=None):
    """
    Download ``remote_file`` to ``local_file`` unless a file with the
    expected checksum is already present.

    Parameters:
        db (ConditionsDB): database object providing the requests session
        local_file (str): path the payload should be stored at
        remote_file (str): full url of the payload on the server
        checksum (str): expected digest of the file contents
        iovlist: opaque value that is passed through and returned on success;
            callers use it to get iov information (or the local file name)
            back from parallel ``pool.map`` calls

    Returns:
        ``iovlist`` on success, ``None`` on any download or checksum error.
    """
    # nothing to do if the file already exists with the correct checksum
    if os.path.exists(local_file):
        if file_checksum(local_file) == checksum:
            return iovlist
        B2WARNING("Checksum mismatch for %s, downloading again" % local_file)

    # download the file: issue the request *before* opening the local file so
    # a failed request doesn't truncate an existing file or leave an empty one
    B2INFO("download %s" % local_file)
    file_req = db._session.get(remote_file, stream=True)
    if file_req.status_code != requests.codes.ok:
        B2ERROR(f"Error downloading {file_req.url}: {file_req.status_code}")
        return None
    with open(local_file, "wb") as out:
        shutil.copyfileobj(file_req.raw, out)

    # and verify what we downloaded
    if file_checksum(local_file) != checksum:
        B2ERROR("Checksum mismatch after download: %s" % local_file)
        return None

    return iovlist
82 
83 
def download_payload(db, payload, directory):
    """Download a payload given a PayloadInformation object"""
    # payloads live in sub directories named after the first two characters
    # of their checksum
    filename = f"{payload.name}_r{payload.revision}.root"
    target = os.path.join(directory, payload.checksum[:2], filename)
    source = urljoin(payload.base_url, payload.payload_url)
    try:
        os.makedirs(os.path.dirname(target), exist_ok=True)
    except OSError as e:
        B2ERROR(f"Cannot download payload: {e}")
        return None
    # download_file hands back its iovlist argument on success, so passing
    # the local path makes this return the path of the downloaded file
    return download_file(db, target, source, payload.checksum, iovlist=target)
94 
95 
def get_tagnames(db, patterns, use_regex=False):
    """
    Return a list of unique globaltag names matching any of the given patterns.

    Parameters:
        db (ConditionsDB): database object used to obtain the full tag list
        patterns (list(str)): shell-style patterns, or regular expressions if
            ``use_regex`` is True (case insensitive, ``re.search`` semantics)
        use_regex (bool): interpret the patterns as regular expressions

    Returns:
        list: all tag names matching at least one pattern; an empty list if
        any supplied regular expression is invalid.
    """
    all_tags = db.get_globalTags()
    final = set()
    for tag in patterns:
        if not use_regex:
            tagnames = fnmatch.filter(all_tags, tag)
        else:
            try:
                tagname_regex = re.compile(tag, re.IGNORECASE)
            except re.error as e:
                B2ERROR(f"--tag-regex: '{tag}' is not a valid regular expression: {e}")
                # return an empty list (not False) so callers can always
                # iterate or sort the result
                return []
            tagnames = (e for e in all_tags if tagname_regex.search(e))

        final |= set(tagnames)
    return list(final)
113 
114 
def command_legacydownload(args, db=None):
    """
    Download a globaltag from the database

    This command allows to download a globaltag from the central database to be
    used locally, either as lookup directory for payloads or as a standalone
    local database if --create-dbfile is specified.

    The command requires the TAGNAME to download and optionally an output
    directory which defaults to centraldb in the local working directory. It
    will check for existing payloads in the output directory and only download
    payloads which are not present or don't have the expected checksum.

    One can filter the payloads to be downloaded by payload name using the
    --filter, --exclude and --regex options.

    .. versionadded:: release-04-00-00

    This has been renamed from ``download`` and is kept for compatibility

    .. deprecated:: release-04-00-00

    Downloading a globaltag should be done in the new format creating sqlite
    database files
    """

    payloadfilter = ItemFilter(args)

    # when called without a database instance we are in the argument parsing
    # phase: only declare the command line options and return
    if db is None:
        args.add_argument("tag", metavar="TAGNAME", default="production",
                          help="globaltag to download")
        args.add_argument("destination", nargs='?', metavar="DIR", default="centraldb",
                          help="directory to put the payloads into (default: %(default)s)")
        args.add_argument("-c", "--create-dbfile", default=False, action="store_true",
                          help="if given save information about all payloads in DIR/database.txt")
        payloadfilter.add_arguments("payloads")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character. "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html). "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        return

    try:
        os.makedirs(args.destination, exist_ok=True)
    except OSError:
        # NOTE(review): B2ERROR is a basf2 log function; the file= keyword
        # looks like a leftover from a print() call — confirm it is accepted
        B2ERROR("cannot create destination directory", file=sys.stderr)
        return 1

    if not payloadfilter.check_arguments():
        return 1

    # modify logging to remove the useless module: lines
    for level in LogLevel.values.values():
        logging.set_info(level, LogInfo.LEVEL | LogInfo.MESSAGE | LogInfo.TIMESTAMP)

    # by default the tag name is taken literally; with --tag-pattern or
    # --tag-regex it may expand to any number of matching globaltags
    tagnames = [args.tag]

    if args.tag_pattern or args.tag_regex:
        tagnames = get_tagnames(db, tagnames, args.tag_regex)

    failed = 0
    for tagname in sorted(tagnames):
        # ask the server for the full payload list of this globaltag
        try:
            req = db.request("GET", "/globalTag/{}/globalTagPayloads".format(encode_name(tagname)),
                             f"Downloading list of payloads for {tagname} tag{payloadfilter}")
        except ConditionsDB.RequestError as e:
            B2ERROR(str(e))
            continue

        # collect everything matching the payload filter, merging the iovs of
        # entries which map to the same local file name
        download_list = {}
        for payload in req.json():
            name = payload["payloadId"]["basf2Module"]["name"]
            if payloadfilter.check(name):
                local_file, remote_file, checksum, iovlist = check_payload(args.destination, payload)
                if local_file in download_list:
                    download_list[local_file][-1] += iovlist
                else:
                    download_list[local_file] = [local_file, remote_file, checksum, iovlist]

        # do the downloading
        full_iovlist = []
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            for iovlist in pool.map(lambda x: download_file(db, *x), download_list.values()):
                # download_file returns None on any download/checksum error
                if iovlist is None:
                    failed += 1
                    continue

                full_iovlist += iovlist

        # optionally write the legacy text database: one line per iov, named
        # database.txt or TAGNAME.txt when multiple tags can be selected
        if args.create_dbfile:
            dbfile = []
            for iov in sorted(full_iovlist):
                dbfile.append("dbstore/{} {} {},{},{},{}\n".format(*iov))
            dbfilename = tagname if (args.tag_pattern or args.tag_regex) else "database"
            with open(os.path.join(args.destination, dbfilename + ".txt"), "w") as txtfile:
                txtfile.writelines(dbfile)

    if failed > 0:
        B2ERROR("{} out of {} payloads could not be downloaded".format(failed, len(download_list)))
        return 1
228 
229 
def command_download(args, db=None):
    """
    Download one or more payloads into a sqlite database for local use

    This command allows to download the information from one or more globaltags
    from the central database to be used locally.

    The command requires at least one tag name to download. It will check for
    existing payloads in the output directory and only download payloads which
    are not present or don't have the expected checksum.

    By default this script will create a local directory called ``conditions/``
    which contains a ``metadata.sqlite`` with all the payload information of all
    selected globaltags and sub directories containing all the payload files.

    This can be changed by specifying a different name for the metadata file
    using the ``-o`` argument but the payloads will always be saved in sub
    directories in the same directory as the sqlite file.

    .. versionchanged:: release-04-00-00

    Previously this command was primarily intended to download payloads for
    one globaltag and optionally create a text file with payload information
    as well as download all necessary file. This has been changed and will
    now create a sqlite file containing the payload metadata. If you need the
    old behavior please use the command ``b2conditionsdb legacydownload``
    """

    # when called without a database instance we are in the argument parsing
    # phase: only declare the command line options and return
    if db is None:
        args.add_argument("tag", nargs="*", metavar="TAGNAME", help="globaltag to download")
        args.add_argument("-o", "--dbfile", metavar="DATABASEFILE", default="conditions/metadata.sqlite",
                          help="Name of the database file to create (default: %(default)s)")
        args.add_argument("-f", "--force", action="store_true", default=False,
                          help="Don't ask permission if the output database file exists")
        args.add_argument("--append", action="store_true", default=False,
                          help="Append to the existing database file if possible. "
                          "Otherwise the content in the database file will be overwritten")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--no-download", action="store_true", default=False,
                           help="Don't download any payloads, just fetch the metadata information")
        group.add_argument("--only-download", action="store_true", default=False,
                           help="Assume the metadata file is already filled, just make sure all payloads are downloaded")
        args.add_argument("--delete-extra-payloads", default=False, action="store_true",
                          help="if given this script will delete all extra files "
                          "that follow the payload naming convention ``AB/{name}_r{revision}.root`` "
                          "if they are not referenced in the database file.")
        args.add_argument("--ignore-missing", action="store_true", default=False,
                          help="Ignore missing globaltags and download all other tags")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character. ")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html). ")
        return

    # if we only download we need no tags, but otherwise check the tag list
    if not args.only_download:
        if args.tag_regex or args.tag_pattern:
            args.tag = get_tagnames(db, args.tag, args.tag_regex)

        if not args.tag:
            B2ERROR("No tags selected, cannot continue")
            return 1

    def get_taginfo(tagname):
        """Return (id, name, status) for one globaltag or None if it doesn't exist"""
        tag_info = db.get_globalTagInfo(tagname)
        if not tag_info:
            B2ERROR(f"Cannot find globaltag {tagname}")
            return None
        return tag_info['globalTagId'], tag_info['name'], tag_info['globalTagStatus']['name']

    # so lets get info on all our tags and check if some are missing ...
    with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
        tags = list(pool.map(get_taginfo, args.tag))

    if not args.ignore_missing and None in tags:
        return 1
    # ok, remove tags that didn't exist ... and print the final list
    tags = sorted((e for e in tags if e is not None), key=lambda tag: tag[1])
    taglist = ["Selected globaltags:"]
    taglist += textwrap.wrap(", ".join(tag[1] for tag in tags), width=get_terminal_width(),
                             initial_indent=" "*4, subsequent_indent=" "*4)
    B2INFO('\n'.join(taglist))

    # ok, we either download something or need to modify the db file, make sure
    # the output directory exists ... payloads always go next to the sqlite file
    destination = os.path.relpath(os.path.dirname(os.path.abspath(args.dbfile)))
    try:
        os.makedirs(destination, exist_ok=True)
    except OSError as e:
        B2ERROR(f"cannot create output directory, {e}")
        return 1

    if not os.path.exists(args.dbfile):
        # no file? no append!
        args.append = False
    elif not args.force and not args.only_download:
        # but if it exists ask the user ...
        query = input(f"Database file {args.dbfile} exists, " + ("overwrite" if not args.append else "append") + " (y/n) [n]? ")
        if query.lower().strip() not in ['y', 'yes']:
            B2ERROR("Output file exists, cannot continue")
            return 1

    try:
        # if we only download we can open readonly
        mode = "read" if args.only_download else ("append" if args.append else "overwrite")
        database = LocalMetadataProvider(args.dbfile, mode)
        # if we only download we don't need to fix the schema but should make sure there's actually something in it
        if args.only_download:
            if database.get_payload_count() == 0:
                return 0

    except Exception as e:
        B2ERROR(f"Cannot open output file {args.dbfile}: {e}")
        return 1

    # we know the tags, we have a database file ... lets get the metadata
    with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
        if not args.only_download:
            # loop over all tags with their iovs being downloaded in parallel
            for tag_id, tag_name, tag_state, iovs in pool.map(lambda x: x + (db.get_all_iovs(x[1]),), tags):
                B2INFO(f"Adding metadata for {tag_name} to {args.dbfile}")
                database.add_globaltag(tag_id, tag_name, tag_state, iovs)

        # and finally download all necessary payloads for this file
        if args.no_download:
            return 0

        # make sure all the payloads referenced in the file are present;
        # download_payload returns the local path on success, None on failure
        downloader = functools.partial(download_payload, db, directory=destination)
        all_payloads = set(pool.map(downloader, database.get_payloads()))

        if args.delete_extra_payloads:
            existing_files = set()
            for dirname, subdirs, filenames in os.walk(destination):
                # only look in sub directories matching a hex substring
                subdirs[:] = (e for e in subdirs if re.match('[0-9a-f]{2}', e))
                # and don't check files in top dir
                if dirname == destination:
                    continue
                # and add all others
                for filename in filenames:
                    if not re.match(r"(.+)_r(\d+).root", filename):
                        continue
                    existing_files.add(os.path.join(dirname, filename))

            extra_files = existing_files - all_payloads
            B2INFO(f"Deleting {len(extra_files)} additional payload files")
            # delete all the files and consume the results to trigger any errors
            list(pool.map(os.remove, extra_files))

    # any None in all_payloads means at least one payload failed to download
    return 1 if None in all_payloads else 0