cli_download.py
#!/usr/bin/env python3

"""
Script to download the contents of a globaltag of the central database.

This allows you to use the payloads as a local payload directory or as a
local database when running basf2.
"""

import os
import requests
import shutil
import fnmatch
import re
import functools
import textwrap
from urllib.parse import urljoin
from . import ConditionsDB, encode_name, file_checksum
from .cli_utils import ItemFilter
from .iov import IntervalOfValidity
from .local_metadata import LocalMetadataProvider
from basf2 import B2ERROR, B2WARNING, B2INFO, LogLevel, LogInfo, logging
from basf2.utils import get_terminal_width
from concurrent.futures import ThreadPoolExecutor


def check_payload(destination, payloadinfo, run_range=None):
    """Return a list of all iovs for a given payload together with the file checksum and filenames.

    Args:
        destination (str): local folder where to download the payload
        payloadinfo (dict): payload information as returned by the REST API
        run_range (b2conditions_db.iov.IntervalOfValidity, optional): Interval of validity. Defaults to None.

    Returns:
        tuple: local file name, remote file name, checksum, list of iovs
    """

    payload = payloadinfo["payloadId"]
    module = payload["basf2Module"]["name"]
    revision = int(payload["revision"])
    checksum = payload["checksum"]

    url = payload["payloadUrl"]
    base = payload["baseUrl"]
    local_file = os.path.join(destination, os.path.basename(url))
    remote_file = urljoin(base + "/", url)

    iovlist = []
    for iov in payloadinfo["payloadIovs"]:
        if run_range is not None:
            if (
                IntervalOfValidity(
                    iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]
                ).intersect(run_range)
                is None
            ):
                continue
        iovlist.append([module, revision, iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]])

    return (local_file, remote_file, checksum, iovlist)
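
# Illustration (all values hypothetical) of what check_payload returns:
#
#   local, remote, checksum, iovs = check_payload("centraldb", payloadinfo)
#   # local    -> "centraldb/dbstore_BeamParameters_rev_2.root"
#   # remote   -> the payload URL joined onto the server base URL
#   # checksum -> "a1b2c3..."
#   # iovs     -> [["BeamParameters", 2, 0, 0, -1, -1]]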


def download_file(db, local_file, remote_file, checksum, iovlist=None):
    """Actually download the file"""
    # check if existing
    if os.path.exists(local_file):
        if file_checksum(local_file) == checksum:
            # done, nothing else to do
            return iovlist
        else:
            B2WARNING(f"Checksum mismatch for {local_file}, downloading again")

    # download the file
    B2INFO(f"download {local_file}")
    with open(local_file, "wb") as out:
        file_req = db._session.get(remote_file, stream=True)
        if file_req.status_code != requests.codes.ok:
            B2ERROR(f"Error downloading {file_req.url}: {file_req.status_code}")
            return None
        shutil.copyfileobj(file_req.raw, out)

    # and check it
    if file_checksum(local_file) != checksum:
        B2ERROR(f"Checksum mismatch after download: {local_file}")
        return None

    return iovlist


def download_payload(db, payload, directory):
    """Download a payload given a PayloadInformation object"""
    remote = urljoin(payload.base_url, payload.payload_url)
    local = os.path.join(directory, payload.checksum[:2], f"{payload.name}_r{payload.revision}.root")
    try:
        os.makedirs(os.path.dirname(local), exist_ok=True)
    except OSError as e:
        B2ERROR(f"Cannot download payload: {e}")
        return None
    # pass the local file name as "iovlist" so a successful download returns it
    return download_file(db, local, remote, payload.checksum, iovlist=local)
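
# For example (hypothetical payload): a payload named BeamParameters with
# revision 3 and a checksum starting with "ab" is stored as
# ``<directory>/ab/BeamParameters_r3.root``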


def get_tagnames(db, patterns, use_regex=False):
    """Return a list of tags matching any of the given patterns"""
    all_tags = db.get_globalTags()
    final = set()
    for tag in patterns:
        if not use_regex:
            tagnames = fnmatch.filter(all_tags, tag)
        else:
            try:
                tagname_regex = re.compile(tag, re.IGNORECASE)
            except Exception as e:
                B2ERROR(f"--tag-regex: '{tag}' is not a valid regular expression: {e}")
                # return an empty list so callers can still iterate over the result
                return []
            tagnames = (e for e in all_tags if tagname_regex.search(e))

        final |= set(tagnames)
    return list(final)
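
# Example (hypothetical tag names): with the default shell-style matching,
# get_tagnames(db, ["main_*"]) selects e.g. "main_2022" and "main_2023",
# while get_tagnames(db, [r"main_\d{4}"], use_regex=True) selects the same
# tags via a regular expression (matched case-insensitively with re.search)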


def command_legacydownload(args, db=None):
    """
    Download a globaltag from the database

    This command allows you to download a globaltag from the central database
    to be used locally, either as a lookup directory for payloads or as a
    standalone local database if --create-dbfile is specified.

    The command requires the TAGNAME to download and optionally an output
    directory which defaults to centraldb in the local working directory. It
    will check for existing payloads in the output directory and only download
    payloads which are not present or don't have the expected checksum.
    One can filter the payloads to be downloaded by payload name using the
    --filter, --exclude and --regex options.

    .. versionadded:: release-04-00-00

        This has been renamed from ``download`` and is kept for compatibility.

    .. warning::

        Downloading a globaltag should be done in the new format creating sqlite
        database files. Please use this legacy tool only for downloading "small"
        globaltags or very few payloads.
    """

    payloadfilter = ItemFilter(args)

    if db is None:
        args.add_argument("tag", metavar="TAGNAME", default="production",
                          help="globaltag to download")
        args.add_argument("destination", nargs='?', metavar="DIR", default="centraldb",
                          help="directory to put the payloads into (default: %(default)s)")
        args.add_argument("-c", "--create-dbfile", default=False, action="store_true",
                          help="if given, save information about all payloads in DIR/database.txt")
        payloadfilter.add_arguments("payloads")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        args.add_argument("--run-range", nargs=4, default=None, type=int,
                          metavar=("FIRST_EXP", "FIRST_RUN", "FINAL_EXP", "FINAL_RUN"),
                          help="Can be four numbers to limit the run range to be downloaded. "
                          "Only iovs overlapping, even partially, with this range will be downloaded.")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character. "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html). "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        return

    try:
        os.makedirs(args.destination, exist_ok=True)
    except OSError:
        B2ERROR("cannot create destination directory")
        return 1

    if not payloadfilter.check_arguments():
        return 1

    run_range_str = f' valid in {tuple(args.run_range)}' if args.run_range else ''
    args.run_range = IntervalOfValidity(args.run_range) if args.run_range else None

    # modify logging to remove the useless module: lines
    for level in LogLevel.values.values():
        logging.set_info(level, LogInfo.LEVEL | LogInfo.MESSAGE | LogInfo.TIMESTAMP)

    tagnames = [args.tag]

    if args.tag_pattern or args.tag_regex:
        tagnames = get_tagnames(db, tagnames, args.tag_regex)

    failed = 0
    for tagname in sorted(tagnames):
        try:
            req = db.request("GET", f"/globalTag/{encode_name(tagname)}/globalTagPayloads",
                             f"Downloading list of payloads for {tagname} tag{payloadfilter}{run_range_str}")
        except ConditionsDB.RequestError as e:
            B2ERROR(str(e))
            continue

        download_list = {}
        for payload in req.json():
            name = payload["payloadId"]["basf2Module"]["name"]
            if payloadfilter.check(name):
                local_file, remote_file, checksum, iovlist = check_payload(args.destination, payload, args.run_range)
                if iovlist:
                    if local_file in download_list:
                        download_list[local_file][-1] += iovlist
                    else:
                        download_list[local_file] = [local_file, remote_file, checksum, iovlist]

        # do the downloading
        full_iovlist = []
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            for iovlist in pool.map(lambda x: download_file(db, *x), download_list.values()):
                if iovlist is None:
                    failed += 1
                    continue

                full_iovlist += iovlist

        if args.create_dbfile:
            dbfile = []
            for iov in sorted(full_iovlist):
                dbfile.append("dbstore/{} {} {},{},{},{}\n".format(*iov))
            dbfilename = tagname if (args.tag_pattern or args.tag_regex) else "database"
            with open(os.path.join(args.destination, dbfilename + ".txt"), "w") as txtfile:
                txtfile.writelines(dbfile)
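        # Each line in the resulting text file has the form
        # ``dbstore/<module> <revision> <expStart>,<runStart>,<expEnd>,<runEnd>``,
        # e.g. (hypothetical payload): dbstore/BeamParameters 2 0,0,-1,-1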

    if failed > 0:
        B2ERROR(f"{failed} out of {len(download_list)} payloads could not be downloaded")
        return 1


def command_download(args, db=None):
    """
    Download one or more payloads into a sqlite database for local use

    This command allows you to download the information from one or more
    globaltags from the central database to be used locally.

    The command requires at least one tag name to download. It will check for
    existing payloads in the output directory and only download payloads which
    are not present or don't have the expected checksum.

    By default this script will create a local directory called ``conditions/``
    which contains a ``metadata.sqlite`` with all the payload information of all
    selected globaltags and sub directories containing all the payload files.

    This can be changed by specifying a different name for the metadata file
    using the ``-o`` argument but the payloads will always be saved in sub
    directories in the same directory as the sqlite file.

    .. versionchanged:: release-04-00-00

        Previously this command was primarily intended to download payloads for
        one globaltag and optionally create a text file with payload information
        as well as download all necessary files. This has been changed and will
        now create a sqlite file containing the payload metadata. If you need the
        old behavior please use the command ``b2conditionsdb-legacydownload``.
    """

    if db is None:
        args.add_argument("tag", nargs="*", metavar="TAGNAME", help="globaltag to download")
        args.add_argument("-o", "--dbfile", metavar="DATABASEFILE", default="conditions/metadata.sqlite",
                          help="Name of the database file to create (default: %(default)s)")
        args.add_argument("-f", "--force", action="store_true", default=False,
                          help="Don't ask permission if the output database file exists")
        args.add_argument("--append", action="store_true", default=False,
                          help="Append to the existing database file if possible. "
                          "Otherwise the content in the database file will be overwritten")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--no-download", action="store_true", default=False,
                           help="Don't download any payloads, just fetch the metadata information")
        group.add_argument("--only-download", action="store_true", default=False,
                           help="Assume the metadata file is already filled, just make sure all payloads are downloaded")
        args.add_argument("--delete-extra-payloads", default=False, action="store_true",
                          help="if given this script will delete all extra files "
                          "that follow the payload naming convention ``AB/{name}_r{revision}.root`` "
                          "if they are not referenced in the database file.")
        args.add_argument("--ignore-missing", action="store_true", default=False,
                          help="Ignore missing globaltags and download all other tags")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character.")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html).")
        return

    # if we only download we need no tags, but otherwise check the tag list
    if not args.only_download:
        if args.tag_regex or args.tag_pattern:
            args.tag = get_tagnames(db, args.tag, args.tag_regex)

        if not args.tag:
            B2ERROR("No tags selected, cannot continue")
            return 1

        def get_taginfo(tagname):
            """return the important information about the given globaltag"""
            tag_info = db.get_globalTagInfo(tagname)
            if not tag_info:
                B2ERROR(f"Cannot find globaltag {tagname}")
                return None
            return tag_info['globalTagId'], tag_info['name'], tag_info['globalTagStatus']['name']

        # so let's get info on all our tags and check if some are missing ...
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            tags = list(pool.map(get_taginfo, args.tag))

        if not args.ignore_missing and None in tags:
            return 1
        # ok, remove tags that didn't exist ... and print the final list
        tags = sorted((e for e in tags if e is not None), key=lambda tag: tag[1])
        taglist = ["Selected globaltags:"]
        taglist += textwrap.wrap(", ".join(tag[1] for tag in tags), width=get_terminal_width(),
                                 initial_indent=" "*4, subsequent_indent=" "*4)
        B2INFO('\n'.join(taglist))

    # ok, we either download something or need to modify the db file, make sure
    # the output directory exists ...
    destination = os.path.relpath(os.path.dirname(os.path.abspath(args.dbfile)))
    try:
        os.makedirs(destination, exist_ok=True)
    except OSError as e:
        B2ERROR(f"cannot create output directory, {e}")
        return 1

    if not os.path.exists(args.dbfile):
        # no file? no append!
        args.append = False
    elif not args.force and not args.only_download:
        # but if it exists ask the user ...
        query = input(f"Database file {args.dbfile} exists, " + ("overwrite" if not args.append else "append") + " (y/n) [n]? ")
        if query.lower().strip() not in ['y', 'yes']:
            B2ERROR("Output file exists, cannot continue")
            return 1

    try:
        # if we only download we can open the database file read-only
        mode = "read" if args.only_download else ("append" if args.append else "overwrite")
        database = LocalMetadataProvider(args.dbfile, mode)
        # if we only download we don't need to fix the schema but should make sure there's actually something in it
        if args.only_download:
            if database.get_payload_count() == 0:
                return 0

    except Exception as e:
        B2ERROR(f"Cannot open output file {args.dbfile}: {e}")
        return 1

    # we know the tags, we have a database file ... let's get the metadata
    with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
        if not args.only_download:
            # loop over all tags with their iovs being downloaded in parallel
            for tag_id, tag_name, tag_state, iovs in pool.map(lambda x: x + (db.get_all_iovs(x[1]),), tags):
                B2INFO(f"Adding metadata for {tag_name} to {args.dbfile}")
                database.add_globaltag(tag_id, tag_name, tag_state, iovs)

        # and finally download all necessary payloads for this file
        if args.no_download:
            return 0

        # make sure all the payloads referenced in the file are present
        downloader = functools.partial(download_payload, db, directory=destination)
        all_payloads = set(pool.map(downloader, database.get_payloads()))

        if args.delete_extra_payloads:
            existing_files = set()
            for dirname, subdirs, filenames in os.walk(destination):
                # only look in sub directories matching a hex substring
                subdirs[:] = (e for e in subdirs if re.match('[0-9a-f]{2}', e))
                # and don't check files in the top dir
                if dirname == destination:
                    continue
                # and add all others
                for filename in filenames:
                    if not re.match(r"(.+)_r(\d+).root", filename):
                        continue
                    existing_files.add(os.path.join(dirname, filename))

            extra_files = existing_files - all_payloads
            B2INFO(f"Deleting {len(extra_files)} additional payload files")
            # delete all the files and consume the results to trigger any errors
            list(pool.map(os.remove, extra_files))

        return 1 if None in all_payloads else 0