Belle II Software development
__init__.py
1#!/usr/bin/env python3
2
3
10
11"""
12conditions_db
13-------------
14
15Python interface to the ConditionsDB
16"""
17
18import os
19import stat
20from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING, conditions
21import requests
22from requests.packages.urllib3.fields import RequestField
23from requests.packages.urllib3.filepost import encode_multipart_formdata
24import json
25import urllib
26from versioning import upload_global_tag, jira_global_tag_v2
27from collections import defaultdict
28from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
29import hashlib
30import itertools
31from typing import Union # noqa
32import getpass
33import tempfile
34from conditions_db.iov import IntervalOfValidity
35
36
37def encode_name(name):
38 """Escape name to be used in an url"""
39 return urllib.parse.quote(name, safe="")
40
41
42def file_checksum(filename):
43 """Calculate md5 hash of file"""
44 md5hash = hashlib.md5()
45 with open(filename, "rb") as data:
46 md5hash.update(data.read())
47 return md5hash.hexdigest()
48
49
50def chunks(container, chunk_size):
51 """Cut a container in chunks of max. chunk_size"""
52 it = iter(container)
53 while True:
54 chunk = tuple(itertools.islice(it, chunk_size))
55 if not chunk:
56 return
57 yield chunk
58
59
60def get_cdb_authentication_token(path=None):
61 """
62 Helper function for correctly retrieving the CDB authentication token (either via file or via issuing server).
63
64 :param path: Path to a file containing a CDB authentication token; if None, the function will use
65 a default path (``${HOME}/b2cdb_${BELLE2_USER}.token`` or, if ``HOME`` is not set, ``b2cdb_${BELLE2_USER}.token`` in the system temporary directory)
66 to look for a token.
67 """
68 # if we pass a path, let's use it for getting the token, otherwise use the default one
69 if path:
70 path_to_token = path
71 else:
72 path_to_token = os.path.join(os.getenv('HOME', tempfile.gettempdir()), f'b2cdb_{os.getenv("BELLE2_USER", None)}.token')
73
74 # check validity of existing token
75 if os.path.isfile(path_to_token):
76 with open(path_to_token) as token_file:
77 response = requests.get('https://token.belle2.org/check', verify=False, params={'token': token_file.read().strip()})
78 if response.status_code == 400:
79 B2INFO(f'The file {path_to_token} contains an invalid token, getting a new token...')
80 os.unlink(path_to_token)
81 elif response.status_code > 400:
82 B2WARNING("The validity of the existing token could not be checked. Trying to connect to CDB anyway.")
83
84 # request a token if there is none
85 if not os.path.isfile(path_to_token):
86 username = os.environ['BELLE2_USER']
87 othername = input(f'\nConfirm your DESY user name by pressing enter or type the correct one [{username}]: ')
88 if othername != '':
89 username = othername
90 password = getpass.getpass("Please type your DESY password to request a CDB token: ")
91 response = requests.get('https://token.belle2.org', verify=False, auth=(username, password))
92 print(response.content)
93 if response.status_code == requests.codes.ok:
94 with open(path_to_token, 'wb') as token_file:
95 os.chmod(path_to_token, stat.S_IRUSR | stat.S_IWUSR)
96 token_file.write(response.content)
97 else:
98 B2ERROR('Failed to get token')
99 return None
100
101 # read and return token
102 with open(path_to_token) as token_file:
103 return token_file.read().strip()
104
105
106def set_cdb_authentication_token(cdb_instance, auth_token=None):
107 """
108 Helper function for setting the CDB authentication token.
109
110 :param cdb_instance: An instance of the ``ConditionsDB`` class.
111 :param auth_token: A CDB authentication token: if ``None``, it is automatically retrieved from the issuing server
112 and then set
113 """
114 if auth_token is not None:
115 cdb_instance.set_authentication_token(auth_token)
116 else:
117 # If something goes wrong with the auth. token, the function returns None and the authentication will fail
118 token = get_cdb_authentication_token(os.getenv('BELLE2_CDB_AUTH_TOKEN', default=None))
119 cdb_instance.set_authentication_token(token)
120
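Usage sketch for the two token helpers above (illustrative, not part of the module; it assumes either a valid token file or interactive DESY credentials are available):

    from conditions_db import ConditionsDB, set_cdb_authentication_token

    db = ConditionsDB()
    # uses the file pointed to by BELLE2_CDB_AUTH_TOKEN if set, otherwise the
    # default token file; may prompt for DESY credentials to request a new token
    set_cdb_authentication_token(db)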
121
122class BearerAuth(requests.auth.AuthBase):
123 """Simple class to present bearer token instead of username/password"""
124
125 def __init__(self, token):
126 """Construct from a token"""
127
128 self._authtoken = f"Bearer {token}"
129
130 def __call__(self, r):
131 """Update headers to include token"""
132 r.headers["X-Authorization"] = self._authtoken
133 return r
134
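For illustration only, the same auth class also works with a plain ``requests`` session (the token value below is a placeholder):

    import requests

    session = requests.Session()
    # every request made with this session now carries "X-Authorization: Bearer <some-jwt-token>"
    session.auth = BearerAuth("<some-jwt-token>")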
135
136 class PayloadInformation:
137 """Small container class to hold payload information for efficient
138 comparison between globaltags"""
139
140 @classmethod
141 def from_json(cls, payload, iov=None):
142 """Set all internal members from the json information of the payload and the iov.
143
144 Arguments:
145 payload (dict): json information of the payload as returned by REST api
146 iov (dict): json information of the iov as returned by REST api
147 """
148 if iov is None:
149 iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}
150
151 return cls(
152 payload['payloadId'],
153 payload['basf2Module']['name'],
154 payload['revision'],
155 payload['checksum'],
156 payload['payloadUrl'],
157 payload['baseUrl'],
158 iov['payloadIovId'],
159 (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
160 )
161
162 def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
163 """
164 Create a new object from the given information
165 """
166
167 self.name = name
168
169 self.checksum = checksum
170
171 self.iov = iov
172
173 self.revision = revision
174
175 self.payload_id = payload_id
176
177 self.iov_id = iov_id
178
179 self.base_url = base_url
180
181 self.payload_url = payload_url
182
183 @property
184 def url(self):
185 """Return the full url to the payload on the server"""
186 return urllib.parse.urljoin(self.base_url + '/', self.payload_url)
187
188 def __hash__(self):
189 """Make object hashable"""
190 return hash((self.name, self.checksum, self.iov))
191
192 def __eq__(self, other):
193 """Check if two payloads are equal"""
194 return (self.name, self.checksum, self.iov) == (other.name, other.checksum, other.iov)
195
196 def __lt__(self, other):
197 """Sort payloads by name, iov, revision"""
198 return (self.name.lower(), self.iov, self.revision) < (other.name.lower(), other.iov, other.revision)
199
200 def readable_iov(self):
201 """return a human readable name for the IoV"""
202 if self.iov is None:
203 return "none"
204
205 if self.iov == (0, 0, -1, -1):
206 return "always"
207
208 e1, r1, e2, r2 = self.iov
209 if e1 == e2:
210 if r1 == 0 and r2 == -1:
211 return f"exp {e1}"
212 elif r2 == -1:
213 return f"exp {e1}, runs {r1}+"
214 elif r1 == r2:
215 return f"exp {e1}, run {r1}"
216 else:
217 return f"exp {e1}, runs {r1} - {r2}"
218 else:
219 if e2 == -1 and r1 == 0:
220 return f"exp {e1} - forever"
221 elif e2 == -1:
222 return f"exp {e1}, run {r1} - forever"
223 elif r1 == 0 and r2 == -1:
224 return f"exp {e1}-{e2}, all runs"
225 elif r2 == -1:
226 return f"exp {e1}, run {r1} - exp {e2}, all runs"
227 else:
228 return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
229
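A short sketch of how these objects behave (all field values below are made up):

    p1 = PayloadInformation(1, "BeamParameters", 2, "abc123", "dbstore_BeamParameters_rev_2.root",
                            "https://example.org", iov=(0, 0, -1, -1))
    p2 = PayloadInformation(7, "BeamParameters", 3, "def456", "dbstore_BeamParameters_rev_3.root",
                            "https://example.org", iov=(1, 0, 1, -1))
    print(p1 == p2)            # False: name, checksum and iov are compared
    print(p1.readable_iov())   # "always"
    print(p2.readable_iov())   # "exp 1"
    print(sorted([p2, p1])[0].revision)  # 2: sorted by name, iov, revision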
230
231 class ConditionsDB:
232 """Class to interface with the ConditionsDB REST API"""
233
234
235 BASE_URLS = [conditions.default_metadata_provider_url]
236
237 class RequestError(RuntimeError):
238 """Class to be thrown by request() if there is any error"""
239 pass
240
241 @staticmethod
242 def get_base_urls(given_url):
243 """Resolve the list of server urls. If a url is given just return it.
244 Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
245 builtin defaults
246
247 Arguments:
248 given_url (str): Explicit base_url. If this is not None it will be
249 returned as is in a list of length 1
250
251 Returns:
252 a list of urls to try for database connectivity
253 """
254
255 base_url_list = ConditionsDB.BASE_URLS[:]
256 base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
257 if given_url is not None:
258 base_url_list = [given_url]
259 elif base_url_env is not None:
260 base_url_list = base_url_env.split()
261 B2INFO("Getting Conditions Database servers from Environment:")
262 for i, url in enumerate(base_url_list, 1):
263 B2INFO(f" {i}. {url}")
264 # try to escalate to https for all given urls
265 full_list = []
266 for url in base_url_list:
267 if url.startswith("http://"):
268 full_list.append("https" + url[4:])
269 # but keep the http in case of connection problems
270 full_list.append(url)
271 return full_list
272
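Resolution order as a sketch (the server names are placeholders):

    import os

    os.environ["BELLE2_CONDB_SERVERLIST"] = "http://cdb-a.example.org/ http://cdb-b.example.org/"
    print(ConditionsDB.get_base_urls(None))
    # ['https://cdb-a.example.org/', 'http://cdb-a.example.org/',
    #  'https://cdb-b.example.org/', 'http://cdb-b.example.org/']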
273 def __init__(self, base_url=None, max_connections=10, retries=3):
274 """
275 Create a new instance of the interface
276
277 Args:
278 base_url (string): base url of the rest interface
279 max_connections (int): number of connections to keep open, mostly useful for threaded applications
280 retries (int): number of retries in case of connection problems
281 """
282
283
284 self._session = requests.Session()
285 # and set the connection options we want to have
286 adapter = requests.adapters.HTTPAdapter(
287 pool_connections=max_connections, pool_maxsize=max_connections,
288 max_retries=retries, pool_block=True
289 )
290 self._session.mount("http://", adapter)
291 self._session.mount("https://", adapter)
292 # and also set the proxy settings
293 if "BELLE2_CONDB_PROXY" in os.environ:
294 self._session.proxies = {
295 "http": os.environ.get("BELLE2_CONDB_PROXY"),
296 "https": os.environ.get("BELLE2_CONDB_PROXY"),
297 }
298 # test the given url or try the known defaults
299 base_url_list = ConditionsDB.get_base_urls(base_url)
300
301 for url in base_url_list:
302
303 self._base_url = url.rstrip("/") + "/"
304 try:
305 req = self._session.request("HEAD", self._base_url + "v2/globalTags")
306 req.raise_for_status()
307 except requests.RequestException as e:
308 B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
309 else:
310 break
311 else:
312 B2FATAL("No working database servers configured, giving up")
313
314 # We have a working server, so switch the api to return json instead of
315 # xml (much easier to handle in python) and request non-cached replies.
316 # We only do this now because the server check above is fine with cached
317 # results.
318 self._session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
319
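Construction sketch; the explicit URL below is only an example, normally the builtin default or ``BELLE2_CONDB_SERVERLIST`` is used:

    db = ConditionsDB()                       # try the default/configured servers
    db_local = ConditionsDB("http://localhost:8080/", max_connections=4, retries=1)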
320 def set_authentication_token(self, token):
321 """
322 Set authentication token when talking to the database
323
324 Args:
325 token (str): JWT to hand to the database. Will not be checked
326 for validity here.
327 """
328 self._session.auth = BearerAuth(token)
329
330 def request(self, method, url, message=None, *args, **argk):
331 """
332 Request function, similar to requests.request but adding the base_url
333
334 Args:
335 method (str): GET, POST, etc.
336 url (str): url for the request, base_url will be prepended
337 message (str): message to show when starting the request and if it fails
338
339 All other arguments will be forwarded to requests.request.
340 """
341 if message is not None:
342 B2INFO(message)
343
344 try:
345 req = self._session.request(method, self._base_url + "v2/" + url.lstrip("/"), *args, **argk)
346 except requests.exceptions.ConnectionError as e:
347 B2FATAL("Could not access '" + self._base_url + "v2/" + url.lstrip("/") + "': " + str(e))
348
349 if req.status_code >= 300:
350 # Apparently something is not good. Let's try to decode the json
351 # reply containing reason and message
352 try:
353 response = req.json()
354 message = response.get("message", "")
355 colon = ": " if message.strip() else ""
356 error = f"Request {method} {url} returned {response['code']} {response['reason']}{colon}{message}"
357 except json.JSONDecodeError:
358 # seems the reply was not even json
359 error = f"Request {method} {url} returned non JSON response {req.status_code}: {req.content}"
360
361 if message is not None:
362 raise ConditionsDB.RequestError(f"{message} failed: {error}")
363 else:
364 raise ConditionsDB.RequestError(error)
365
366 if method != "HEAD" and req.status_code != requests.codes.no_content:
367 try:
368 req.json()
369 except json.JSONDecodeError as e:
370 B2INFO(f"Invalid response: {req.content}")
371 raise ConditionsDB.RequestError(f"{method} {url} returned invalid JSON response {e}")
372 return req
373
374 def get_globalTags(self):
375 """Get a list of all globaltags. Returns a dictionary with the globaltag
376 names and the corresponding ids in the database"""
377
378 try:
379 req = self.request("GET", "/globalTags")
380 except ConditionsDB.RequestError as e:
381 B2ERROR(f"Could not get the list of globaltags: {e}")
382 return None
383
384 result = {}
385 for tag in req.json():
386 result[tag["name"]] = tag
387
388 return result
389
390 def has_globalTag(self, name):
391 """Check whether the globaltag with the given name exists."""
392
393 try:
394 self.request("GET", f"/globalTag/{encode_name(name)}")
395 except ConditionsDB.RequestError:
396 return False
397
398 return True
399
400 def get_globalTagInfo(self, name):
401 """Get the id of the globaltag with the given name. Returns either the
402 id or None if the tag was not found"""
403
404 try:
405 req = self.request("GET", f"/globalTag/{encode_name(name)}")
406 except ConditionsDB.RequestError as e:
407 B2ERROR(f"Cannot find globaltag '{name}': {e}")
408 return None
409
410 return req.json()
411
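Read-only tag queries, shown as a sketch (the globaltag name is an example):

    db = ConditionsDB()
    tags = db.get_globalTags()
    if tags is not None:
        print(f"{len(tags)} globaltags defined")
    if db.has_globalTag("main_example"):
        info = db.get_globalTagInfo("main_example")
        print(info["globalTagId"], info["name"])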
412 def get_globalTagType(self, name):
413 """
414 Get the dictionary describing the given globaltag type (currently
415 one of DEV or RELEASE). Returns None if tag type was not found.
416 """
417 try:
418 req = self.request("GET", "/globalTagType")
419 except ConditionsDB.RequestError as e:
420 B2ERROR(f"Could not get list of valid globaltag types: {e}")
421 return None
422
423 types = {e["name"]: e for e in req.json()}
424
425 if name in types:
426 return types[name]
427
428 B2ERROR(f"Unknown globaltag type: '{name}', please use one of {', '.join(types)}")
429 return None
430
431 def create_globalTag(self, name, description, user):
432 """
433 Create a new globaltag
434 """
435 info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
436 try:
437 req = self.request("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
438 except ConditionsDB.RequestError as e:
439 B2ERROR(f"Could not create globaltag {name}: {e}")
440 return None
441
442 return req.json()
443
444 def get_all_iovs(self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False):
445 """
446 Return list of all payloads in the given globaltag where each element is
447 a `PayloadInformation` instance
448
449 Parameters:
450 globalTag (str): name of the globaltag
451 exp (int): if given limit the list of payloads to the ones valid for
452 the given exp,run combination
453 run (int): if given limit the list of payloads to the ones valid for
454 the given exp,run combination
455 message (str): additional message to show when downloading the
456 payload information. Will be directly appended to
457 "Obtaining lists of iovs for globaltag {globalTag}"
458 run_range (tuple): if given limit the list of payloads to the ones
459 overlapping with the given run range
460 fully_contained (bool): if True and the run_range is not None it limits
461 the list of payloads to the ones fully contained in the given run range
462
463 Warning:
464 Both exp and run need to be given at the same time. Supplying only
465 an experiment or only a run number will not work.
466 """
467 globalTag = encode_name(globalTag)
468 if message is None:
469 message = ""
470 if run_range is not None:
471 if fully_contained:
472 message += f" [fully contained in {tuple(run_range)}]"
473 else:
474 message += f" [valid in {tuple(run_range)}]"
475 run_range = IntervalOfValidity(run_range)
476
477 if exp is not None:
478 msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
479 req = self.request("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
480 else:
481 msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
482 req = self.request("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
483 all_iovs = []
484 for item in req.json():
485 payload = item["payload" if 'payload' in item else "payloadId"]
486 if "payloadIov" in item:
487 iovs = [item['payloadIov']]
488 else:
489 iovs = item['payloadIovs']
490
491 for iov in iovs:
492 if run_range is not None:
493 iov_ = IntervalOfValidity(iov['expStart'], iov['runStart'], iov['expEnd'], iov['runEnd'])
494 if fully_contained:
495 if not iov_ & run_range == iov_:
496 continue
497 else:
498 if iov_ & run_range is None:
499 continue
500 all_iovs.append(PayloadInformation.from_json(payload, iov))
501
502 all_iovs.sort()
503 return all_iovs
504
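Sketch for iterating over the payloads of a globaltag (the tag name and run range are examples; the run range is given as (firstExp, firstRun, finalExp, finalRun)):

    db = ConditionsDB()
    for payload in db.get_all_iovs("main_example", run_range=(5, 0, 5, -1)):
        print(f"{payload.name} rev {payload.revision}: {payload.readable_iov()}")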
505 def get_payloads(self, global_tag=None):
506 """
507 Get a list of all defined payloads (for the given global_tag or by default for all).
508 Returns a dictionary which maps (module, checksum) to the payload id.
509 """
510
511 try:
512 if global_tag:
513 req = self.request("GET", f"/globalTag/{encode_name(global_tag)}/payloads")
514 else:
515 req = self.request("GET", "/payloads")
516 except ConditionsDB.RequestError as e:
517 B2ERROR(f"Cannot get list of payloads: {e}")
518 return {}
519
520 result = {}
521 for payload in req.json():
522 module = payload["basf2Module"]["name"]
523 checksum = payload["checksum"]
524 result[(module, checksum)] = payload["payloadId"]
525
526 return result
527
528 def check_payloads(self, payloads, information="payloadId"):
529 """
530 Check for the existence of payloads in the database.
531
532 Arguments:
533 payloads (list((str,str))): A list of payloads to check for. Each
534 payload needs to be a tuple of the name of the payload and the
535 md5 checksum of the payload file.
536 information (str): The information to be extracted from the
537 payload dictionary
538
539 Returns:
540 A dictionary with the payload identifiers (name, checksum) as keys
541 and the requested information as values for all payloads which are already
542 present in the database.
543 """
544
545 search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
546 try:
547 req = self.request("POST", "/checkPayloads", json=search_query)
548 except ConditionsDB.RequestError as e:
549 B2ERROR(f"Cannot check for existing payloads: {e}")
550 return {}
551
552 result = {}
553 for payload in req.json():
554 module = payload["basf2Module"]["name"]
555 checksum = payload["checksum"]
556 result[(module, checksum)] = payload[information]
557
558 return result
559
560 def get_revisions(self, entries):
561 """
562 Get the revision numbers of payloads in the database.
563
564 Arguments:
565 entries (list): A list of payload entries.
566 Each entry must have the attributes module and checksum.
567
568 Returns:
569 True if successful.
570 """
571
572 result = self.check_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
573 if not result:
574 return False
575
576 for entry in entries:
577 entry.revision = result.get((entry.module, entry.checksum), 0)
578
579 return True
580
581 def create_payload(self, module, filename, checksum=None):
582 """
583 Create a new payload
584
585 Args:
586 module (str): name of the module
587 filename (str): name of the file
588 checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
589 """
590 if checksum is None:
591 checksum = file_checksum(filename)
592
593 # this is the only complicated request as we have to provide a
594 # multipart/mixed request which is not directly provided by the requests
595 # library.
596 files = [
597 (filename, open(filename, "rb").read(), "application/x-root"),
598 ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
599 ]
600 # ok we have the two "files" we want to send, create multipart/mixed
601 # body
602 fields = []
603 for name, contents, mimetype in files:
604 rf = RequestField(name=name, data=contents)
605 rf.make_multipart(content_type=mimetype)
606 fields.append(rf)
607
608 post_body, content_type = encode_multipart_formdata(fields)
609 content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
610 headers = {'Content-Type': content_type}
611
612 # now make the request. Note to self: if multipart/form-data were
613 # accepted this would be so much nicer here. But it works.
614 try:
615 req = self.request("POST", f"/package/dbstore/module/{encode_name(module)}/payload", data=post_body, headers=headers)
616 except ConditionsDB.RequestError as e:
617 B2ERROR(f"Could not create Payload: {e}")
618 return None
619
620 return req.json()["payloadId"]
621
622 def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
623 """
624 Create an iov.
625
626 Args:
627 globalTagId (int): id of the globaltag, obtain with get_globalTagId()
628 payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
629 firstExp (int): first experiment for which this iov is valid
630 firstRun (int): first run for which this iov is valid
631 finalExp (int): final experiment for which this iov is valid
632 finalRun (int): final run for which this iov is valid
633
634 Returns:
635 payloadIovId of the created iov, None if creation was not successful
636 """
637 try:
638 # try to convert all arguments except self to integers to make sure they are
639 # valid.
640 local_variables = locals()
641 variables = {e: int(local_variables[e]) for e in
642 ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
643 except ValueError:
644 B2ERROR("create_iov: All parameters need to be integers")
645 return None
646
647 # try to create the iov
648 try:
649 req = self.request("POST", "/globalTagPayload/{globalTagId},{payloadId}"
650 "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
651 except ConditionsDB.RequestError as e:
652 B2ERROR(f"Could not create IOV: {e}")
653 return None
654
655 return req.json()["payloadIovId"]
656
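A sketch combining ``create_payload`` and ``create_iov`` (the module, file, and globaltag names are placeholders; an authentication token must be set first):

    db = ConditionsDB()
    set_cdb_authentication_token(db)
    tag_info = db.get_globalTagInfo("user_example_tag")
    if tag_info is not None:
        payload_id = db.create_payload("BeamParameters", "dbstore_BeamParameters.root")
        if payload_id is not None:
            iov_id = db.create_iov(tag_info["globalTagId"], payload_id, 0, 0, -1, -1)
            print(f"created iov {iov_id}")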
657 def delete_iov(self, iovId):
658 """Delete an iov
659
660 Args:
661 iovId (int): id of the iov to be deleted
662 """
663 try:
664 self.request("DELETE", f"/payloadIov/{iovId}")
665 except ConditionsDB.RequestError as e:
666 B2ERROR(f"Could not delete IOV: {e}")
667
668 def modify_iov(self, iovId, firstExp, firstRun, finalExp, finalRun):
669 """Modify the validity range of a given iov
670
671 Args:
672 iovId (int): id of the iov to be modified
673 firstExp (int): first experiment for which this iov is valid
674 firstRun (int): first run for which this iov is valid
675 finalExp (int): final experiment for which this iov is valid
676 finalRun (int): final run for which this iov is valid
677 """
678 try:
679 querystring = {"expStart": str(firstExp), "runStart": str(firstRun), "expEnd": str(finalExp), "runEnd": str(finalRun)}
680 self.request("PUT", f"/payloadIov/{iovId}", params=querystring)
681 except ConditionsDB.RequestError as e:
682 B2ERROR(f"Could not modify IOV: {e}")
683
684 def get_iovs(self, globalTagName, payloadName=None):
685 """Return existing iovs for a given tag name. It returns a dictionary
686 which maps (payloadId, firstExp, firstRun, finalExp, finalRun) to iovId
687
688 Parameters:
689 globalTagName(str): Global tag name.
690 payloadName(str): Payload name (if None, selection by name is
691 not performed).
692 """
693
694 try:
695 req = self.request("GET", f"/globalTag/{encode_name(globalTagName)}/globalTagPayloads")
696 except ConditionsDB.RequestError:
697 # there could be just no iovs so no error
698 return {}
699
700 result = {}
701 for payload in req.json():
702 payloadId = payload["payloadId"]["payloadId"]
703 if payloadName is not None:
704 if payload["payloadId"]["basf2Module"]["name"] != payloadName:
705 continue
706 for iov in payload["payloadIovs"]:
707 iovId = iov["payloadIovId"]
708 firstExp, firstRun = iov["expStart"], iov["runStart"]
709 finalExp, finalRun = iov["expEnd"], iov["runEnd"]
710 result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
711
712 return result
713
714 def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
715 """
716 Upload a testing payload storage to the conditions database.
717
718 Parameters:
719 filename (str): filename of the testing payload storage file that should be uploaded
720 global_tag (str): name of the globaltag to which the data should be uploaded
721 normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
722 same content; if normalize is a string, the file name in the root file metadata will additionally be set to it
723 ignore_existing (bool): if True do not upload payloads that already exist
724 nprocess (int): maximal number of parallel uploads
725 uploaded_entries (list): the list of successfully uploaded entries
726
727 Returns:
728 True if the upload was successful
729 """
730
731 # first create a list of payloads
732 from conditions_db.testing_payloads import parse_testing_payloads_file
733 B2INFO(f"Reading payload list from {filename}")
734 entries = parse_testing_payloads_file(filename)
735 if entries is None:
736 B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
737 return False
738
739 if not entries:
740 B2INFO(f"No payloads found in {filename}, exiting")
741 return True
742
743 B2INFO(f"Found {len(entries)} iovs to upload")
744
745 # time to get the id for the globaltag
746 tagId = self.get_globalTagInfo(global_tag)
747 if tagId is None:
748 return False
749 tagId = tagId["globalTagId"]
750
751 # now we could have more than one payload with the same iov so let's go over
752 # it again and remove duplicates but keep the last one for each
753 entries = sorted(set(reversed(entries)))
754
755 if normalize:
756 name = normalize if normalize is not True else None
757 for e in entries:
758 e.normalize(name=name)
759
760 # so let's have a list of all payloads (name, checksum) as some payloads
761 # might have multiple iovs. Each payload gets a list of all of those
762 payloads = defaultdict(list)
763 for e in entries:
764 payloads[(e.module, e.checksum)].append(e)
765
766 existing_payloads = {}
767 existing_iovs = {}
768
769 def upload_payload(item):
770 """Upload a payload file if necessary but first check list of existing payloads"""
771 key, entries = item
772 if key in existing_payloads:
773 B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
774 payload_id = existing_payloads[key]
775 else:
776 entry = entries[0]
777 payload_id = self.create_payload(entry.module, entry.filename, entry.checksum)
778 if payload_id is None:
779 return False
780
781 B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")
782
783 for entry in entries:
784 entry.payload = payload_id
785
786 return True
787
788 def create_iov(entry):
789 """Create an iov if necessary but first check the list of existing iovs"""
790 if entry.payload is None:
791 return None
792
793 iov_key = (entry.payload,) + entry.iov_tuple
794 if iov_key in existing_iovs:
795 entry.iov = existing_iovs[iov_key]
796 B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
797 else:
798 entry.payloadIovId = self.create_iov(tagId, entry.payload, *entry.iov_tuple)
799 if entry.payloadIovId is None:
800 return False
801
802 B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")
803
804 return entry
805
806 # multithreading for the win ...
807 with ThreadPoolExecutor(max_workers=nprocess) as pool:
808 # if we want to check for existing payloads/iovs we schedule the download of
809 # the full payload list. And write a message as each completes
810 if not ignore_existing:
811 B2INFO("Downloading information about existing payloads and iovs...")
812 futures = []
813 existing_iovs = {}
814 existing_payloads = {}
815
816 def create_future(call, arg, callback=None):
817 fn = pool.submit(call, arg)
818 if callback is not None:
819 fn.add_done_callback(callback)
820 futures.append(fn)
821
822 def update_iovs(iovs):
823 existing_iovs.update(iovs.result())
824 B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")
825
826 def update_payloads(payloads):
827 existing_payloads.update(payloads.result())
828 B2INFO(f"Found {len(existing_payloads)} existing payloads")
829
830 create_future(self.get_iovs, global_tag, update_iovs)
831 # checking existing payloads should not be done with too many at once
832 for chunk in chunks(payloads.keys(), 1000):
833 create_future(self.check_payloads, chunk, update_payloads)
834
835 futures_wait(futures)
836
837 # upload payloads
838 failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
839 if failed_payloads > 0:
840 B2ERROR(f"{failed_payloads} payloads could not be uploaded")
841
842 # create IoVs
843 failed_iovs = 0
844 for entry in pool.map(create_iov, entries):
845 if entry:
846 if uploaded_entries is not None:
847 uploaded_entries.append(entry)
848 else:
849 failed_iovs += 1
850 if failed_iovs > 0:
851 B2ERROR(f"{failed_iovs} IoVs could not be created")
852
853 # update revision numbers
854 if uploaded_entries is not None:
855 self.get_revisions(uploaded_entries)
856
857 return failed_payloads + failed_iovs == 0
858
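Typical use of ``upload``, shown as a sketch (the storage file and globaltag name are placeholders):

    db = ConditionsDB()
    set_cdb_authentication_token(db)
    uploaded = []
    ok = db.upload("localdb/database.txt", "user_example_tag", normalize=True,
                   nprocess=5, uploaded_entries=uploaded)
    if ok:
        print(f"uploaded {len(uploaded)} iovs")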
859 def staging_request(self, filename, normalize, data, password):
860 """
861 Upload a testing payload storage to a staging globaltag and create or update a jira issue
862
863 Parameters:
864 filename (str): filename of the testing payload storage file that should be uploaded
865 normalize (Union[bool, str]): if True the payload root files will be
866 normalized to have the same checksum for the same content; if
867 normalize is a string, the file name in the root file metadata
868 will additionally be set to it
869 data (dict): a dictionary with the information provided by the user:
870
871 * task: category of globaltag, either main, online, prompt, data, mc, or analysis
872 * tag: the globaltag name
873 * request: type of request, either Update, New, or Modification. The latter two imply task == main because
874 if new payload classes are introduced or payload classes are modified then they will first be included in
875 the main globaltag. Here a synchronization of code and payload changes has to be managed.
876 If new or modified payload classes should be included in other globaltags they must already be in a release.
877 * pull-request: number of the pull request containing new or modified payload classes,
878 only for request == New or Modified
879 * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
880 only for request == Modified
881 * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
882 only for request == Modified
883 * release: the required release version
884 * reason: the reason for the request
885 * description: a detailed description for the globaltag manager
886 * issue: identifier of an existing jira issue (optional)
887 * user: name of the user
888 * time: time stamp of the request
889
890 password: the password for access to jira or the access token and secret for oauth access
891
892 Returns:
893 True if the upload and jira issue creation/update was successful
894 """
895
896 # determine the staging globaltag name
897 data['tag'] = upload_global_tag(data['task'])
898 if data['tag'] is None:
899 data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"
900
901 # create the staging globaltag if it does not exist yet
902 if not self.has_globalTag(data['tag']):
903 if not self.create_globalTag(data['tag'], data['reason'], data['user']):
904 return False
905
906 # upload the payloads
907 B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
908 entries = []
909 if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
910 return False
911
912 # get the dictionary for the jira issue creation/update
913 if data['issue']:
914 issue = data['issue']
915 else:
916 issue = jira_global_tag_v2(data['task'])
917 if issue is None:
918 issue = {"components": [{"name": "globaltag"}]}
919
920 # create jira issue text from provided information
921 if type(issue) is tuple:
922 description = issue[1].format(**data)
923 issue = issue[0]
924 else:
925 description = f"""
926|*Upload globaltag* | {data['tag']} |
927|*Request reason* | {data['reason']} |
928|*Required release* | {data['release']} |
929|*Type of request* | {data['request']} |
930"""
931 if 'pull-request' in data.keys():
932 description += f"|*Pull request* | \\#{data['pull-request']} |\n"
933 if 'backward-compatibility' in data.keys():
934 description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
935 if 'forward-compatibility' in data.keys():
936 description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
937 description += '|*Details* |' + ''.join(data['details']) + ' |\n'
938 if data['task'] == 'online':
939 description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'
940
941 # add information about uploaded payloads/IoVs
942 description += '\nPayloads\n||Name||Revision||IoV||\n'
943 for entry in entries:
944 description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
945
946 # create a new issue
947 if type(issue) is dict:
948 issue["description"] = description
949 if "summary" in issue.keys():
950 issue["summary"] = issue["summary"].format(**data)
951 else:
952 issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
953 if "project" not in issue.keys():
954 issue["project"] = {"key": "BII"}
955 if "issuetype" not in issue.keys():
956 issue["issuetype"] = {"name": "Task"}
957 if data["task"] == "main":
958 issue["labels"] = ["TUPPR"]
959
960 B2INFO(f"Creating jira issue for {data['task']} globaltag request")
961 if isinstance(password, str):
962 response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
963 json={'fields': issue})
964 else:
965 fields = {'issue': json.dumps(issue)}
966 if 'user' in data.keys():
967 fields['user'] = data['user']
968 if password:
969 fields['token'] = password[0]
970 fields['secret'] = password[1]
971 response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
972 if response.status_code in range(200, 210):
973 B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
974 else:
975 B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
976 return False
977
978 # comment on an existing issue
979 else:
980 # Let's make sure the default assignee of new issues is added as a watcher
981 # of the existing issue, otherwise they might never find out about the update
982 new_issue_config = jira_global_tag_v2(data['task'])
983 if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
984 user = new_issue_config['assignee'].get('name', None)
985 if user is not None and isinstance(password, str):
986 response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
987 auth=(data['user'], password), json=user)
988 if response.status_code in range(200, 210):
989 B2INFO(f"Added {user} as watcher to {issue}")
990 else:
991 B2WARNING(f"Could not add {user} as watcher to {issue}: {response.status_code}")
992
993 B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
994 if isinstance(password, str):
995 response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/comment',
996 auth=(data['user'], password), json={'body': description})
997 else:
998 fields = {'id': issue, 'user': data['user'], 'comment': description}
999 if password:
1000 fields['token'] = password[0]
1001 fields['secret'] = password[1]
1002 response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
1003 if response.status_code in range(200, 210):
1004 B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
1005 else:
1006 B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
1007 return False
1008
1009 return True
1010
1011
1012def require_database_for_test(timeout=60, base_url=None):
1013 """Make sure that the database is available and skip the test if not.
1014
1015 This function should be called in test scripts if they are expected to fail
1016 if the database is down. It either returns when the database is ok or it
1017 will signal test_basf2 that the test should be skipped and exit
1018 """
1019 import sys
1020 if os.environ.get("BELLE2_CONDB_GLOBALTAG", None) == "":
1021 raise Exception("Access to the Database is disabled")
1022 base_url_list = ConditionsDB.get_base_urls(base_url)
1023 for url in base_url_list:
1024 try:
1025 req = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags")
1026 req.raise_for_status()
1027 except requests.RequestException as e:
1028 B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
1029 else:
1030 break
1031 else:
1032 print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
1033 sys.exit(1)
1034
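In a test script this would typically be called once near the top, e.g.:

    from conditions_db import require_database_for_test

    require_database_for_test()
    # ... the rest of the test, which needs a working conditions database ...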
1035
1036def enable_debugging():
1037 """Enable verbose output of python-requests to be able to debug http connections"""
1038 # These two lines enable debugging at httplib level (requests->urllib3->http.client)
1039 # You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA.
1040 # The only thing missing will be the response.body which is not logged.
1041 import http.client as http_client
1042 import logging
1043 http_client.HTTPConnection.debuglevel = 1
1044 # You must initialize logging, otherwise you'll not see debug output.
1045 logging.basicConfig()
1046 logging.getLogger().setLevel(logging.DEBUG)
1047 requests_log = logging.getLogger("requests.packages.urllib3")
1048 requests_log.setLevel(logging.DEBUG)
1049 requests_log.propagate = True
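Sketch: turn on the verbose HTTP logging before creating the interface so the connection attempts are visible:

    from conditions_db import ConditionsDB, enable_debugging

    enable_debugging()
    db = ConditionsDB()   # connection attempts and requests are now logged at DEBUG level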