cli_download.py
#!/usr/bin/env python3

11"""
12Script to download the contents of a globaltag of the central database.
13
14This allows to use the payloads as a local payload directory or use it as a
15local database when running basf2.
16"""
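# Usage sketch (hedged): these command functions are normally dispatched by the
# ``b2conditionsdb`` command line tool; assuming that setup and a purely
# hypothetical globaltag name, typical invocations could look like
#
#   b2conditionsdb download some_globaltag -o conditions/metadata.sqlite -j 4
#   b2conditionsdb legacydownload some_globaltag centraldb -c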

import sys
import os
import requests
import shutil
import fnmatch
import re
import functools
import textwrap
from urllib.parse import urljoin
from . import ConditionsDB, encode_name, file_checksum
from .cli_utils import ItemFilter
from .iov import IntervalOfValidity
from .local_metadata import LocalMetadataProvider
from basf2 import B2ERROR, B2WARNING, B2INFO, LogLevel, LogInfo, logging
from basf2.utils import get_terminal_width
from concurrent.futures import ThreadPoolExecutor


def check_payload(destination, payloadinfo, run_range=None):
    """Return a list of all iovs for a given payload together with the file checksum and filenames.

    Args:
        destination (str): local folder where to download the payload
        payloadinfo (dict): payload information as returned by the REST API
        run_range (b2conditions_db.iov.IntervalOfValidity, optional): Interval of validity. Defaults to None.

    Returns:
        tuple: local file name, remote file name, checksum, list of iovs
    """

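    # pull out the pieces of the REST response that identify the payload file:
    # the basf2 module name, the payload revision and the expected checksum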
    payload = payloadinfo["payloadId"]
    module = payload["basf2Module"]["name"]
    revision = int(payload["revision"])
    checksum = payload["checksum"]

    url = payload["payloadUrl"]
    base = payload["baseUrl"]
    local_file = os.path.join(destination, os.path.basename(url))
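    # the trailing slash makes urljoin treat the base URL as a directory so the
    # relative payload URL is appended instead of replacing the last path segment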
    remote_file = urljoin(base + "/", url)

    iovlist = []
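    # collect all iovs of this payload; if a run range was given, keep only the
    # iovs that overlap it at least partially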
    for iov in payloadinfo["payloadIovs"]:
        if run_range is not None:
            if (
                IntervalOfValidity(
                    iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]
                ).intersect(run_range)
                is None
            ):
                continue
        iovlist.append([module, revision, iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]])

    return (local_file, remote_file, checksum, iovlist)


def download_file(db, local_file, remote_file, checksum, iovlist=None):
    """Actually download the file"""
    # check if the file already exists
    if os.path.exists(local_file):
        if file_checksum(local_file) == checksum:
            # done, nothing else to do
            return iovlist
        else:
            B2WARNING(f"Checksum mismatch for {local_file}, downloading again")

    # download the file
    B2INFO(f"download {local_file}")
    with open(local_file, "wb") as out:
        file_req = db._session.get(remote_file, stream=True)
        if file_req.status_code != requests.codes.ok:
            B2ERROR(f"Error downloading {file_req.url}: {file_req.status_code}")
            return None
        shutil.copyfileobj(file_req.raw, out)

    # and check it
    if file_checksum(local_file) != checksum:
        B2ERROR(f"Checksum mismatch after download: {local_file}")
        return None

    return iovlist


def download_payload(db, payload, directory):
    """Download a payload given a PayloadInformation object"""
    remote = urljoin(payload.base_url, payload.payload_url)
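    # payloads are stored in subdirectories named after the first two characters
    # of the checksum, i.e. <directory>/AB/{name}_r{revision}.root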
    local = os.path.join(directory, payload.checksum[:2], f"{payload.name}_r{payload.revision}.root")
    try:
        os.makedirs(os.path.dirname(local), exist_ok=True)
    except OSError as e:
        B2ERROR(f"Cannot download payload: {e}")
        return None
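    # download_file() returns its iovlist argument on success, so passing the
    # local path here means the local file name is returned if the download worked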
    return download_file(db, local, remote, payload.checksum, iovlist=local)


def get_tagnames(db, patterns, use_regex=False):
    """Return a list of all tags matching any of the given patterns"""
    all_tags = db.get_globalTags()
    final = set()
    for tag in patterns:
        if not use_regex:
            tagnames = fnmatch.filter(all_tags, tag)
        else:
            try:
                tagname_regex = re.compile(tag, re.IGNORECASE)
            except Exception as e:
                B2ERROR(f"--tag-regex: '{tag}' is not a valid regular expression: {e}")
                return []
            tagnames = (e for e in all_tags if tagname_regex.search(e))

        final |= set(tagnames)
    return list(final)


def command_legacydownload(args, db=None):
    """
    Download a globaltag from the database

    This command allows downloading a globaltag from the central database to be
    used locally, either as a lookup directory for payloads or as a standalone
    local database if --create-dbfile is specified.

    The command requires the TAGNAME to download and optionally an output
    directory which defaults to centraldb in the local working directory. It
    will check for existing payloads in the output directory and only download
    payloads which are not present or don't have the expected checksum.

    One can filter the payloads to be downloaded by payload name using the
    --filter, --exclude and --regex options.

    .. note:: Version added: release-04-00-00

        This has been renamed from ``download`` and is kept for compatibility.

    .. warning::

        Downloading a globaltag should preferably be done in the new format,
        which creates sqlite database files. Please use this legacy tool only
        for downloading "small" globaltags or very few payloads.
    """

    payloadfilter = ItemFilter(args)

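    # when called without a database connection this function only registers its
    # command line arguments and returns; the actual download happens in a second
    # call with a valid ConditionsDB instance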
    if db is None:
        args.add_argument("tag", metavar="TAGNAME", default="production",
                          help="globaltag to download")
        args.add_argument("destination", nargs='?', metavar="DIR", default="centraldb",
                          help="directory to put the payloads into (default: %(default)s)")
        args.add_argument("-c", "--create-dbfile", default=False, action="store_true",
                          help="if given save information about all payloads in DIR/database.txt")
        payloadfilter.add_arguments("payloads")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        args.add_argument("--run-range", nargs=4, default=None, type=int,
                          metavar=("FIRST_EXP", "FIRST_RUN", "FINAL_EXP", "FINAL_RUN"),
                          help="Can be four numbers to limit the run range to be downloaded. "
                          "Only iovs overlapping, even partially, with this range will be downloaded.")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character. "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html). "
                           "If -c is given as well the database files will be ``DIR/TAGNAME.txt``")
        return

    try:
        os.makedirs(args.destination, exist_ok=True)
    except OSError as e:
        B2ERROR(f"cannot create destination directory: {e}")
        return 1

    if not payloadfilter.check_arguments():
        return 1

    run_range_str = f' valid in {tuple(args.run_range)}' if args.run_range else ''
    args.run_range = IntervalOfValidity(args.run_range) if args.run_range else None

    # modify logging to remove the useless module: lines
    for level in LogLevel.values.values():
        logging.set_info(level, LogInfo.LEVEL | LogInfo.MESSAGE | LogInfo.TIMESTAMP)

    tagnames = [args.tag]

    if args.tag_pattern or args.tag_regex:
        tagnames = get_tagnames(db, tagnames, args.tag_regex)

    failed = 0
    for tagname in sorted(tagnames):
        try:
            req = db.request("GET", f"/globalTag/{encode_name(tagname)}/globalTagPayloads",
                             f"Downloading list of payloads for {tagname} tag{payloadfilter}{run_range_str}")
        except ConditionsDB.RequestError as e:
            B2ERROR(str(e))
            continue

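        # build the list of files to download, merging the iov lists of payloads
        # that end up in the same local file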
        download_list = {}
        for payload in req.json():
            name = payload["payloadId"]["basf2Module"]["name"]
            if payloadfilter.check(name):
                local_file, remote_file, checksum, iovlist = check_payload(args.destination, payload, args.run_range)
                if iovlist:
                    if local_file in download_list:
                        download_list[local_file][-1] += iovlist
                    else:
                        download_list[local_file] = [local_file, remote_file, checksum, iovlist]

        # do the downloading
        full_iovlist = []
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            for iovlist in pool.map(lambda x: download_file(db, *x), download_list.values()):
                if iovlist is None:
                    failed += 1
                    continue

                full_iovlist += iovlist

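        # optionally write a legacy text database file with one line per iov in the
        # format "dbstore/<payload> <revision> <firstExp>,<firstRun>,<finalExp>,<finalRun>"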
        if args.create_dbfile:
            dbfile = []
            for iov in sorted(full_iovlist):
                dbfile.append("dbstore/{} {} {},{},{},{}\n".format(*iov))
            dbfilename = tagname if (args.tag_pattern or args.tag_regex) else "database"
            with open(os.path.join(args.destination, dbfilename + ".txt"), "w") as txtfile:
                txtfile.writelines(dbfile)

    if failed > 0:
        B2ERROR(f"{failed} out of {len(download_list)} payloads could not be downloaded")
        return 1


def command_download(args, db=None):
    """
    Download one or more payloads into a sqlite database for local use

    This command allows downloading the information from one or more globaltags
    from the central database to be used locally.

    The command requires at least one tag name to download. It will check for
    existing payloads in the output directory and only download payloads which
    are not present or don't have the expected checksum.

    By default this script will create a local directory called ``conditions/``
    which contains a ``metadata.sqlite`` with all the payload information of all
    selected globaltags and subdirectories containing all the payload files.

    This can be changed by specifying a different name for the metadata file
    using the ``-o`` argument but the payloads will always be saved in
    subdirectories in the same directory as the sqlite file.

    .. note:: Version changed: release-04-00-00

        Previously this command was primarily intended to download payloads for
        one globaltag and optionally create a text file with payload information
        as well as download all necessary files. This has been changed and will
        now create a sqlite file containing the payload metadata. If you need the
        old behavior please use the command ``b2conditionsdb-legacydownload``.
    """
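    # Sketch of how the result might be consumed (assumption: the basf2 conditions
    # configuration exposes ``metadata_providers``/``payload_locations``; adjust to
    # the basf2 version in use):
    #
    #   import basf2
    #   basf2.conditions.metadata_providers = ["conditions/metadata.sqlite"]
    #   basf2.conditions.payload_locations = ["conditions/"]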

    if db is None:
        args.add_argument("tag", nargs="*", metavar="TAGNAME", help="globaltag to download")
        args.add_argument("-o", "--dbfile", metavar="DATABASEFILE", default="conditions/metadata.sqlite",
                          help="Name of the database file to create (default: %(default)s)")
        args.add_argument("-f", "--force", action="store_true", default=False,
                          help="Don't ask permission if the output database file exists")
        args.add_argument("--append", action="store_true", default=False,
                          help="Append to the existing database file if possible. "
                          "Otherwise the content in the database file will be overwritten")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--no-download", action="store_true", default=False,
                           help="Don't download any payloads, just fetch the metadata information")
        group.add_argument("--only-download", action="store_true", default=False,
                           help="Assume the metadata file is already filled, just make sure all payloads are downloaded")
        args.add_argument("--delete-extra-payloads", default=False, action="store_true",
                          help="if given this script will delete all extra files "
                          "that follow the payload naming convention ``AB/{name}_r{revision}.root`` "
                          "if they are not referenced in the database file.")
        args.add_argument("--ignore-missing", action="store_true", default=False,
                          help="Ignore missing globaltags and download all other tags")
        args.add_argument("-j", type=int, default=1, dest="nprocess",
                          help="Number of concurrent connections to use for file "
                          "download (default: %(default)s)")
        args.add_argument("--retries", type=int, default=3,
                          help="Number of retries on connection problems (default: "
                          "%(default)s)")
        group = args.add_mutually_exclusive_group()
        group.add_argument("--tag-pattern", default=False, action="store_true",
                           help="if given, all globaltags which match the shell-style "
                           "pattern TAGNAME will be downloaded: ``*`` stands for anything, "
                           "``?`` stands for a single character.")
        group.add_argument("--tag-regex", default=False, action="store_true",
                           help="if given, all globaltags matching the regular "
                           "expression given by TAGNAME will be downloaded (see "
                           "https://docs.python.org/3/library/re.html).")
        return

    # if we only download we need no tags, but otherwise check the tag list
    if not args.only_download:
        if args.tag_regex or args.tag_pattern:
            args.tag = get_tagnames(db, args.tag, args.tag_regex)

        if not args.tag:
            B2ERROR("No tags selected, cannot continue")
            return 1

        def get_taginfo(tagname):
            """Return the important information about the given globaltag"""
            tag_info = db.get_globalTagInfo(tagname)
            if not tag_info:
                B2ERROR(f"Cannot find globaltag {tagname}")
                return None
            return tag_info['globalTagId'], tag_info['name'], tag_info['globalTagStatus']['name']

        # so let's get info on all our tags and check if some are missing ...
        with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
            tags = list(pool.map(get_taginfo, args.tag))

        if not args.ignore_missing and None in tags:
            return 1
        # ok, remove tags that didn't exist ... and print the final list
        tags = sorted((e for e in tags if e is not None), key=lambda tag: tag[1])
        taglist = ["Selected globaltags:"]
        taglist += textwrap.wrap(", ".join(tag[1] for tag in tags), width=get_terminal_width(),
                                 initial_indent=" "*4, subsequent_indent=" "*4)
        B2INFO('\n'.join(taglist))

    # ok, we either download something or need to modify the db file, make sure
    # the output directory exists ...
    destination = os.path.relpath(os.path.dirname(os.path.abspath(args.dbfile)))
    try:
        os.makedirs(destination, exist_ok=True)
    except OSError as e:
        B2ERROR(f"cannot create output directory, {e}")
        return 1

    if not os.path.exists(args.dbfile):
        # no file? no append!
        args.append = False
    elif not args.force and not args.only_download:
        # but if it exists ask the user ...
        query = input(f"Database file {args.dbfile} exists, " + ("overwrite" if not args.append else "append") + " (y/n) [n]? ")
        if query.lower().strip() not in ['y', 'yes']:
            B2ERROR("Output file exists, cannot continue")
            return 1

    try:
        # if we only download we can open readonly
        mode = "read" if args.only_download else ("append" if args.append else "overwrite")
        database = LocalMetadataProvider(args.dbfile, mode)
        # if we only download we don't need to fix the schema but should make sure there's actually something in it
        if args.only_download:
            if database.get_payload_count() == 0:
                return 0

    except Exception as e:
        B2ERROR(f"Cannot open output file {args.dbfile}: {e}")
        return 1

    # we know the tags, we have a database file ... let's get the metadata
    with ThreadPoolExecutor(max_workers=args.nprocess) as pool:
        if not args.only_download:
            # loop over all tags with their iovs being downloaded in parallel
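            # the lambda appends the iov list fetched by db.get_all_iovs() to each
            # (id, name, state) tuple so the loop can unpack all four values at once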
            for tag_id, tag_name, tag_state, iovs in pool.map(lambda x: x + (db.get_all_iovs(x[1]),), tags):
                B2INFO(f"Adding metadata for {tag_name} to {args.dbfile}")
                database.add_globaltag(tag_id, tag_name, tag_state, iovs)

        # and finally download all necessary payloads for this file
        if args.no_download:
            return 0

        # make sure all the payloads referenced in the file are present
        downloader = functools.partial(download_payload, db, directory=destination)
        all_payloads = set(pool.map(downloader, database.get_payloads()))

        if args.delete_extra_payloads:
            existing_files = set()
            for dirname, subdirs, filenames in os.walk(destination):
                # only descend into subdirectories whose name starts with two hex characters
                subdirs[:] = (e for e in subdirs if re.match('[0-9a-f]{2}', e))
                # and don't check files in top dir
                if dirname == destination:
                    continue
                # and add all others
                for filename in filenames:
                    if not re.match(r"(.+)_r(\d+).root", filename):
                        continue
                    existing_files.add(os.path.join(dirname, filename))

            extra_files = existing_files - all_payloads
            B2INFO(f"Deleting {len(extra_files)} additional payload files")
            # delete all the files and consume the results to trigger any errors
            list(pool.map(os.remove, extra_files))

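    # a failed download shows up as None in all_payloads, so report an error in that case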
    return 1 if None in all_payloads else 0