# Belle II Software  release-05-01-25
# cli_download.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 
4 """
5 Script to download the contents of a globaltag of the central database.
6 
7 This allows to use the payloads as a local payload directory or use it as a
8 local database when running basf2.
9 """
10 
11 import sys
12 import os
13 import requests
14 import shutil
15 import fnmatch
16 import re
17 import sqlite3
18 import functools
19 import textwrap
20 from urllib.parse import urljoin
21 from . import ConditionsDB, encode_name, file_checksum
22 from .cli_utils import ItemFilter
23 from .local_metadata import LocalMetadataProvider
24 from basf2 import B2ERROR, B2WARNING, B2INFO, LogLevel, LogInfo, logging
25 from basf2.utils import get_terminal_width
26 from concurrent.futures import ThreadPoolExecutor
27 
28 
def check_payload(destination, payloadinfo):
    """
    Collect the download information for a single payload.

    Nothing is downloaded here: from the payload dictionary returned by the
    central database this builds the local filename, the remote URL, the
    expected checksum and the list of iovs.

    Returns:
        tuple ``(local_file, remote_file, checksum, iovlist)`` where each
        iovlist entry is ``[module, revision, expStart, runStart, expEnd, runEnd]``
    """
    payload_data = payloadinfo["payloadId"]
    module_name = payload_data["basf2Module"]["name"]
    rev = int(payload_data["revision"])
    digest = payload_data["checksum"]

    payload_url = payload_data["payloadUrl"]
    # the payload url is relative to the base url of the server
    target = os.path.join(destination, os.path.basename(payload_url))
    source = urljoin(payload_data["baseUrl"] + "/", payload_url)

    iovs = [
        [module_name, rev, iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]]
        for iov in payloadinfo["payloadIovs"]
    ]

    return (target, source, digest, iovs)
49 
50 
def download_file(db, local_file, remote_file, checksum, iovlist=None):
    """
    Actually download one payload file.

    If ``local_file`` already exists with the correct checksum nothing is
    downloaded.

    Parameters:
        db: ConditionsDB instance whose HTTP session is used for the download
        local_file: path of the file to create
        remote_file: full URL to download from
        checksum: expected checksum of the file
        iovlist: opaque value returned unchanged on success (the callers use
            it to pass through either the iov list or the local filename)

    Returns:
        ``iovlist`` on success, ``None`` on any download or checksum error
    """
    # check if existing
    if os.path.exists(local_file):
        if file_checksum(local_file) == checksum:
            # done, nothing else to do
            return iovlist
        else:
            B2WARNING("Checksum mismatch for %s, downloading again" % local_file)

    # download the file
    B2INFO("download %s" % local_file)
    # issue the request and check the status *before* opening the output
    # file: previously the file was opened first, so a failed request
    # truncated a pre-existing file and left an empty stub behind
    file_req = db._session.get(remote_file, stream=True)
    if file_req.status_code != requests.codes.ok:
        B2ERROR("Error downloading {0}: {1}".format(file_req.url, file_req.status_code))
        return None
    with open(local_file, "wb") as out:
        shutil.copyfileobj(file_req.raw, out)

    # and check it
    if file_checksum(local_file) != checksum:
        B2ERROR("Checksum mismatch after download: %s" % local_file)
        return None

    return iovlist
76 
77 
def download_payload(db, payload, directory):
    """Download a payload given a PayloadInformation object"""
    source = urljoin(payload.base_url, payload.payload_url)
    # payloads live in sub directories named after the first two hex
    # characters of their checksum
    target = os.path.join(directory, payload.checksum[:2], f"{payload.name}_r{payload.revision}.root")
    try:
        os.makedirs(os.path.dirname(target), exist_ok=True)
    except OSError as e:
        B2ERROR(f"Cannot download payload: {e}")
        return None
    # pass the local filename as ``iovlist`` so that download_file returns it
    # on success, giving the caller the set of downloaded files
    return download_file(db, target, source, payload.checksum, iovlist=target)
88 
89 
def get_tagnames(db, patterns, use_regex=False):
    """
    Return a list of all globaltag names matching any of the given patterns.

    Parameters:
        db: ConditionsDB instance used to obtain the list of existing tags
        patterns: list of shell-style patterns (or regular expressions if
            ``use_regex`` is True) matched against the tag names
        use_regex: if True interpret the patterns as case-insensitive
            regular expressions instead of shell-style patterns

    Returns:
        list of unique matching tag names; an empty list if any regular
        expression is invalid
    """
    all_tags = db.get_globalTags()
    final = set()
    for tag in patterns:
        if not use_regex:
            tagnames = fnmatch.filter(all_tags, tag)
        else:
            try:
                tagname_regex = re.compile(tag, re.IGNORECASE)
            except re.error as e:
                B2ERROR(f"--tag-regex: '{tag}' is not a valid regular expression: {e}")
                # return an empty list, not False: callers iterate or sort the
                # result (``sorted(False)`` would raise a TypeError) and an
                # empty list is equally falsy for the "no tags" checks
                return []
            tagnames = (e for e in all_tags if tagname_regex.search(e))

        final |= set(tagnames)
    return list(final)
107 
108 
def command_legacydownload(args, db=None):
    """
    Download a globaltag from the database

    This command allows to download a globaltag from the central database to be
    used locally, either as lookup directory for payloads or as a standalone
    local database if --create-dbfile is specified.

    The command requires the TAGNAME to download and optionally an output
    directory which defaults to centraldb in the local working directory. It
    will check for existing payloads in the output directory and only download
    payloads which are not present or don't have the expected checksum.

    One can filter the payloads to be downloaded by payload name using the
    --filter, --exclude and --regex options.

    .. versionadded:: release-04-00-00

       This has been renamed from ``download`` and is kept for compatibility

    .. deprecated:: release-04-00-00

       Downloading a globaltag should be done in the new format creating sqlite
       database files
    """

    payloadfilter = ItemFilter(args)

    if db is None:
        # db is None means we are only supposed to register our command line
        # arguments on the parser, not actually execute anything
        args.add_argument("tag", metavar="TAGNAME", default="production",
                          help="globaltag to download")
        args.add_argument("destination", nargs='?', metavar="DIR", default="centraldb",
                          help="directory to put the payloads into (default: %(default)s)")
        args.add_argument("-c", "--create-dbfile", default=False, action="store_true",
                          help="if given save information about all payloads in DIR/database.txt")
        payloadfilter.add_arguments("payloads")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character. "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html). "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        return

    try:
        os.makedirs(args.destination, exist_ok=True)
    except OSError:
        # NOTE(review): B2ERROR has no ``file`` parameter like print(); the
        # keyword is just shown as an extra variable -- confirm intended
        B2ERROR("cannot create destination directory", file=sys.stderr)
        return 1

    if not payloadfilter.check_arguments():
        return 1

    # modify logging to remove the useless module: lines
    for level in LogLevel.values.values():
        logging.set_info(level, LogInfo.LEVEL | LogInfo.MESSAGE | LogInfo.TIMESTAMP)

    tagnames = [args.tag]

    # expand TAGNAME into the list of all matching tags if requested
    if args.tag_pattern or args.tag_regex:
        tagnames = get_tagnames(db, tagnames, args.tag_regex)

    failed = 0
    for tagname in sorted(tagnames):
        try:
            req = db.request("GET", "/globalTag/{}/globalTagPayloads".format(encode_name(tagname)),
                             "Downloading list of payloads for {} tag{}".format(tagname, payloadfilter))
        except ConditionsDB.RequestError as e:
            B2ERROR(str(e))
            continue

        # map local filename -> [local_file, remote_file, checksum, iovlist]
        # so that each distinct file is downloaded only once
        download_list = {}
        for payload in req.json():
            name = payload["payloadId"]["basf2Module"]["name"]
            if payloadfilter.check(name):
                local_file, remote_file, checksum, iovlist = check_payload(args.destination, payload)
                if local_file in download_list:
                    # same file already scheduled: just merge the iov lists
                    download_list[local_file][-1] += iovlist
                else:
                    download_list[local_file] = [local_file, remote_file, checksum, iovlist]

        # do the downloading
        full_iovlist = []
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            for iovlist in pool.map(lambda x: download_file(db, *x), download_list.values()):
                if iovlist is None:
                    # download_file returns None on error
                    failed += 1
                    continue

                full_iovlist += iovlist

        if args.create_dbfile:
            # write one line per iov in the legacy database.txt format:
            # "dbstore/<module> <revision> <expStart>,<runStart>,<expEnd>,<runEnd>"
            dbfile = []
            for iov in sorted(full_iovlist):
                dbfile.append("dbstore/{} {} {},{},{},{}\n".format(*iov))
            dbfilename = tagname if (args.tag_pattern or args.tag_regex) else "database"
            with open(os.path.join(args.destination, dbfilename + ".txt"), "w") as txtfile:
                txtfile.writelines(dbfile)

    if failed > 0:
        # NOTE(review): download_list only covers the last processed tag, so
        # with multiple tags the total here is misleading -- verify
        B2ERROR("{} out of {} payloads could not be downloaded".format(failed, len(download_list)))
        return 1
223 
def command_download(args, db=None):
    """
    Download one or more payloads into a sqlite database for local use

    This command allows to download the information from one or more globaltags
    from the central database to be used locally.

    The command requires at least one tag name to download. It will check for
    existing payloads in the output directory and only download payloads which
    are not present or don't have the expected checksum.

    By default this script will create a local directory called ``conditions/``
    which contains a ``metadata.sqlite`` with all the payload information of all
    selected globaltags and sub directories containing all the payload files.

    This can be changed by specifying a different name for the metadata file
    using the ``-o`` argument but the payloads will always be saved in sub
    directories in the same directory as the sqlite file.

    .. versionchanged:: release-04-00-00

       Previously this command was primarily intended to download payloads for
       one globaltag and optionally create a text file with payload information
       as well as download all necessary file. This has been changed and will
       now create a sqlite file containing the payload metadata. If you need the
       old behavior please use the command ``b2conditionsdb legacydownload``
    """

    if db is None:
        # db is None means we are only supposed to register our command line
        # arguments on the parser, not actually execute anything
        args.add_argument("tag", nargs="*", metavar="TAGNAME", help="globaltag to download")
        args.add_argument("-o", "--dbfile", metavar="DATABASEFILE", default="conditions/metadata.sqlite",
                          help="Name of the database file to create (default: %(default)s)")
        args.add_argument("-f", "--force", action="store_true", default=False,
                          help="Don't ask permission if the output database file exists")
        args.add_argument("--append", action="store_true", default=False,
                          help="Append to the existing database file if possible. "
                          "Otherwise the content in the database file will be overwritten")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--no-download", action="store_true", default=False,
                           help="Don't download any payloads, just fetch the metadata information")
        group.add_argument("--only-download", action="store_true", default=False,
                           help="Assume the metadata file is already filled, just make sure all payloads are downloaded")
        args.add_argument("--delete-extra-payloads", default=False, action="store_true",
                          help="if given this script will delete all extra files "
                          "that follow the payload naming convention ``AB/{name}_r{revision}.root`` "
                          "if they are not referenced in the database file.")
        args.add_argument("--ignore-missing", action="store_true", default=False,
                          help="Ignore missing globaltags and download all other tags")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character. ")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html). ")
        return

    # if we only download we need no tags, but otherwise check the tag list
    if not args.only_download:
        if args.tag_regex or args.tag_pattern:
            args.tag = get_tagnames(db, args.tag, args.tag_regex)

        if not args.tag:
            B2ERROR("No tags selected, cannot continue")
            return 1

        def get_taginfo(tagname):
            """return the important information about all our globaltags"""
            tag_info = db.get_globalTagInfo(tagname)
            if not tag_info:
                B2ERROR(f"Cannot find globaltag {tagname}")
                return None
            return tag_info['globalTagId'], tag_info['name'], tag_info['globalTagStatus']['name']

        # so lets get info on all our tags and check if some are missing ...
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            tags = list(pool.map(get_taginfo, args.tag))

        if not args.ignore_missing and None in tags:
            # at least one tag could not be found and we're not ignoring that
            return 1
        # ok, remove tags that didn't exist ... and print the final list
        tags = sorted((e for e in tags if e is not None), key=lambda tag: tag[1])
        taglist = ["Selected globaltags:"]
        taglist += textwrap.wrap(", ".join(tag[1] for tag in tags), width=get_terminal_width(),
                                 initial_indent=" "*4, subsequent_indent=" "*4)
        B2INFO('\n'.join(taglist))

    # ok, we either download something or need to modify the db file, make sure
    # the output directory exists ...
    destination = os.path.relpath(os.path.dirname(os.path.abspath(args.dbfile)))
    try:
        os.makedirs(destination, exist_ok=True)
    except OSError as e:
        B2ERROR(f"cannot create output directory, {e}")
        return 1

    if not os.path.exists(args.dbfile):
        # no file? no append!
        args.append = False
    elif not args.force and not args.only_download:
        # but if it exists ask the user ...
        query = input(f"Database file {args.dbfile} exists, " + ("overwrite" if not args.append else "append") + " (y/n) [n]? ")
        if query.lower().strip() not in ['y', 'yes']:
            B2ERROR("Output file exists, cannot continue")
            return 1

    try:
        # if we only download we can open readonly
        mode = "read" if args.only_download else ("append" if args.append else "overwrite")
        database = LocalMetadataProvider(args.dbfile, mode)
        # if we only download we don't need to fix the schema but should make sure there's actually something in it
        if args.only_download:
            if database.get_payload_count() == 0:
                # nothing referenced in the file, nothing to download: done
                return 0

    except Exception as e:
        B2ERROR(f"Cannot open output file {args.dbfile}: {e}")
        return 1

    # we know the tags, we have a database file ... lets get the metadata
    with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
        if not args.only_download:
            # loop over all tags with their iovs being downloaded in parallel
            for tag_id, tag_name, tag_state, iovs in pool.map(lambda x: x + (db.get_all_iovs(x[1]),), tags):
                B2INFO(f"Adding metadata for {tag_name} to {args.dbfile}")
                database.add_globaltag(tag_id, tag_name, tag_state, iovs)

        # and finally download all necessary payloads for this file
        if args.no_download:
            return 0

        # make sure all the payloads referenced in the file are present;
        # download_payload returns the local filename or None on error
        downloader = functools.partial(download_payload, db, directory=destination)
        all_payloads = set(pool.map(downloader, database.get_payloads()))

        if args.delete_extra_payloads:
            existing_files = set()
            for dirname, subdirs, filenames in os.walk(destination):
                # only look in sub directories matching a hex substring
                subdirs[:] = (e for e in subdirs if re.match('[0-9a-f]{2}', e))
                # and don't check files in top dir
                if dirname == destination:
                    continue
                # and add all others
                for filename in filenames:
                    if not re.match(r"(.+)_r(\d+).root", filename):
                        continue
                    existing_files.add(os.path.join(dirname, filename))

            extra_files = existing_files - all_payloads
            B2INFO(f"Deleting {len(extra_files)} additional payload files")
            # delete all the files and consume the results to trigger any errors
            list(pool.map(os.remove, extra_files))

        return 1 if None in all_payloads else 0
# basf2.utils
# Definition: utils.py:1