Belle II Software development
ConditionsDB Class Reference

Classes

class  RequestError
 

Public Member Functions

def __init__ (self, base_url=None, max_connections=10, retries=3)
 
def set_authentication_token (self, token)
 
def request (self, method, url, message=None, *args, **argk)
 
def get_globalTags (self)
 
def has_globalTag (self, name)
 
def get_globalTagInfo (self, name)
 
def get_globalTagType (self, name)
 
def create_globalTag (self, name, description, user)
 
def get_all_iovs (self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False)
 
def get_payloads (self, global_tag=None)
 
def check_payloads (self, payloads, information="payloadId")
 
def get_revisions (self, entries)
 
def create_payload (self, module, filename, checksum=None)
 
def create_iov (self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
 
def delete_iov (self, iovId)
 
def modify_iov (self, iovId, firstExp, firstRun, finalExp, finalRun)
 
def get_iovs (self, globalTagName, payloadName=None)
 
def upload (self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
 
def staging_request (self, filename, normalize, data, password)
 

Static Public Member Functions

def get_base_urls (given_url)
 

Static Public Attributes

list BASE_URLS = [conditions.default_metadata_provider_url]
 base url to the conditions db to be used if no custom url is given
 

Protected Attributes

 _session
 session object to get keep-alive support and connection pooling
 
 _base_url
 base url to be prepended to all requests
 

Detailed Description

Class to interface with the conditions db REST interface

Definition at line 231 of file __init__.py.

Constructor & Destructor Documentation

◆ __init__()

def __init__ (   self,
  base_url = None,
  max_connections = 10,
  retries = 3 
)
Create a new instance of the interface

Args:
    base_url (string): base url of the rest interface
    max_connections (int): number of connections to keep open, mostly useful for threaded applications
    retries (int): number of retries in case of connection problems

Definition at line 273 of file __init__.py.

273 def __init__(self, base_url=None, max_connections=10, retries=3):
274 """
275 Create a new instance of the interface
276
277 Args:
278 base_url (string): base url of the rest interface
279 max_connections (int): number of connections to keep open, mostly useful for threaded applications
280 retries (int): number of retries in case of connection problems
281 """
282
283
284 self._session = requests.Session()
285 # and set the connection options we want to have
286 adapter = requests.adapters.HTTPAdapter(
287 pool_connections=max_connections, pool_maxsize=max_connections,
288 max_retries=retries, pool_block=True
289 )
290 self._session.mount("http://", adapter)
291 self._session.mount("https://", adapter)
292 # and also set the proxy settings
293 if "BELLE2_CONDB_PROXY" in os.environ:
294 self._session.proxies = {
295 "http": os.environ.get("BELLE2_CONDB_PROXY"),
296 "https": os.environ.get("BELLE2_CONDB_PROXY"),
297 }
298 # test the given url or try the known defaults
299 base_url_list = ConditionsDB.get_base_urls(base_url)
300
301 for url in base_url_list:
302
303 self._base_url = url.rstrip("/") + "/"
304 try:
305 req = self._session.request("HEAD", self._base_url + "v2/globalTags")
306 req.raise_for_status()
307 except requests.RequestException as e:
308 B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
309 else:
310 break
311 else:
312 B2FATAL("No working database servers configured, giving up")
313
314 # We have a working server so change the api to return json instead of
315 # xml, much easier in python, also request non-cached replies. We do
316 # this now because for the server check above we're fine with cached
317 # results.
318 self._session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
319

Member Function Documentation

◆ check_payloads()

def check_payloads (   self,
  payloads,
  information = "payloadId" 
)
Check for the existence of payloads in the database.

Arguments:
    payloads (list((str,str))): A list of payloads to check for. Each
       payload needs to be a tuple of the name of the payload and the
       md5 checksum of the payload file.
    information (str): The information to be extracted from the
       payload dictionary

Returns:
    A dictionary with the payload identifiers (name, checksum) as keys
    and the requested information as values for all payloads which are already
    present in the database.

Definition at line 528 of file __init__.py.

528 def check_payloads(self, payloads, information="payloadId"):
529 """
530 Check for the existence of payloads in the database.
531
532 Arguments:
533 payloads (list((str,str))): A list of payloads to check for. Each
534 payload needs to be a tuple of the name of the payload and the
535 md5 checksum of the payload file.
536 information (str): The information to be extracted from the
537 payload dictionary
538
539 Returns:
540 A dictionary with the payload identifiers (name, checksum) as keys
541 and the requested information as values for all payloads which are already
542 present in the database.
543 """
544
545 search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
546 try:
547 req = self.request("POST", "/checkPayloads", json=search_query)
548 except ConditionsDB.RequestError as e:
549 B2ERROR(f"Cannot check for existing payloads: {e}")
550 return {}
551
552 result = {}
553 for payload in req.json():
554 module = payload["basf2Module"]["name"]
555 checksum = payload["checksum"]
556 result[(module, checksum)] = payload[information]
557
558 return result
559

◆ create_globalTag()

def create_globalTag (   self,
  name,
  description,
  user 
)
Create a new globaltag

Definition at line 431 of file __init__.py.

431 def create_globalTag(self, name, description, user):
432 """
433 Create a new globaltag
434 """
435 info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
436 try:
437 req = self.request("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
438 except ConditionsDB.RequestError as e:
439 B2ERROR(f"Could not create globaltag {name}: {e}")
440 return None
441
442 return req.json()
443

◆ create_iov()

def create_iov (   self,
  globalTagId,
  payloadId,
  firstExp,
  firstRun,
  finalExp,
  finalRun 
)
Create an iov.

Args:
    globalTagId (int): id of the globaltag, obtain from the information returned by get_globalTagInfo()
    payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
    firstExp (int): first experiment for which this iov is valid
    firstRun (int): first run for which this iov is valid
    finalExp (int): final experiment for which this iov is valid
    finalRun (int): final run for which this iov is valid

Returns:
    payloadIovId of the created iov, None if creation was not successful

Definition at line 622 of file __init__.py.

622 def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
623 """
624 Create an iov.
625
626 Args:
627 globalTagId (int): id of the globaltag, obtain from the information returned by get_globalTagInfo()
628 payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
629 firstExp (int): first experiment for which this iov is valid
630 firstRun (int): first run for which this iov is valid
631 finalExp (int): final experiment for which this iov is valid
632 finalRun (int): final run for which this iov is valid
633
634 Returns:
635 payloadIovId of the created iov, None if creation was not successful
636 """
637 try:
638 # try to convert all arguments except self to integers to make sure they are
639 # valid.
640 local_variables = locals()
641 variables = {e: int(local_variables[e]) for e in
642 ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
643 except ValueError:
644 B2ERROR("create_iov: All parameters need to be integers")
645 return None
646
647 # try to create the iov
648 try:
649 req = self.request("POST", "/globalTagPayload/{globalTagId},{payloadId}"
650 "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
651 except ConditionsDB.RequestError as e:
652 B2ERROR(f"Could not create IOV: {e}")
653 return None
654
655 return req.json()["payloadIovId"]
656

◆ create_payload()

def create_payload (   self,
  module,
  filename,
  checksum = None 
)
Create a new payload

Args:
    module (str): name of the module
    filename (str): name of the file
    checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given

Definition at line 581 of file __init__.py.

581 def create_payload(self, module, filename, checksum=None):
582 """
583 Create a new payload
584
585 Args:
586 module (str): name of the module
587 filename (str): name of the file
588 checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
589 """
590 if checksum is None:
591 checksum = file_checksum(filename)
592
593 # this is the only complicated request as we have to provide a
594 # multipart/mixed request which is not directly provided by the requests
595 # library.
596 files = [
597 (filename, open(filename, "rb").read(), "application/x-root"),
598 ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
599 ]
600 # ok we have the two "files" we want to send, create multipart/mixed
601 # body
602 fields = []
603 for name, contents, mimetype in files:
604 rf = RequestField(name=name, data=contents)
605 rf.make_multipart(content_type=mimetype)
606 fields.append(rf)
607
608 post_body, content_type = encode_multipart_formdata(fields)
609 content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
610 headers = {'Content-Type': content_type}
611
612 # now make the request. Note to self: if multipart/form-data would be
613 # accepted this would be so much nicer here. but it works.
614 try:
615 req = self.request("POST", f"/package/dbstore/module/{encode_name(module)}/payload", data=post_body, headers=headers)
616 except ConditionsDB.RequestError as e:
617 B2ERROR(f"Could not create Payload: {e}")
618 return None
619
620 return req.json()["payloadId"]
621

◆ delete_iov()

def delete_iov (   self,
  iovId 
)
Delete an iov

Args:
    iovId (int): id of the iov to be deleted

Definition at line 657 of file __init__.py.

657 def delete_iov(self, iovId):
658 """Delete an iov
659
660 Args:
661 iovId (int): id of the iov to be deleted
662 """
663 try:
664 self.request("DELETE", f"/payloadIov/{iovId}")
665 except ConditionsDB.RequestError as e:
666 B2ERROR(f"Could not delete IOV: {e}")
667

◆ get_all_iovs()

def get_all_iovs (   self,
  globalTag,
  exp = None,
  run = None,
  message = None,
  run_range = None,
  fully_contained = False 
)
Return list of all payloads in the given globaltag where each element is
a `PayloadInformation` instance

Parameters:
    globalTag (str): name of the globaltag
    exp (int): if given limit the list of payloads to the ones valid for
        the given exp,run combination
    run (int): if given limit the list of payloads to the ones valid for
        the given exp,run combination
    message (str): additional message to show when downloading the
        payload information. Will be directly appended to
        "Obtaining lists of iovs for globaltag {globalTag}"
    run_range (tuple): if given limit the list of payloads to the ones
        overlapping with the given run range
    fully_contained (bool): if True and the run_range is not None it limits
        the list of payloads to the ones fully contained in the given run range

Warning:
    Both, exp and run, need to be given at the same time. Just supplying
    an experiment or a run number will not work

Definition at line 444 of file __init__.py.

444 def get_all_iovs(self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False):
445 """
446 Return list of all payloads in the given globaltag where each element is
447 a `PayloadInformation` instance
448
449 Parameters:
450 globalTag (str): name of the globaltag
451 exp (int): if given limit the list of payloads to the ones valid for
452 the given exp,run combination
453 run (int): if given limit the list of payloads to the ones valid for
454 the given exp,run combination
455 message (str): additional message to show when downloading the
456 payload information. Will be directly appended to
457 "Obtaining lists of iovs for globaltag {globalTag}"
458 run_range (tuple): if given limit the list of payloads to the ones
459 overlapping with the given run range
460 fully_contained (bool): if True and the run_range is not None it limits
461 the list of payloads to the ones fully contained in the given run range
462
463 Warning:
464 Both, exp and run, need to be given at the same time. Just supplying
465 an experiment or a run number will not work
466 """
467 globalTag = encode_name(globalTag)
468 if message is None:
469 message = ""
470 if run_range is not None:
471 if fully_contained:
472 message += f" [fully contained in {tuple(run_range)}]"
473 else:
474 message += f" [valid in {tuple(run_range)}]"
475 run_range = IntervalOfValidity(run_range)
476
477 if exp is not None:
478 msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
479 req = self.request("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
480 else:
481 msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
482 req = self.request("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
483 all_iovs = []
484 for item in req.json():
485 payload = item["payload" if 'payload' in item else "payloadId"]
486 if "payloadIov" in item:
487 iovs = [item['payloadIov']]
488 else:
489 iovs = item['payloadIovs']
490
491 for iov in iovs:
492 if run_range is not None:
493 iov_ = IntervalOfValidity(iov['expStart'], iov['runStart'], iov['expEnd'], iov['runEnd'])
494 if fully_contained:
495 if not iov_ & run_range == iov_:
496 continue
497 else:
498 if iov_ & run_range is None:
499 continue
500 all_iovs.append(PayloadInformation.from_json(payload, iov))
501
502 all_iovs.sort()
503 return all_iovs
504

◆ get_base_urls()

def get_base_urls (   given_url)
static
Resolve the list of server urls. If a url is given just return it.
Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
builtin defaults

Arguments:
    given_url (str): Explicit base_url. If this is not None it will be
       the only server in the list; an http:// url is additionally
       preceded by its https:// variant

Returns:
    a list of urls to try for database connectivity

Definition at line 242 of file __init__.py.

242 def get_base_urls(given_url):
243 """Resolve the list of server urls. If a url is given just return it.
244 Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
245 builtin defaults
246
247 Arguments:
248 given_url (str): Explicit base_url. If this is not None it will be the only
249 server in the list; an http:// url is additionally preceded by its https:// variant
250
251 Returns:
252 a list of urls to try for database connectivity
253 """
254
255 base_url_list = ConditionsDB.BASE_URLS[:]
256 base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
257 if given_url is not None:
258 base_url_list = [given_url]
259 elif base_url_env is not None:
260 base_url_list = base_url_env.split()
261 B2INFO("Getting Conditions Database servers from Environment:")
262 for i, url in enumerate(base_url_list, 1):
263 B2INFO(f" {i}. {url}")
264 # try to escalate to https for all given urls
265 full_list = []
266 for url in base_url_list:
267 if url.startswith("http://"):
268 full_list.append("https" + url[4:])
269 # but keep the http in case of connection problems
270 full_list.append(url)
271 return full_list
272

◆ get_globalTagInfo()

def get_globalTagInfo (   self,
  name 
)
Get the information of the globaltag with the given name. Returns either the
decoded JSON reply of the server or None if the tag was not found

Definition at line 400 of file __init__.py.

400 def get_globalTagInfo(self, name):
401 """Get the information of the globaltag with the given name. Returns either the
402 decoded JSON reply of the server or None if the tag was not found"""
403
404 try:
405 req = self.request("GET", f"/globalTag/{encode_name(name)}")
406 except ConditionsDB.RequestError as e:
407 B2ERROR(f"Cannot find globaltag '{name}': {e}")
408 return None
409
410 return req.json()
411

◆ get_globalTags()

def get_globalTags (   self)
Get a list of all globaltags. Returns a dictionary which maps the globaltag
names to the corresponding entries returned by the database

Definition at line 374 of file __init__.py.

374 def get_globalTags(self):
375 """Get a list of all globaltags. Returns a dictionary which maps the globaltag
376 names to the corresponding entries returned by the database"""
377
378 try:
379 req = self.request("GET", "/globalTags")
380 except ConditionsDB.RequestError as e:
381 B2ERROR(f"Could not get the list of globaltags: {e}")
382 return None
383
384 result = {}
385 for tag in req.json():
386 result[tag["name"]] = tag
387
388 return result
389

◆ get_globalTagType()

def get_globalTagType (   self,
  name 
)
Get the dictionary describing the given globaltag type (currently
one of DEV or RELEASE). Returns None if tag type was not found.

Definition at line 412 of file __init__.py.

412 def get_globalTagType(self, name):
413 """
414 Get the dictionary describing the given globaltag type (currently
415 one of DEV or RELEASE). Returns None if tag type was not found.
416 """
417 try:
418 req = self.request("GET", "/globalTagType")
419 except ConditionsDB.RequestError as e:
420 B2ERROR(f"Could not get list of valid globaltag types: {e}")
421 return None
422
423 types = {e["name"]: e for e in req.json()}
424
425 if name in types:
426 return types[name]
427
428 B2ERROR(f"Unknown globaltag type: '{name}', please use one of {', '.join(types)}")
429 return None
430

◆ get_iovs()

def get_iovs (   self,
  globalTagName,
  payloadName = None 
)
Return existing iovs for a given tag name. It returns a dictionary
which maps (payloadId, firstExp, firstRun, finalExp, finalRun) to iovId

Parameters:
  globalTagName(str): Global tag name.
  payloadName(str):   Payload name (if None, selection by name is
                      not performed).

Definition at line 684 of file __init__.py.

684 def get_iovs(self, globalTagName, payloadName=None):
685 """Return existing iovs for a given tag name. It returns a dictionary
686 which maps (payloadId, firstExp, firstRun, finalExp, finalRun) to iovId
687
688 Parameters:
689 globalTagName(str): Global tag name.
690 payloadName(str): Payload name (if None, selection by name is
691 not performed).
692 """
693
694 try:
695 req = self.request("GET", f"/globalTag/{encode_name(globalTagName)}/globalTagPayloads")
696 except ConditionsDB.RequestError:
697 # there could be just no iovs so no error
698 return {}
699
700 result = {}
701 for payload in req.json():
702 payloadId = payload["payloadId"]["payloadId"]
703 if payloadName is not None:
704 if payload["payloadId"]["basf2Module"]["name"] != payloadName:
705 continue
706 for iov in payload["payloadIovs"]:
707 iovId = iov["payloadIovId"]
708 firstExp, firstRun = iov["expStart"], iov["runStart"]
709 finalExp, finalRun = iov["expEnd"], iov["runEnd"]
710 result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
711
712 return result
713

◆ get_payloads()

def get_payloads (   self,
  global_tag = None 
)
Get a list of all defined payloads (for the given global_tag or by default for all).
Returns a dictionary which maps (module, checksum) to the payload id.

Definition at line 505 of file __init__.py.

505 def get_payloads(self, global_tag=None):
506 """
507 Get a list of all defined payloads (for the given global_tag or by default for all).
508 Returns a dictionary which maps (module, checksum) to the payload id.
509 """
510
511 try:
512 if global_tag:
513 req = self.request("GET", f"/globalTag/{encode_name(global_tag)}/payloads")
514 else:
515 req = self.request("GET", "/payloads")
516 except ConditionsDB.RequestError as e:
517 B2ERROR(f"Cannot get list of payloads: {e}")
518 return {}
519
520 result = {}
521 for payload in req.json():
522 module = payload["basf2Module"]["name"]
523 checksum = payload["checksum"]
524 result[(module, checksum)] = payload["payloadId"]
525
526 return result
527

◆ get_revisions()

def get_revisions (   self,
  entries 
)
Get the revision numbers of payloads in the database.

Arguments:
    entries (list): A list of payload entries.
       Each entry must have the attributes module and checksum.

Returns:
    True if successful.

Definition at line 560 of file __init__.py.

560 def get_revisions(self, entries):
561 """
562 Get the revision numbers of payloads in the database.
563
564 Arguments:
565 entries (list): A list of payload entries.
566 Each entry must have the attributes module and checksum.
567
568 Returns:
569 True if successful.
570 """
571
572 result = self.check_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
573 if not result:
574 return False
575
576 for entry in entries:
577 entry.revision = result.get((entry.module, entry.checksum), 0)
578
579 return True
580

◆ has_globalTag()

def has_globalTag (   self,
  name 
)
Check whether the globaltag with the given name exists.

Definition at line 390 of file __init__.py.

390 def has_globalTag(self, name):
391 """Check whether the globaltag with the given name exists."""
392
393 try:
394 self.request("GET", f"/globalTag/{encode_name(name)}")
395 except ConditionsDB.RequestError:
396 return False
397
398 return True
399

◆ modify_iov()

def modify_iov (   self,
  iovId,
  firstExp,
  firstRun,
  finalExp,
  finalRun 
)
Modify the validity range of a given iov

Args:
    iovId (int): id of the iov to be modified
    firstExp (int): first experiment for which this iov is valid
    firstRun (int): first run for which this iov is valid
    finalExp (int): final experiment for which this iov is valid
    finalRun (int): final run for which this iov is valid

Definition at line 668 of file __init__.py.

668 def modify_iov(self, iovId, firstExp, firstRun, finalExp, finalRun):
669 """Modify the validity range of a given iov
670
671 Args:
672 iovId (int): id of the iov to be modified
673 firstExp (int): first experiment for which this iov is valid
674 firstRun (int): first run for which this iov is valid
675 finalExp (int): final experiment for which this iov is valid
676 finalRun (int): final run for which this iov is valid
677 """
678 try:
679 querystring = {"expStart": str(firstExp), "runStart": str(firstRun), "expEnd": str(finalExp), "runEnd": str(finalRun)}
680 self.request("PUT", f"/payloadIov/{iovId}", params=querystring)
681 except ConditionsDB.RequestError as e:
682 B2ERROR(f"Could not modify IOV: {e}")
683

◆ request()

def request (   self,
  method,
  url,
  message = None,
*  args,
**  argk 
)
Request function, similar to requests.request but adding the base_url

Args:
    method (str): GET, POST, etc.
    url (str): url for the request, base_url will be prepended
    message (str): message to show when starting the request and if it fails

All other arguments will be forwarded to requests.request.

Definition at line 330 of file __init__.py.

330 def request(self, method, url, message=None, *args, **argk):
331 """
332 Request function, similar to requests.request but adding the base_url
333
334 Args:
335 method (str): GET, POST, etc.
336 url (str): url for the request, base_url will be prepended
337 message (str): message to show when starting the request and if it fails
338
339 All other arguments will be forwarded to requests.request.
340 """
341 if message is not None:
342 B2INFO(message)
343
344 try:
345 req = self._session.request(method, self._base_url + "v2/" + url.lstrip("/"), *args, **argk)
346 except requests.exceptions.ConnectionError as e:
347 B2FATAL("Could not access '" + self._base_url + url.lstrip("/") + "': " + str(e))
348
349 if req.status_code >= 300:
350 # Apparently something is not good. Let's try to decode the json
351 # reply containing reason and message
352 try:
353 response = req.json()
354 message = response.get("message", "")
355 colon = ": " if message.strip() else ""
356 error = f"Request {method} {url} returned {response['code']} {response['reason']}{colon}{message}"
357 except json.JSONDecodeError:
358 # seems the reply was not even json
359 error = f"Request {method} {url} returned non JSON response {req.status_code}: {req.content}"
360
361 if message is not None:
362 raise ConditionsDB.RequestError(f"{message} failed: {error}")
363 else:
364 raise ConditionsDB.RequestError(error)
365
366 if method != "HEAD" and req.status_code != requests.codes.no_content:
367 try:
368 req.json()
369 except json.JSONDecodeError as e:
370 B2INFO(f"Invalid response: {req.content}")
371 raise ConditionsDB.RequestError(f"{method} {url} returned invalid JSON response {e}")
372 return req
373

◆ set_authentication_token()

def set_authentication_token (   self,
  token 
)
Set authentication token when talking to the database

Args:
    token (str): JWT to hand to the database. Will not be checked
        for validity here.

Definition at line 320 of file __init__.py.

320 def set_authentication_token(self, token):
321 """
322 Set authentication token when talking to the database
323
324 Args:
325 token (str): JWT to hand to the database. Will not be checked
326 for validity here.
327 """
328 self._session.auth = BearerAuth(token)
329

◆ staging_request()

def staging_request (   self,
  filename,
  normalize,
  data,
  password 
)
Upload a testing payload storage to a staging globaltag and create or update a jira issue

Parameters:
  filename (str): filename of the testing payload storage file that should be uploaded
  normalize (Union[bool, str]): if True the payload root files will be
    normalized to have the same checksum for the same content, if
    normalize is a string in addition the file name in the root file
    metadata will be set to it
  data (dict): a dictionary with the information provided by the user:

    * task: category of globaltag, either main, online, prompt, data, mc, or analysis
    * tag: the globaltag name
    * request: type of request, either Update, New, or Modification. The latter two imply task == main because
      if new payload classes are introduced or payload classes are modified then they will first be included in
      the main globaltag. Here a synchronization of code and payload changes has to be managed.
      If new or modified payload classes should be included in other globaltags they must already be in a release.
    * pull-request: number of the pull request containing new or modified payload classes,
      only for request == New or Modified
    * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
      only for request == Modified
    * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
      only for request == Modified
    * release: the required release version
    * reason: the reason for the request
    * description: a detailed description for the globaltag manager
    * issue: identifier of an existing jira issue (optional)
    * user: name of the user
    * time: time stamp of the request

  password: the password for access to jira or the access token and secret for oauth access

Returns:
  True if the upload and jira issue creation/update was successful

Definition at line 859 of file __init__.py.

859 def staging_request(self, filename, normalize, data, password):
860 """
861 Upload a testing payload storage to a staging globaltag and create or update a jira issue
862
863 Parameters:
864 filename (str): filename of the testing payload storage file that should be uploaded
865 normalize (Union[bool, str]): if True the payload root files will be
866 normalized to have the same checksum for the same content, if
867 normalize is a string in addition the file name in the root file
868 metadata will be set to it
869 data (dict): a dictionary with the information provided by the user:
870
871 * task: category of globaltag, either main, online, prompt, data, mc, or analysis
872 * tag: the globaltag name
873 * request: type of request, either Update, New, or Modification. The latter two imply task == main because
874 if new payload classes are introduced or payload classes are modified then they will first be included in
875 the main globaltag. Here a synchronization of code and payload changes has to be managed.
876 If new or modified payload classes should be included in other globaltags they must already be in a release.
877 * pull-request: number of the pull request containing new or modified payload classes,
878 only for request == New or Modified
879 * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
880 only for request == Modified
881 * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
882 only for request == Modified
883 * release: the required release version
884 * reason: the reason for the request
885 * description: a detailed description for the globaltag manager
886 * issue: identifier of an existing jira issue (optional)
887 * user: name of the user
888 * time: time stamp of the request
889
890 password: the password for access to jira or the access token and secret for oauth access
891
892 Returns:
893 True if the upload and jira issue creation/update was successful
894 """
895
896 # determine the staging globaltag name
897 data['tag'] = upload_global_tag(data['task'])
898 if data['tag'] is None:
899 data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"
900
901 # create the staging globaltag if it does not exist yet
902 if not self.has_globalTag(data['tag']):
903 if not self.create_globalTag(data['tag'], data['reason'], data['user']):
904 return False
905
906 # upload the payloads
907 B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
908 entries = []
909 if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
910 return False
911
912 # get the dictionary for the jira issue creation/update
913 if data['issue']:
914 issue = data['issue']
915 else:
916 issue = jira_global_tag_v2(data['task'])
917 if issue is None:
918 issue = {"components": [{"name": "globaltag"}]}
919
920 # create jira issue text from provided information
921 if type(issue) is tuple:
922 description = issue[1].format(**data)
923 issue = issue[0]
924 else:
925 description = f"""
926|*Upload globaltag* | {data['tag']} |
927|*Request reason* | {data['reason']} |
928|*Required release* | {data['release']} |
929|*Type of request* | {data['request']} |
930"""
931 if 'pull-request' in data.keys():
932 description += f"|*Pull request* | \\#{data['pull-request']} |\n"
933 if 'backward-compatibility' in data.keys():
934 description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
935 if 'forward-compatibility' in data.keys():
936 description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
937 description += '|*Details* |' + ''.join(data['details']) + ' |\n'
938 if data['task'] == 'online':
939 description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'
940
941 # add information about uploaded payloads/IoVs
942 description += '\nPayloads\n||Name||Revision||IoV||\n'
943 for entry in entries:
944 description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
945
946 # create a new issue
947 if type(issue) is dict:
948 issue["description"] = description
949 if "summary" in issue.keys():
950 issue["summary"] = issue["summary"].format(**data)
951 else:
952 issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
953 if "project" not in issue.keys():
954 issue["project"] = {"key": "BII"}
955 if "issuetype" not in issue.keys():
956 issue["issuetype"] = {"name": "Task"}
957 if data["task"] == "main":
958 issue["labels"] = ["TUPPR"]
959
960 B2INFO(f"Creating jira issue for {data['task']} globaltag request")
961 if isinstance(password, str):
962 response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
963 json={'fields': issue})
964 else:
965 fields = {'issue': json.dumps(issue)}
966 if 'user' in data.keys():
967 fields['user'] = data['user']
968 if password:
969 fields['token'] = password[0]
970 fields['secret'] = password[1]
971 response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
972 if response.status_code in range(200, 210):
973 B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
974 else:
975 B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
976 return False
977
978 # comment on an existing issue
979 else:
980 # Let's make sure all assignees of new issues are added as watchers
981 # in that case, otherwise they might never find out
982 new_issue_config = jira_global_tag_v2(data['task'])
983 if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
984 user = new_issue_config['assignee'].get('name', None)
985 if user is not None and isinstance(password, str):
986 response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
987 auth=(data['user'], password), json=user)
988 if response.status_code in range(200, 210):
989 B2INFO(f"Added {user} as watcher to {issue}")
990 else:
991 B2WARNING(f"Could not add {user} as watcher to {issue}: {response.status_code}")
992
993 B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
994 if isinstance(password, str):
995 response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/comment',
996 auth=(data['user'], password), json={'body': description})
997 else:
998 fields = {'id': issue, 'user': user, 'comment': description}
999 if password:
1000 fields['token'] = password[0]
1001 fields['secret'] = password[1]
1002 response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
1003 if response.status_code in range(200, 210):
1004 B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
1005 else:
1006 B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
1007 return False
1008
1009 return True
1010
1011

◆ upload()

def upload (   self,
  filename,
  global_tag,
  normalize = False,
  ignore_existing = False,
  nprocess = 1,
  uploaded_entries = None 
)
Upload a testing payload storage to the conditions database.

Parameters:
  filename (str): filename of the testing payload storage file that should be uploaded
  global_tag (str): name of the globaltag to which the data should be uploaded
  normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
    same content, if normalize is a string in addition the file name in the root file metadata will be set to it
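  ignore_existing (bool): if True the lists of existing payloads and iovs are not downloaded, so nothing is recognized as already existing and every payload and iov is created anew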
  nprocess (int): maximal number of parallel uploads
  uploaded_entries (list): the list of successfully uploaded entries

Returns:
  True if the upload was successful

Definition at line 714 of file __init__.py.

def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
    """
    Upload a testing payload storage to the conditions database.

    Parameters:
      filename (str): filename of the testing payload storage file that should be uploaded
      global_tag (str): name of the globaltag to which the data should be uploaded
      normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
        same content, if normalize is a string in addition the file name in the root file metadata will be set to it
      ignore_existing (bool): if True the lists of existing payloads and iovs are not downloaded, so nothing is
        recognized as already existing and every payload and iov is created anew
      nprocess (int): maximal number of parallel uploads
      uploaded_entries (list): if not None, every entry whose iov was created or found to exist already is
        appended to this list and the list is then passed to `get_revisions`

    Returns:
      True if the upload was successful, i.e. no payload upload and no iov creation failed.
      Also True if the file contains no payloads at all. False if the file could not be
      parsed or the globaltag information could not be obtained.
    """

    # first create a list of payloads
    from conditions_db.testing_payloads import parse_testing_payloads_file
    B2INFO(f"Reading payload list from {filename}")
    entries = parse_testing_payloads_file(filename)
    if entries is None:
        B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
        return False

    if not entries:
        B2INFO(f"No payloads found in {filename}, exiting")
        return True

    B2INFO(f"Found {len(entries)} iovs to upload")

    # time to get the id for the globaltag
    tag_info = self.get_globalTagInfo(global_tag)
    if tag_info is None:
        return False
    tagId = tag_info["globalTagId"]

    # now we could have more than one payload with the same iov so let's go over
    # it again and remove duplicates but keep the last one for each
    entries = sorted(set(reversed(entries)))

    if normalize:
        # a string value doubles as the file name to store in the root file metadata
        name = normalize if normalize is not True else None
        for e in entries:
            e.normalize(name=name)

    # so let's have a list of all payloads (name, checksum) as some payloads
    # might have multiple iovs. Each payload gets a list of all of those
    payloads = defaultdict(list)
    for e in entries:
        payloads[(e.module, e.checksum)].append(e)

    # Both stay empty when ignore_existing is set, so nothing is ever skipped.
    # Otherwise they are filled in place by the download callbacks below.
    existing_payloads = {}
    existing_iovs = {}

    def upload_payload(item):
        """Upload a payload file if necessary but first check list of existing payloads.

        ``item`` is a ``((module, checksum), entries)`` pair. On success the payload id
        is stored in ``entry.payload`` of every entry and True is returned; False is
        returned if the payload could not be created.
        """
        key, entries = item
        if key in existing_payloads:
            B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
            payload_id = existing_payloads[key]
        else:
            # all entries of one key share module and checksum, so upload the first one
            entry = entries[0]
            payload_id = self.create_payload(entry.module, entry.filename, entry.checksum)
            if payload_id is None:
                return False

            B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")

        for entry in entries:
            entry.payload = payload_id

        return True

    def create_iov(entry):
        """Create an iov if necessary but first check the list of existing iovs.

        Returns the entry on success, None if the entry has no payload id
        (presumably because its payload upload failed; assumes ``entry.payload``
        defaults to None) and False if the iov could not be created.
        """
        if entry.payload is None:
            return None

        iov_key = (entry.payload,) + entry.iov_tuple
        if iov_key in existing_iovs:
            entry.iov = existing_iovs[iov_key]
            B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
        else:
            entry.payloadIovId = self.create_iov(tagId, entry.payload, *entry.iov_tuple)
            if entry.payloadIovId is None:
                return False

            B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")

        return entry

    # multithreading for the win ...
    with ThreadPoolExecutor(max_workers=nprocess) as pool:
        # if we want to check for existing payloads/iovs we schedule the download of
        # the full payload list. And write a message as each completes
        if not ignore_existing:
            B2INFO("Downloading information about existing payloads and iovs...")
            futures = []

            def create_future(func, arg, callback=None):
                """Submit ``func(arg)`` to the pool and optionally attach a done callback."""
                future = pool.submit(func, arg)
                if callback is not None:
                    future.add_done_callback(callback)
                futures.append(future)

            def update_iovs(future):
                """Merge the downloaded iovs into the shared lookup table."""
                existing_iovs.update(future.result())
                B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")

            def update_payloads(future):
                """Merge one chunk of downloaded payload information into the shared lookup table."""
                existing_payloads.update(future.result())
                B2INFO(f"Found {len(existing_payloads)} existing payloads")

            create_future(self.get_iovs, global_tag, update_iovs)
            # checking existing payloads should not be done with too many at once
            for chunk in chunks(payloads.keys(), 1000):
                create_future(self.check_payloads, chunk, update_payloads)

            # the lookup tables must be complete before any upload decision is taken
            futures_wait(futures)

        # upload payloads
        failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
        if failed_payloads > 0:
            B2ERROR(f"{failed_payloads} payloads could not be uploaded")

        # create IoVs
        failed_iovs = 0
        for entry in pool.map(create_iov, entries):
            if entry:
                if uploaded_entries is not None:
                    uploaded_entries.append(entry)
            else:
                # None (no payload id) and False (creation failed) both count as failure
                failed_iovs += 1
        if failed_iovs > 0:
            B2ERROR(f"{failed_iovs} IoVs could not be created")

    # update revision numbers
    if uploaded_entries is not None:
        self.get_revisions(uploaded_entries)

    return failed_payloads + failed_iovs == 0

Member Data Documentation

◆ _base_url

_base_url
protected

base url to be prepended to all requests

Definition at line 303 of file __init__.py.

◆ _session

_session
protected

session object to get keep-alive support and connection pooling

Definition at line 284 of file __init__.py.

◆ BASE_URLS

list BASE_URLS = [conditions.default_metadata_provider_url]
static

base url to the conditions db to be used if no custom url is given

Definition at line 235 of file __init__.py.


The documentation for this class was generated from the following file: