Belle II Software development
ConditionsDB Class Reference

Classes

class  RequestError
 

Public Member Functions

 __init__ (self, base_url=None, max_connections=10, retries=3)
 
 set_authentication_token (self, token)
 
 request (self, method, url, message=None, *args, **argk)
 
 get_globalTags (self)
 
 has_globalTag (self, name)
 
 get_globalTagInfo (self, name)
 
 get_globalTagType (self, name)
 
 create_globalTag (self, name, description, user)
 
 get_all_iovs (self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False)
 
 get_payloads (self, global_tag=None)
 
 check_payloads (self, payloads, information="payloadId")
 
 get_revisions (self, entries)
 
 create_payload (self, module, filename, checksum=None)
 
 create_iov (self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
 
 delete_iov (self, iovId)
 
 modify_iov (self, iovId, firstExp, firstRun, finalExp, finalRun)
 
 get_iovs (self, globalTagName, payloadName=None)
 
 upload (self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
 
 staging_request (self, filename, normalize, data, password)
 

Static Public Member Functions

 get_base_urls (given_url)
 

Static Public Attributes

list BASE_URLS = [conditions.default_metadata_provider_server]
 base url to the conditions db to be used if no custom url is given
 

Protected Attributes

 _session = requests.Session()
 session object to get keep-alive support and connection pooling
 
str _base_url = url.rstrip("/") + "/"
 base url to be prepended to all requests
 

Detailed Description

Class to interface conditions db REST interface

Definition at line 231 of file __init__.py.

Constructor & Destructor Documentation

◆ __init__()

__init__ ( self,
base_url = None,
max_connections = 10,
retries = 3 )
Create a new instance of the interface

Args:
    base_url (string): base url of the rest interface
    max_connections (int): number of connections to keep open, mostly useful for threaded applications
    retries (int): number of retries in case of connection problems

Definition at line 273 of file __init__.py.

273 def __init__(self, base_url=None, max_connections=10, retries=3):
274 """
275 Create a new instance of the interface
276
277 Args:
278 base_url (string): base url of the rest interface
279 max_connections (int): number of connections to keep open, mostly useful for threaded applications
280 retries (int): number of retries in case of connection problems
281 """
282
283
284 self._session = requests.Session()
285 # and set the connection options we want to have
286 adapter = requests.adapters.HTTPAdapter(
287 pool_connections=max_connections, pool_maxsize=max_connections,
288 max_retries=retries, pool_block=True
289 )
290 self._session.mount("http://", adapter)
291 self._session.mount("https://", adapter)
292 # and also set the proxy settings
293 if "BELLE2_CONDB_PROXY" in os.environ:
294 self._session.proxies = {
295 "http": os.environ.get("BELLE2_CONDB_PROXY"),
296 "https": os.environ.get("BELLE2_CONDB_PROXY"),
297 }
298 # test the given url or try the known defaults
299 base_url_list = ConditionsDB.get_base_urls(base_url)
300
301 for url in base_url_list:
302
303 self._base_url = url.rstrip("/") + "/"
304 try:
305 req = self._session.request("HEAD", self._base_url + "v2/globalTagStatus")
306 req.raise_for_status()
307 except requests.RequestException as e:
308 B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
309 else:
310 break
311 else:
312 B2FATAL("No working database servers configured, giving up")
313
314 # We have a working server so change the api to return json instead of
315 # xml, much easier in python, also request non-cached replies. We do
316 # this now because for the server check above we're fine with cached
317 # results.
318 self._session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
319

Member Function Documentation

◆ check_payloads()

check_payloads ( self,
payloads,
information = "payloadId" )
Check for the existence of payloads in the database.

Arguments:
    payloads (list((str,str))): A list of payloads to check for. Each
       payload needs to be a tuple of the name of the payload and the
       md5 checksum of the payload file.
    information (str): The information to be extracted from the
       payload dictionary

Returns:
    A dictionary with the payload identifiers (name, checksum) as keys
    and the requested information as values for all payloads which are already
    present in the database.

Definition at line 528 of file __init__.py.

528 def check_payloads(self, payloads, information="payloadId"):
529 """
530 Check for the existence of payloads in the database.
531
532 Arguments:
533 payloads (list((str,str))): A list of payloads to check for. Each
534 payload needs to be a tuple of the name of the payload and the
535 md5 checksum of the payload file.
536 information (str): The information to be extracted from the
537 payload dictionary
538
539 Returns:
540 A dictionary with the payload identifiers (name, checksum) as keys
541 and the requested information as values for all payloads which are already
542 present in the database.
543 """
544
545 search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
546 try:
547 req = self.request("POST", "/checkPayloads", json=search_query)
548 except ConditionsDB.RequestError as e:
549 B2ERROR(f"Cannot check for existing payloads: {e}")
550 return {}
551
552 result = {}
553 for payload in req.json():
554 module = payload["basf2Module"]["name"]
555 checksum = payload["checksum"]
556 result[(module, checksum)] = payload[information]
557
558 return result
559

◆ create_globalTag()

create_globalTag ( self,
name,
description,
user )
Create a new globaltag

Definition at line 431 of file __init__.py.

431 def create_globalTag(self, name, description, user):
432 """
433 Create a new globaltag
434 """
435 info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
436 try:
437 req = self.request("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
438 except ConditionsDB.RequestError as e:
439 B2ERROR(f"Could not create globaltag {name}: {e}")
440 return None
441
442 return req.json()
443

◆ create_iov()

create_iov ( self,
globalTagId,
payloadId,
firstExp,
firstRun,
finalExp,
finalRun )
Create an iov.

Args:
    globalTagId (int): id of the globaltag, obtain with get_globalTagId()
    payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
    firstExp (int): first experiment for which this iov is valid
    firstRun (int): first run for which this iov is valid
    finalExp (int): final experiment for which this iov is valid
    finalRun (int): final run for which this iov is valid

Returns:
    payloadIovId of the created iov, None if creation was not successful

Definition at line 622 of file __init__.py.

622 def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
623 """
624 Create an iov.
625
626 Args:
627 globalTagId (int): id of the globaltag, obtain with get_globalTagId()
628 payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
629 firstExp (int): first experiment for which this iov is valid
630 firstRun (int): first run for which this iov is valid
631 finalExp (int): final experiment for which this iov is valid
632 finalRun (int): final run for which this iov is valid
633
634 Returns:
635 payloadIovId of the created iov, None if creation was not successful
636 """
637 try:
638 # try to convert all arguments except self to integers to make sure they are
639 # valid.
640 local_variables = locals()
641 variables = {e: int(local_variables[e]) for e in
642 ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
643 except ValueError:
644 B2ERROR("create_iov: All parameters need to be integers")
645 return None
646
647 # try to create the iov
648 try:
649 req = self.request("POST", "/globalTagPayload/{globalTagId},{payloadId}"
650 "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
651 except ConditionsDB.RequestError as e:
652 B2ERROR(f"Could not create IOV: {e}")
653 return None
654
655 return req.json()["payloadIovId"]
656

◆ create_payload()

create_payload ( self,
module,
filename,
checksum = None )
Create a new payload

Args:
    module (str): name of the module
    filename (str): name of the file
    checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given

Definition at line 581 of file __init__.py.

581 def create_payload(self, module, filename, checksum=None):
582 """
583 Create a new payload
584
585 Args:
586 module (str): name of the module
587 filename (str): name of the file
588 checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
589 """
590 if checksum is None:
591 checksum = file_checksum(filename)
592
593 # this is the only complicated request as we have to provide a
594 # multipart/mixed request which is not directly provided by the request
595 # library.
596 files = [
597 (filename, open(filename, "rb").read(), "application/x-root"),
598 ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
599 ]
600 # ok we have the two "files" we want to send, create multipart/mixed
601 # body
602 fields = []
603 for name, contents, mimetype in files:
604 rf = RequestField(name=name, data=contents)
605 rf.make_multipart(content_type=mimetype)
606 fields.append(rf)
607
608 post_body, content_type = encode_multipart_formdata(fields)
609 content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
610 headers = {'Content-Type': content_type}
611
612 # now make the request. Note to self: if multipart/form-data would be
613 # accepted this would be so much nicer here. but it works.
614 try:
615 req = self.request("POST", f"/package/dbstore/module/{encode_name(module)}/payload", data=post_body, headers=headers)
616 except ConditionsDB.RequestError as e:
617 B2ERROR(f"Could not create Payload: {e}")
618 return None
619
620 return req.json()["payloadId"]
621

◆ delete_iov()

delete_iov ( self,
iovId )
Delete an iov

Args:
    iovId (int): id of the iov to be deleted

Definition at line 657 of file __init__.py.

657 def delete_iov(self, iovId):
658 """Delete an iov
659
660 Args:
661 iovId (int): id of the iov to be deleted
662 """
663 try:
664 self.request("DELETE", f"/payloadIov/{iovId}")
665 except ConditionsDB.RequestError as e:
666 B2ERROR(f"Could not delete IOV: {e}")
667

◆ get_all_iovs()

get_all_iovs ( self,
globalTag,
exp = None,
run = None,
message = None,
run_range = None,
fully_contained = False )
Return list of all payloads in the given globaltag where each element is
a `PayloadInformation` instance

Parameters:
    gobalTag (str): name of the globaltag
    exp (int): if given limit the list of payloads to the ones valid for
        the given exp,run combination
    run (int): if given limit the list of payloads to the ones valid for
        the given exp,run combination
    message (str): additional message to show when downloading the
        payload information. Will be directly appended to
        "Obtaining lists of iovs for globaltag {globalTag}"
    run_range (tuple): if given limit the list of payloads to the ones
        overlapping with the given run range, if
    fully_contained (bool): if True and the run_range is not None it limits
        the list of payloads to the ones fully contained in the given run range

Warning:
    Both, exp and run, need to be given at the same time. Just supplying
    an experiment or a run number will not work

Definition at line 444 of file __init__.py.

444 def get_all_iovs(self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False):
445 """
446 Return list of all payloads in the given globaltag where each element is
447 a `PayloadInformation` instance
448
449 Parameters:
450 gobalTag (str): name of the globaltag
451 exp (int): if given limit the list of payloads to the ones valid for
452 the given exp,run combination
453 run (int): if given limit the list of payloads to the ones valid for
454 the given exp,run combination
455 message (str): additional message to show when downloading the
456 payload information. Will be directly appended to
457 "Obtaining lists of iovs for globaltag {globalTag}"
458 run_range (tuple): if given limit the list of payloads to the ones
459 overlapping with the given run range, if
460 fully_contained (bool): if True and the run_range is not None it limits
461 the list of payloads to the ones fully contained in the given run range
462
463 Warning:
464 Both, exp and run, need to be given at the same time. Just supplying
465 an experiment or a run number will not work
466 """
467 globalTag = encode_name(globalTag)
468 if message is None:
469 message = ""
470 if run_range is not None:
471 if fully_contained:
472 message += f" [fully contained in {tuple(run_range)}]"
473 else:
474 message += f" [valid in {tuple(run_range)}]"
475 run_range = IntervalOfValidity(run_range)
476
477 if exp is not None:
478 msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
479 req = self.request("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
480 else:
481 msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
482 req = self.request("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
483 all_iovs = []
484 for item in req.json():
485 payload = item["payload" if 'payload' in item else "payloadId"]
486 if "payloadIov" in item:
487 iovs = [item['payloadIov']]
488 else:
489 iovs = item['payloadIovs']
490
491 for iov in iovs:
492 if run_range is not None:
493 iov_ = IntervalOfValidity(iov['expStart'], iov['runStart'], iov['expEnd'], iov['runEnd'])
494 if fully_contained:
495 if not iov_ & run_range == iov_:
496 continue
497 else:
498 if iov_ & run_range is None:
499 continue
500 all_iovs.append(PayloadInformation.from_json(payload, iov))
501
502 all_iovs.sort()
503 return all_iovs
504

◆ get_base_urls()

get_base_urls ( given_url)
static
Resolve the list of server urls. If a url is given just return it.
Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
builtin defaults

Arguments:
    given_url (str): Explicit base_url. If this is not None it will be
       returned as is in a list of length 1

Returns:
    a list of urls to try for database connectivity

Definition at line 242 of file __init__.py.

242 def get_base_urls(given_url):
243 """Resolve the list of server urls. If a url is given just return it.
244 Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
245 builtin defaults
246
247 Arguments:
248 given_url (str): Explicit base_url. If this is not None it will be
249 returned as is in a list of length 1
250
251 Returns:
252 a list of urls to try for database connectivity
253 """
254
255 base_url_list = ConditionsDB.BASE_URLS[:]
256 base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
257 if given_url is not None:
258 base_url_list = [given_url]
259 elif base_url_env is not None:
260 base_url_list = base_url_env.split()
261 B2INFO("Getting Conditions Database servers from Environment:")
262 for i, url in enumerate(base_url_list, 1):
263 B2INFO(f" {i}. {url}")
264 # try to escalate to https for all given urls
265 full_list = []
266 for url in base_url_list:
267 if url.startswith("http://"):
268 full_list.append("https" + url[4:])
269 # but keep the http in case of connection problems
270 full_list.append(url)
271 return full_list
272

◆ get_globalTagInfo()

get_globalTagInfo ( self,
name )
Get the id of the globaltag with the given name. Returns either the
id or None if the tag was not found

Definition at line 400 of file __init__.py.

400 def get_globalTagInfo(self, name):
401 """Get the id of the globaltag with the given name. Returns either the
402 id or None if the tag was not found"""
403
404 try:
405 req = self.request("GET", f"/globalTag/{encode_name(name)}")
406 except ConditionsDB.RequestError as e:
407 B2ERROR(f"Cannot find globaltag '{name}': {e}")
408 return None
409
410 return req.json()
411

◆ get_globalTags()

get_globalTags ( self)
Get a list of all globaltags. Returns a dictionary with the globaltag
names and the corresponding ids in the database

Definition at line 374 of file __init__.py.

374 def get_globalTags(self):
375 """Get a list of all globaltags. Returns a dictionary with the globaltag
376 names and the corresponding ids in the database"""
377
378 try:
379 req = self.request("GET", "/globalTags")
380 except ConditionsDB.RequestError as e:
381 B2ERROR(f"Could not get the list of globaltags: {e}")
382 return None
383
384 result = {}
385 for tag in req.json():
386 result[tag["name"]] = tag
387
388 return result
389

◆ get_globalTagType()

get_globalTagType ( self,
name )
Get the dictionary describing the given globaltag type (currently
one of DEV or RELEASE). Returns None if tag type was not found.

Definition at line 412 of file __init__.py.

412 def get_globalTagType(self, name):
413 """
414 Get the dictionary describing the given globaltag type (currently
415 one of DEV or RELEASE). Returns None if tag type was not found.
416 """
417 try:
418 req = self.request("GET", "/globalTagType")
419 except ConditionsDB.RequestError as e:
420 B2ERROR(f"Could not get list of valid globaltag types: {e}")
421 return None
422
423 types = {e["name"]: e for e in req.json()}
424
425 if name in types:
426 return types[name]
427
428 B2ERROR(f"Unknown globaltag type: '{name}', please use one of {', '.join(types)}")
429 return None
430

◆ get_iovs()

get_iovs ( self,
globalTagName,
payloadName = None )
Return existing iovs for a given tag name. It returns a dictionary
which maps (payloadId, first runId, final runId) to iovId

Parameters:
  globalTagName(str): Global tag name.
  payloadName(str):   Payload name (if None, selection by name is
                      not performed.

Definition at line 684 of file __init__.py.

684 def get_iovs(self, globalTagName, payloadName=None):
685 """Return existing iovs for a given tag name. It returns a dictionary
686 which maps (payloadId, first runId, final runId) to iovId
687
688 Parameters:
689 globalTagName(str): Global tag name.
690 payloadName(str): Payload name (if None, selection by name is
691 not performed.
692 """
693
694 try:
695 req = self.request("GET", f"/globalTag/{encode_name(globalTagName)}/globalTagPayloads")
696 except ConditionsDB.RequestError:
697 # there could be just no iovs so no error
698 return {}
699
700 result = {}
701 for payload in req.json():
702 payloadId = payload["payloadId"]["payloadId"]
703 if payloadName is not None:
704 if payload["payloadId"]["basf2Module"]["name"] != payloadName:
705 continue
706 for iov in payload["payloadIovs"]:
707 iovId = iov["payloadIovId"]
708 firstExp, firstRun = iov["expStart"], iov["runStart"]
709 finalExp, finalRun = iov["expEnd"], iov["runEnd"]
710 result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
711
712 return result
713

◆ get_payloads()

get_payloads ( self,
global_tag = None )
Get a list of all defined payloads (for the given global_tag or by default for all).
Returns a dictionary which maps (module, checksum) to the payload id.

Definition at line 505 of file __init__.py.

505 def get_payloads(self, global_tag=None):
506 """
507 Get a list of all defined payloads (for the given global_tag or by default for all).
508 Returns a dictionary which maps (module, checksum) to the payload id.
509 """
510
511 try:
512 if global_tag:
513 req = self.request("GET", f"/globalTag/{encode_name(global_tag)}/payloads")
514 else:
515 req = self.request("GET", "/payloads")
516 except ConditionsDB.RequestError as e:
517 B2ERROR(f"Cannot get list of payloads: {e}")
518 return {}
519
520 result = {}
521 for payload in req.json():
522 module = payload["basf2Module"]["name"]
523 checksum = payload["checksum"]
524 result[(module, checksum)] = payload["payloadId"]
525
526 return result
527

◆ get_revisions()

get_revisions ( self,
entries )
Get the revision numbers of payloads in the database.

Arguments:
    entries (list): A list of payload entries.
       Each entry must have the attributes module and checksum.

Returns:
    True if successful.

Definition at line 560 of file __init__.py.

560 def get_revisions(self, entries):
561 """
562 Get the revision numbers of payloads in the database.
563
564 Arguments:
565 entries (list): A list of payload entries.
566 Each entry must have the attributes module and checksum.
567
568 Returns:
569 True if successful.
570 """
571
572 result = self.check_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
573 if not result:
574 return False
575
576 for entry in entries:
577 entry.revision = result.get((entry.module, entry.checksum), 0)
578
579 return True
580

◆ has_globalTag()

has_globalTag ( self,
name )
Check whether the globaltag with the given name exists.

Definition at line 390 of file __init__.py.

390 def has_globalTag(self, name):
391 """Check whether the globaltag with the given name exists."""
392
393 try:
394 self.request("GET", f"/globalTag/{encode_name(name)}")
395 except ConditionsDB.RequestError:
396 return False
397
398 return True
399

◆ modify_iov()

modify_iov ( self,
iovId,
firstExp,
firstRun,
finalExp,
finalRun )
Modify the validity range of a given iov

Args:
    iovId (int): id of the iov to be modified
    firstExp (int): first experiment for which this iov is valid
    firstRun (int): first run for which this iov is valid
    finalExp (int): final experiment for which this iov is valid
    finalRun (int): final run for which this iov is valid

Definition at line 668 of file __init__.py.

668 def modify_iov(self, iovId, firstExp, firstRun, finalExp, finalRun):
669 """Modify the validity range of a given iov
670
671 Args:
672 iovId (int): id of the iov to be modified
673 firstExp (int): first experiment for which this iov is valid
674 firstRun (int): first run for which this iov is valid
675 finalExp (int): final experiment for which this iov is valid
676 finalRun (int): final run for which this iov is valid
677 """
678 try:
679 querystring = {"expStart": str(firstExp), "runStart": str(firstRun), "expEnd": str(finalExp), "runEnd": str(finalRun)}
680 self.request("PUT", f"/payloadIov/{iovId}", params=querystring)
681 except ConditionsDB.RequestError as e:
682 B2ERROR(f"Could not modify IOV: {e}")
683

◆ request()

request ( self,
method,
url,
message = None,
* args,
** argk )
Request function, similar to requests.request but adding the base_url

Args:
    method (str): GET, POST, etc.
    url (str): url for the request, base_url will be prepended
    message (str): message to show when starting the request and if it fails

All other arguments will be forwarded to requests.request.

Definition at line 330 of file __init__.py.

330 def request(self, method, url, message=None, *args, **argk):
331 """
332 Request function, similar to requests.request but adding the base_url
333
334 Args:
335 method (str): GET, POST, etc.
336 url (str): url for the request, base_url will be prepended
337 message (str): message to show when starting the request and if it fails
338
339 All other arguments will be forwarded to requests.request.
340 """
341 if message is not None:
342 B2INFO(message)
343
344 try:
345 req = self._session.request(method, self._base_url + "v2/" + url.lstrip("/"), *args, **argk)
346 except requests.exceptions.ConnectionError as e:
347 B2FATAL("Could not access '" + self._base_url + url.lstrip("/") + "': " + str(e))
348
349 if req.status_code >= 300:
350 # Apparently something is not good. Let's try to decode the json
351 # reply containing reason and message
352 try:
353 response = req.json()
354 message = response.get("message", "")
355 colon = ": " if message.strip() else ""
356 error = f"Request {method} {url} returned {response['code']} {response['reason']}{colon}{message}"
357 except json.JSONDecodeError:
358 # seems the reply was not even json
359 error = f"Request {method} {url} returned non JSON response {req.status_code}: {req.content}"
360
361 if message is not None:
362 raise ConditionsDB.RequestError(f"{message} failed: {error}")
363 else:
364 raise ConditionsDB.RequestError(error)
365
366 if method != "HEAD" and req.status_code != requests.codes.no_content:
367 try:
368 req.json()
369 except json.JSONDecodeError as e:
370 B2INFO(f"Invalid response: {req.content}")
371 raise ConditionsDB.RequestError(f"{method} {url} returned invalid JSON response {e}")
372 return req
373

◆ set_authentication_token()

set_authentication_token ( self,
token )
Set authentication token when talking to the database

Args:
    token (str): JWT to hand to the database. Will not be checked
        for validity here.

Definition at line 320 of file __init__.py.

320 def set_authentication_token(self, token):
321 """
322 Set authentication token when talking to the database
323
324 Args:
325 token (str): JWT to hand to the database. Will not be checked
326 for validity here.
327 """
328 self._session.auth = BearerAuth(token)
329

◆ staging_request()

staging_request ( self,
filename,
normalize,
data,
password )
Upload a testing payload storage to a staging globaltag and create or update a jira issue

Parameters:
  filename (str): filename of the testing payload storage file that should be uploaded
  normalize (Union[bool, str]): if True the payload root files will be
    normalized to have the same checksum for the same content, if
    normalize is a string in addition the file name in the root file
    metadata will be set to it
  data (dict): a dictionary with the information provided by the user:

    * task: category of globaltag, either main, online, prompt, data, mc, or analysis
    * tag: the globaltag name
    * request: type of request, either Update, New, or Modification. The latter two imply task == main because
      if new payload classes are introduced or payload classes are modified then they will first be included in
      the main globaltag. Here a synchronization of code and payload changes has to be managed.
      If new or modified payload classes should be included in other globaltags they must already be in a release.
    * pull-request: number of the pull request containing new or modified payload classes,
      only for request == New or Modified
    * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
      only for request == Modified
    * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
      only for request == Modified
    * release: the required release version
    * reason: the reason for the request
    * description: a detailed description for the globaltag manager
    * issue: identifier of an existing jira issue (optional)
    * user: name of the user
    * time: time stamp of the request

  password: the password for access to jira or the access token and secret for oauth access

Returns:
  True if the upload and jira issue creation/upload was successful

Definition at line 861 of file __init__.py.

861 def staging_request(self, filename, normalize, data, password):
862 """
863 Upload a testing payload storage to a staging globaltag and create or update a jira issue
864
865 Parameters:
866 filename (str): filename of the testing payload storage file that should be uploaded
867 normalize (Union[bool, str]): if True the payload root files will be
868 normalized to have the same checksum for the same content, if
869 normalize is a string in addition the file name in the root file
870 metadata will be set to it
871 data (dict): a dictionary with the information provided by the user:
872
873 * task: category of globaltag, either main, online, prompt, data, mc, or analysis
874 * tag: the globaltag name
875 * request: type of request, either Update, New, or Modification. The latter two imply task == main because
876 if new payload classes are introduced or payload classes are modified then they will first be included in
877 the main globaltag. Here a synchronization of code and payload changes has to be managed.
878 If new or modified payload classes should be included in other globaltags they must already be in a release.
879 * pull-request: number of the pull request containing new or modified payload classes,
880 only for request == New or Modified
881 * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
882 only for request == Modified
883 * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
884 only for request == Modified
885 * release: the required release version
886 * reason: the reason for the request
887 * description: a detailed description for the globaltag manager
888 * issue: identifier of an existing jira issue (optional)
889 * user: name of the user
890 * time: time stamp of the request
891
892 password: the password for access to jira or the access token and secret for oauth access
893
894 Returns:
895 True if the upload and jira issue creation/upload was successful
896 """
897
898 # determine the staging globaltag name
899 data['tag'] = upload_global_tag(data['task'])
900 if data['tag'] is None:
901 data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"
902
903 # create the staging globaltag if it does not exists yet
904 if not self.has_globalTag(data['tag']):
905 if not self.create_globalTag(data['tag'], data['reason'], data['user']):
906 return False
907
908 # upload the payloads
909 B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
910 entries = []
911 if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
912 return False
913
914 # get the dictionary for the jira issue creation/update
915 if data['issue']:
916 issue = data['issue']
917 else:
918 issue = jira_global_tag_v2(data['task'])
919 if issue is None:
920 issue = {"components": [{"name": "globaltag"}]}
921
922 # create jira issue text from provided information
923 if type(issue) is tuple:
924 description = issue[1].format(**data)
925 issue = issue[0]
926 else:
927 description = f"""
928|*Upload globaltag* | {data['tag']} |
929|*Request reason* | {data['reason']} |
930|*Required release* | {data['release']} |
931|*Type of request* | {data['request']} |
932"""
933 if 'pull-request' in data.keys():
934 description += f"|*Pull request* | \\#{data['pull-request']} |\n"
935 if 'backward-compatibility' in data.keys():
936 description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
937 if 'forward-compatibility' in data.keys():
938 description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
939 description += '|*Details* |' + ''.join(data['details']) + ' |\n'
940 if data['task'] == 'online':
941 description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'
942
943 # add information about uploaded payloads/IoVs
944 description += '\nPayloads\n||Name||Revision||IoV||\n'
945 for entry in entries:
946 description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
947
948 # create a new issue
949 if type(issue) is dict:
950 issue["description"] = description
951 if "summary" in issue.keys():
952 issue["summary"] = issue["summary"].format(**data)
953 else:
954 issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
955 if "project" not in issue.keys():
956 issue["project"] = {"key": "BII"}
957 if "issuetype" not in issue.keys():
958 issue["issuetype"] = {"name": "Task"}
959 if data["task"] == "main":
960 issue["labels"] = ["TUPPR"]
961
962 B2INFO(f"Creating jira issue for {data['task']} globaltag request")
963 if isinstance(password, str):
964 response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
965 json={'fields': issue})
966 else:
967 fields = {'issue': json.dumps(issue)}
968 if 'user' in data.keys():
969 fields['user'] = data['user']
970 if password:
971 fields['token'] = password[0]
972 fields['secret'] = password[1]
973 response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
974 if response.status_code in range(200, 210):
975 B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
976 else:
977 B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
978 return False
979
980 # comment on an existing issue
981 else:
982 # Let's make sure all assignees of new issues are added as watchers
983 # in that case, otherwise they might never find out
984 new_issue_config = jira_global_tag_v2(data['task'])
985 if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
986 user = new_issue_config['assignee'].get('name', None)
987 if user is not None and isinstance(password, str):
988 response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
989 auth=(data['user'], password), json=user)
990 if response.status_code in range(200, 210):
991 B2INFO(f"Added {user} as watcher to {issue}")
992 else:
993 B2WARNING(f"Could not add {user} as watcher to {issue}: {response.status_code}")
994
995 B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
996 if isinstance(password, str):
997 response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/comment',
998 auth=(data['user'], password), json={'body': description})
999 else:
1000 fields = {'id': issue, 'user': user, 'comment': description}
1001 if password:
1002 fields['token'] = password[0]
1003 fields['secret'] = password[1]
1004 response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
1005 if response.status_code in range(200, 210):
1006 B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
1007 else:
1008 B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
1009 return False
1010
1011 return True
1012
1013

◆ upload()

upload ( self,
filename,
global_tag,
normalize = False,
ignore_existing = False,
nprocess = 1,
uploaded_entries = None )
Upload a testing payload storage to the conditions database.

Parameters:
  filename (str): filename of the testing payload storage file that should be uploaded
  global_tage (str): name of the globaltag to which the data should be uploaded
  normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
    same content, if normalize is a string in addition the file name in the root file metadata will be set to it
  ignore_existing (bool): if True do not upload payloads that already exist
  nprocess (int): maximal number of parallel uploads
  uploaded_entries (list): the list of successfully uploaded entries

Returns:
  True if the upload was successful

Definition at line 714 of file __init__.py.

714 def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
715 """
716 Upload a testing payload storage to the conditions database.
717
718 Parameters:
719 filename (str): filename of the testing payload storage file that should be uploaded
720 global_tage (str): name of the globaltag to which the data should be uploaded
721 normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
722 same content, if normalize is a string in addition the file name in the root file metadata will be set to it
723 ignore_existing (bool): if True do not upload payloads that already exist
724 nprocess (int): maximal number of parallel uploads
725 uploaded_entries (list): the list of successfully uploaded entries
726
727 Returns:
728 True if the upload was successful
729 """
730
731 # first create a list of payloads
732 from conditions_db.testing_payloads import parse_testing_payloads_file
733 B2INFO(f"Reading payload list from {filename}")
734 entries = parse_testing_payloads_file(filename)
735 if entries is None:
736 B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
737 return False
738
739 if not entries:
740 B2INFO(f"No payloads found in {filename}, exiting")
741 return True
742
743 B2INFO(f"Found {len(entries)} iovs to upload")
744
745 # time to get the id for the globaltag
746 tagId = self.get_globalTagInfo(global_tag)
747 if tagId is None:
748 return False
749 tagId = tagId["globalTagId"]
750
751 # now we could have more than one payload with the same iov so let's go over
752 # it again and remove duplicates but keep the last one for each
753 entries = sorted(set(reversed(entries)))
754
755 if normalize:
756 name = normalize if normalize is not True else None
757 for e in entries:
758 e.normalize(name=name)
759
760 # so let's have a list of all payloads (name, checksum) as some payloads
761 # might have multiple iovs. Each payload gets a list of all of those
762 payloads = defaultdict(list)
763 for e in entries:
764 payloads[(e.module, e.checksum)].append(e)
765
766 existing_payloads = {}
767 existing_iovs = {}
768
769 def upload_payload(item):
770 """Upload a payload file if necessary but first check list of existing payloads"""
771 key, entries = item
772 if key in existing_payloads:
773 B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
774 payload_id = existing_payloads[key]
775 else:
776 entry = entries[0]
777 payload_id = self.create_payload(entry.module, entry.filename, entry.checksum)
778 if payload_id is None:
779 return False
780
781 B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")
782
783 for entry in entries:
784 entry.payload = payload_id
785
786 return True
787
788 def create_iov(entry):
789 """Create an iov if necessary but first check the list of existing iovs"""
790 if entry.payload is None:
791 return None
792
793 iov_key = (entry.payload,) + entry.iov_tuple
794 if iov_key in existing_iovs:
795 entry.iov = existing_iovs[iov_key]
796 B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
797 else:
798 entry.payloadIovId = self.create_iov(tagId, entry.payload, *entry.iov_tuple)
799 if entry.payloadIovId is None:
800 return False
801
802 B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")
803
804 return entry
805
806 # multithreading for the win ...
807 with ThreadPoolExecutor(max_workers=nprocess) as pool:
808 # if we want to check for existing payloads/iovs we schedule the download of
809 # the full payload list. And write a message as each completes
810 if not ignore_existing:
811 B2INFO("Downloading information about existing payloads and iovs...")
812 futures = []
813 existing_iovs = {}
814 existing_payloads = {}
815
816 def create_future(iter, func, callback=None):
817 fn = pool.submit(iter, func)
818 if callback is not None:
819 fn.add_done_callback(callback)
820 futures.append(fn)
821
822 def update_iovs(iovs):
823 existing_iovs.update(iovs.result())
824 B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")
825
826 def update_payloads(payloads):
827 existing_payloads.update(payloads.result())
828 B2INFO(f"Found {len(existing_payloads)} existing payloads")
829
830 # \cond false positive doxygen warning
831 create_future(self.get_iovs, global_tag, update_iovs)
832 # checking existing payloads should not be done with too many at once
833 for chunk in chunks(payloads.keys(), 1000):
834 create_future(self.check_payloads, chunk, update_payloads)
835 # \endcond
836
837 futures_wait(futures)
838
839 # upload payloads
840 failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
841 if failed_payloads > 0:
842 B2ERROR(f"{failed_payloads} payloads could not be uploaded")
843
844 # create IoVs
845 failed_iovs = 0
846 for entry in pool.map(create_iov, entries):
847 if entry:
848 if uploaded_entries is not None:
849 uploaded_entries.append(entry)
850 else:
851 failed_iovs += 1
852 if failed_iovs > 0:
853 B2ERROR(f"{failed_iovs} IoVs could not be created")
854
855 # update revision numbers
856 if uploaded_entries is not None:
857 self.get_revisions(uploaded_entries)
858
859 return failed_payloads + failed_iovs == 0
860

Member Data Documentation

◆ _base_url

str _base_url = url.rstrip("/") + "/"
protected

base url to be prepended to all requests

Definition at line 303 of file __init__.py.

◆ _session

_session = requests.Session()
protected

session object to get keep-alive support and connection pooling

Definition at line 284 of file __init__.py.

◆ BASE_URLS

list BASE_URLS = [conditions.default_metadata_provider_server]
static

base url to the conditions db to be used if no custom url is given

Definition at line 235 of file __init__.py.


The documentation for this class was generated from the following file: