8 Python interface to the ConditionsDB
12 from basf2
import B2FATAL, B2ERROR, B2INFO, B2WARNING
14 from requests.auth
import HTTPBasicAuth, HTTPDigestAuth
15 from requests.packages.urllib3.fields
import RequestField
16 from requests.packages.urllib3.filepost
import encode_multipart_formdata
19 from versioning
import upload_global_tag, jira_global_tag_v2
20 from collections
import defaultdict
21 from concurrent.futures
import ThreadPoolExecutor, wait
as futures_wait
26 def encode_name(name):
27 """Escape name to be used in an url"""
28 return urllib.parse.quote(name, safe=
"")
31 def file_checksum(filename):
32 """Calculate md5 hash of file"""
33 md5hash = hashlib.md5()
34 with open(filename,
"rb")
as data:
35 md5hash.update(data.read())
36 return md5hash.hexdigest()
39 def chunks(container, chunk_size):
40 """Cut a container in chunks of max. chunk_size"""
43 chunk = tuple(itertools.islice(it, chunk_size))
50 """Small container class to help compare payload information for efficient
51 comparison between globaltags"""
55 """Set all internal members from the json information of the payload and the iov.
58 payload (dict): json information of the payload as returned by REST api
59 iov (dict): json information of the iov as returned by REST api
62 iov = {
"payloadIovId":
None,
"expStart":
None,
"runStart":
None,
"expEnd":
None,
"runEnd":
None}
66 payload[
'basf2Module'][
'name'],
69 payload[
'payloadUrl'],
72 (iov[
"expStart"], iov[
"runStart"], iov[
"expEnd"], iov[
"runEnd"]),
75 def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
77 Create a new object from the given information
98 """Return the full url to the payload on the server"""
102 """Make object hashable"""
106 """Check if two payloads are equal"""
107 return (self.
name, self.
checksum, self.
iov) == (other.name, other.checksum, other.iov)
110 """Sort payloads by name, iov, revision"""
111 return (self.
name.lower(), self.
iov, self.
revision) < (other.name.lower(), other.iov, other.revision)
114 """return a human readable name for the IoV"""
118 if self.
iov == (0, 0, -1, -1):
121 e1, r1, e2, r2 = self.
iov
123 if r1 == 0
and r2 == -1:
126 return f
"exp {e1}, runs {r1}+"
128 return f
"exp {e1}, run {r1}"
130 return f
"exp {e1}, runs {r1} - {r2}"
132 if e2 == -1
and r1 == 0:
133 return f
"exp {e1} - forever"
135 return f
"exp {e1}, run {r1} - forever"
136 elif r1 == 0
and r2 == -1:
137 return f
"exp {e1}-{e2}, all runs"
139 return f
"exp {e1}, run {r1} - exp {e2}, all runs"
141 return f
"exp {e1}, run {r1} - exp {e2}, run {r2}"
145 """Class to interface conditions db REST interface"""
148 BASE_URLS = [
"http://belle2db.sdcc.bnl.gov/b2s/rest/"]
151 """Class to be thrown by request() if there is any error"""
156 """Resolve the list of server urls. If a url is given just return it.
157 Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
161 given_url (str): Explicit base_url. If this is not None it will be
162 returned as is in a list of length 1
165 a list of urls to try for database connectivity
168 base_url_list = ConditionsDB.BASE_URLS[:]
169 base_url_env = os.environ.get(
"BELLE2_CONDB_SERVERLIST",
None)
170 if given_url
is not None:
171 base_url_list = [given_url]
172 elif base_url_env
is not None:
173 base_url_list = base_url_env.split()
174 B2INFO(
"Getting Conditions Database servers from Environment:")
175 for i, url
in enumerate(base_url_list, 1):
176 B2INFO(f
" {i}. {url}")
179 for url
in base_url_list:
180 if url.startswith(
"http://"):
181 full_list.append(
"https" + url[4:])
183 full_list.append(url)
186 def __init__(self, base_url=None, max_connections=10, retries=3):
188 Create a new instance of the interface
191 base_url (string): base url of the rest interface
192 max_connections (int): number of connections to keep open, mostly useful for threaded applications
193 retries (int): number of retries in case of connection problems
199 adapter = requests.adapters.HTTPAdapter(
200 pool_connections=max_connections, pool_maxsize=max_connections,
201 max_retries=retries, pool_block=
True
203 self.
_session.mount(
"http://", adapter)
204 self.
_session.mount(
"https://", adapter)
206 if "BELLE2_CONDB_PROXY" in os.environ:
208 "http": os.environ.get(
"BELLE2_CONDB_PROXY"),
209 "https": os.environ.get(
"BELLE2_CONDB_PROXY"),
212 base_url_list = ConditionsDB.get_base_urls(base_url)
214 for url
in base_url_list:
219 req.raise_for_status()
220 except requests.RequestException
as e:
221 B2WARNING(f
"Problem connecting to {url}:\n {e}\n Trying next server ...")
225 B2FATAL(
"No working database servers configured, giving up")
231 self.
_session.headers.update({
"Accept":
"application/json",
"Cache-Control":
"no-cache"})
235 Set authentication credentials when talking to the database
239 password (str): password
240 basic (bool): if True us HTTP Basic authentication, otherwise HTTP Digest
242 authtype = HTTPBasicAuth
if basic
else HTTPDigestAuth
243 self.
_session.auth = authtype(user, password)
245 def request(self, method, url, message=None, *args, **argk):
247 Request function, similar to requests.request but adding the base_url
250 method (str): GET, POST, etc.
251 url (str): url for the request, base_url will be prepended
252 message (str): message to show when starting the request and if it fails
254 All other arguments will be forwarded to requests.request.
256 if message
is not None:
261 except requests.exceptions.ConnectionError
as e:
262 B2FATAL(
"Could not access '" + self.
_base_url + url.lstrip(
"/") +
"': " + str(e))
264 if req.status_code >= 300:
268 response = req.json()
269 message = response.get(
"message",
"")
270 colon =
": " if message.strip()
else ""
271 error =
"Request {method} {url} returned {code} {reason}{colon}{message}".format(
272 method=method, url=url,
273 code=response[
"code"],
274 reason=response[
"reason"],
278 except json.JSONDecodeError:
280 error =
"Request {method} {url} returned non JSON response {code}: {content}".format(
281 method=method, url=url,
282 code=req.status_code,
286 if message
is not None:
291 if method !=
"HEAD" and req.status_code != requests.codes.no_content:
294 except json.JSONDecodeError
as e:
295 B2INFO(
"Invalid response: {}".format(req.content))
297 .format(e, method=method, url=url))
301 """Get a list of all globaltags. Returns a dictionary with the globaltag
302 names and the corresponding ids in the database"""
305 req = self.
request(
"GET",
"/globalTags")
307 B2ERROR(
"Could not get the list of globaltags: {}".format(e))
311 for tag
in req.json():
312 result[tag[
"name"]] = tag
317 """Check whether the globaltag with the given name exists."""
320 self.
request(
"GET",
"/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
327 """Get the id of the globaltag with the given name. Returns either the
328 id or None if the tag was not found"""
331 req = self.
request(
"GET",
"/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
333 B2ERROR(
"Cannot find globaltag '{}': {}".format(name, e))
340 Get the dictionary describing the given globaltag type (currently
341 one of DEV or RELEASE). Returns None if tag type was not found.
344 req = self.
request(
"GET",
"/globalTagType")
346 B2ERROR(
"Could not get list of valid globaltag types: {}".format(e))
349 types = {e[
"name"]: e
for e
in req.json()}
354 B2ERROR(
"Unknown globaltag type: '{}', please use one of {}".format(name,
", ".join(types)))
359 Create a new globaltag
361 info = {
"name": name,
"description": description,
"modifiedBy": user,
"isDefault":
False}
363 req = self.
request(
"POST", f
"/globalTag/DEV", f
"Creating globaltag {name}", json=info)
365 B2ERROR(f
"Could not create globaltag {name}: {e}")
372 Return list of all payloads in the given globaltag where each element is
373 a `PayloadInformation` instance
376 gobalTag (str): name of the globaltag
377 exp (int): if given limit the list of payloads to the ones valid for
378 the given exp,run combination
379 run (int): if given limit the list of payloads to the ones valid for
380 the given exp,run combination
381 message (str): additional message to show when downloading the
382 payload information. Will be directly appended to
383 "Obtaining lists of iovs for globaltag {globalTag}"
386 Both, exp and run, need to be given at the same time. Just supplying
387 an experiment or a run number will not work
389 globalTag = encode_name(globalTag)
393 msg = f
"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
394 req = self.
request(
"GET",
"/iovPayloads", msg, params={
'gtName': globalTag,
'expNumber': exp,
'runNumber': run})
396 msg = f
"Obtaining list of iovs for globaltag {globalTag}{message}"
397 req = self.
request(
"GET", f
"/globalTag/{globalTag}/globalTagPayloads", msg)
399 for item
in req.json():
400 payload = item[
"payload" if 'payload' in item
else "payloadId"]
401 if "payloadIov" in item:
402 iovs = [item[
'payloadIov']]
404 iovs = item[
'payloadIovs']
407 all_iovs.append(PayloadInformation.from_json(payload, iov))
414 Get a list of all defined payloads (for the given global_tag or by default for all).
415 Returns a dictionary which maps (module, checksum) to the payload id.
420 req = self.
request(
"GET",
"/globalTag/{global_tag}/payloads"
421 .format(global_tag=encode_name(global_tag)))
423 req = self.
request(
"GET",
"/payloads")
425 B2ERROR(
"Cannot get list of payloads: {}".format(e))
429 for payload
in req.json():
430 module = payload[
"basf2Module"][
"name"]
431 checksum = payload[
"checksum"]
432 result[(module, checksum)] = payload[
"payloadId"]
438 Check for the existence of payloads in the database.
441 payloads (list((str,str))): A list of payloads to check for. Each
442 payload needs to be a tuple of the name of the payload and the
443 md5 checksum of the payload file.
444 information (str): The information to be extracted from the
448 A dictionary with the payload identifiers (name, checksum) as keys
449 and the requested information as values for all payloads which are already
450 present in the database.
453 search_query = [{
"name": e[0],
"checksum": e[1]}
for e
in payloads]
455 req = self.
request(
"POST",
"/checkPayloads", json=search_query)
457 B2ERROR(
"Cannot check for existing payloads: {}".format(e))
461 for payload
in req.json():
462 module = payload[
"basf2Module"][
"name"]
463 checksum = payload[
"checksum"]
464 result[(module, checksum)] = payload[information]
470 Get the revision numbers of payloads in the database.
473 entries (list): A list of payload entries.
474 Each entry must have the attributes module and checksum.
480 result = self.
check_payloads([(entry.module, entry.checksum)
for entry
in entries],
"revision")
484 for entry
in entries:
485 entry.revision = result.get((entry.module, entry.checksum), 0)
494 module (str): name of the module
495 filename (str): name of the file
496 checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
499 checksum = file_checksum(filename)
505 (filename, open(filename,
"rb").read(),
"application/x-root"),
506 (
"json", json.dumps({
"checksum": checksum,
"isDefault":
False}),
"application/json"),
511 for name, contents, mimetype
in files:
512 rf = RequestField(name=name, data=contents)
513 rf.make_multipart(content_type=mimetype)
516 post_body, content_type = encode_multipart_formdata(fields)
517 content_type =
''.join((
'multipart/mixed',) + content_type.partition(
';')[1:])
518 headers = {
'Content-Type': content_type}
523 req = self.
request(
"POST",
"/package/dbstore/module/{moduleName}/payload"
524 .format(moduleName=encode_name(module)),
525 data=post_body, headers=headers)
527 B2ERROR(
"Could not create Payload: {}".format(e))
530 return req.json()[
"payloadId"]
532 def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
537 globalTagId (int): id of the globaltag, obtain with get_globalTagId()
538 payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
539 firstExp (int): first experiment for which this iov is valid
540 firstRun (int): first run for which this iov is valid
541 finalExp (int): final experiment for which this iov is valid
542 finalRun (int): final run for which this iov is valid
545 payloadIovId of the created iov, None if creation was not successful
550 local_variables = locals()
551 variables = {e: int(local_variables[e])
for e
in
552 [
"globalTagId",
"payloadId",
"firstExp",
"firstRun",
"finalExp",
"finalRun"]}
554 B2ERROR(
"create_iov: All parameters need to be integers")
559 req = self.
request(
"POST",
"/globalTagPayload/{globalTagId},{payloadId}"
560 "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
562 B2ERROR(
"Could not create IOV: {}".format(e))
565 return req.json()[
"payloadIovId"]
568 """Return existing iovs for a given tag name. It returns a dictionary
569 which maps (payloadId, first runId, final runId) to iovId"""
572 req = self.
request(
"GET",
"/globalTag/{globalTagName}/globalTagPayloads"
573 .format(globalTagName=encode_name(globalTagName)))
579 for payload
in req.json():
580 payloadId = payload[
"payloadId"][
"payloadId"]
581 for iov
in payload[
"payloadIovs"]:
582 iovId = iov[
"payloadIovId"]
583 firstExp, firstRun = iov[
"expStart"], iov[
"runStart"]
584 finalExp, finalRun = iov[
"expEnd"], iov[
"runEnd"]
585 result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
589 def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
591 Upload a testing payload storage to the conditions database.
594 filename (str): filename of the testing payload storage file that should be uploaded
595 global_tage (str): name of the globaltag to which the data should be uploaded
596 normalize (bool/str): if True the payload root files will be normalized to have the same checksum for the same content,
597 if normalize is a string in addition the file name in the root file metadata will be set to it
598 ignore_existing (bool): if True do not upload payloads that already exist
599 nprocess (int): maximal number of parallel uploads
600 uploaded_entries (list): the list of successfully uploaded entries
603 True if the upload was successful
608 B2INFO(f
"Reading payload list from {filename}")
609 entries = parse_testing_payloads_file(filename)
611 B2ERROR(f
"Problems with testing payload storage file {filename}, exiting")
615 B2INFO(f
"No payloads found in {filename}, exiting")
618 B2INFO(f
"Found {len(entries)} iovs to upload")
624 tagId = tagId[
"globalTagId"]
628 entries = sorted(set(reversed(entries)))
631 name = normalize
if normalize
is not True else None
633 e.normalize(name=name)
637 payloads = defaultdict(list)
639 payloads[(e.module, e.checksum)].append(e)
641 existing_payloads = {}
644 def upload_payload(item):
645 """Upload a payload file if necessary but first check list of existing payloads"""
647 if key
in existing_payloads:
648 B2INFO(f
"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
649 payload_id = existing_payloads[key]
652 payload_id = self.
create_payload(entry.module, entry.filename, entry.checksum)
653 if payload_id
is None:
656 B2INFO(f
"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")
658 for entry
in entries:
659 entry.payload = payload_id
664 """Create an iov if necessary but first check the list of existing iovs"""
665 if entry.payload
is None:
668 iov_key = (entry.payload,) + entry.iov_tuple
669 if iov_key
in existing_iovs:
670 entry.iov = existing_iovs[iov_key]
671 B2INFO(f
"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
673 entry.payloadIovId = self.
create_iov(tagId, entry.payload, *entry.iov_tuple)
674 if entry.payloadIovId
is None:
677 B2INFO(f
"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")
682 with ThreadPoolExecutor(max_workers=nprocess)
as pool:
685 if not ignore_existing:
686 B2INFO(
"Downloading information about existing payloads and iovs...")
689 existing_payloads = {}
691 def create_future(iter, func, callback=None):
692 fn = pool.submit(iter, func)
693 if callback
is not None:
694 fn.add_done_callback(callback)
697 def update_iovs(iovs):
698 existing_iovs.update(iovs.result())
699 B2INFO(f
"Found {len(existing_iovs)} existing iovs in {global_tag}")
701 def update_payloads(payloads):
702 existing_payloads.update(payloads.result())
703 B2INFO(f
"Found {len(existing_payloads)} existing payloads")
705 create_future(self.
get_iovs, global_tag, update_iovs)
707 for chunk
in chunks(payloads.keys(), 1000):
710 futures_wait(futures)
713 failed_payloads = sum(0
if result
else 1
for result
in pool.map(upload_payload, payloads.items()))
714 if failed_payloads > 0:
715 B2ERROR(f
"{failed_payloads} payloads could not be uploaded")
719 for entry
in pool.map(create_iov, entries):
721 if uploaded_entries
is not None:
722 uploaded_entries.append(entry)
726 B2ERROR(f
"{failed_iovs} IoVs could not be created")
729 if uploaded_entries
is not None:
732 return failed_payloads + failed_iovs == 0
736 Upload a testing payload storage to a staging globaltag and create or update a jira issue
739 filename (str): filename of the testing payload storage file that should be uploaded
740 normalize (bool/str): if True the payload root files will be
741 normalized to have the same checksum for the same content, if
742 normalize is a string in addition the file name in the root file
743 metadata will be set to it
744 data (dict): a dictionary with the information provided by the user:
746 * task: category of globaltag, either master, online, prompt, data, mc, or analysis
747 * tag: the globaltage name
748 * request: type of request, either Update, New, or Modification. The latter two imply task == master because
749 if new payload classes are introduced or payload classes are modified then they will first be included in
750 the master globaltag. Here a synchronization of code and payload changes has to be managed.
751 If new or modified payload classes should be included in other globaltags they must already be in a release.
752 * pull-request: number of the pull request containing new or modified payload classes,
753 only for request == New or Modified
754 * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
755 only for request == Modified
756 * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
757 only for request == Modified
758 * release: the required release version
759 * reason: the reason for the request
760 * description: a detailed description for the globaltag manager
761 * issue: identifier of an existing jira issue (optional)
762 * user: name of the user
763 * time: time stamp of the request
765 password: the password for access to jira or the access token and secret for oauth access
768 True if the upload and jira issue creation/upload was successful
772 data[
'tag'] = upload_global_tag(data[
'task'])
773 if data[
'tag']
is None:
774 data[
'tag'] = f
"staging_{data['task']}_{data['user']}_{data['time']}"
782 B2INFO(f
"Uploading testing database {filename} to globaltag {data['tag']}")
784 if not self.
upload(filename, data[
'tag'], normalize, uploaded_entries=entries):
789 issue = data[
'issue']
791 issue = jira_global_tag_v2(data[
'task'])
793 issue = {
"components": [{
"name":
"globaltag"}]}
796 if type(issue)
is tuple:
797 description = issue[1].format(**data)
801 |*Upload globaltag* | {data['tag']} |
802 |*Request reason* | {data['reason']} |
803 |*Required release* | {data['release']} |
804 |*Type of request* | {data['request']} |
806 if 'pull-request' in data.keys():
807 description += f
"|*Pull request* | \\#{data['pull-request']} |\n"
808 if 'backward-compatibility' in data.keys():
809 description += f
"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
810 if 'forward-compatibility' in data.keys():
811 description += f
"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
812 description +=
'|*Details* |' +
''.join(data[
'details']) +
' |\n'
813 if data[
'task'] ==
'online':
814 description += f
'|*Impact on data taking*|' +
''.join(data[
'data_taking']) +
' |\n'
817 description +=
'\nPayloads\n||Name||Revision||IoV||\n'
818 for entry
in entries:
819 description += f
"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
822 if type(issue)
is dict:
823 issue[
"description"] = description
824 if "summary" in issue.keys():
825 issue[
"summary"] = issue[
"summary"].format(**data)
827 issue[
"summary"] = f
"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
828 if "project" not in issue.keys():
829 issue[
"project"] = {
"key":
"BII"}
830 if "issuetype" not in issue.keys():
831 issue[
"issuetype"] = {
"name":
"Task"}
833 B2INFO(f
"Creating jira issue for {data['task']} globaltag request")
834 if isinstance(password, str):
835 response = requests.post(
'https://agira.desy.de/rest/api/latest/issue', auth=(data[
'user'], password),
836 json={
'fields': issue})
838 fields = {
'issue': json.dumps(issue)}
839 if 'user' in data.keys():
840 fields[
'user'] = data[
'user']
842 fields[
'token'] = password[0]
843 fields[
'secret'] = password[1]
844 response = requests.post(
'https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
845 if response.status_code
in range(200, 210):
846 B2INFO(f
"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
848 B2ERROR(
'The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
855 new_issue_config = jira_global_tag_v2(data[
'task'])
856 if isinstance(new_issue_config, dict)
and "assignee" in new_issue_config:
857 user = new_issue_config[
'assignee'].get(
'name',
None)
858 if user
is not None and isinstance(password, str):
859 response = requests.post(f
'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
860 auth=(data[
'user'], password), json=user)
861 if response.status_code
in range(200, 210):
862 B2INFO(f
"Added {user} as watcher to {issue}")
864 B2WARNING(f
"Could not add {user} as watcher to {issue}: {response.status_code}")
866 B2INFO(f
"Commenting on jira issue {issue} for {data['task']} globaltag request")
867 if isinstance(password, str):
868 response = requests.post(
'https://agira.desy.de/rest/api/latest/issue/%s/comment' % issue,
869 auth=(data[
'user'], password), json={
'body': description})
871 fields = {
'id': issue,
'user': user,
'comment': description}
873 fields[
'token'] = password[0]
874 fields[
'secret'] = password[1]
875 response = requests.post(
'https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
876 if response.status_code
in range(200, 210):
877 B2INFO(f
"Issue successfully updated: https://agira.desy.de/browse/{issue}")
879 B2ERROR(
'The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
885 def require_database_for_test(timeout=60, base_url=None):
886 """Make sure that the database is available and skip the test if not.
888 This function should be called in test scripts if they are expected to fail
889 if the database is down. It either returns when the database is ok or it
890 will signal test_basf2 that the test should be skipped and exit
893 if os.environ.get(
"BELLE2_CONDB_GLOBALTAG",
None) ==
"":
894 raise Exception(
"Access to the Database is disabled")
895 base_url_list = ConditionsDB.get_base_urls(base_url)
896 for url
in base_url_list:
898 req = requests.request(
"HEAD", url.rstrip(
'/') +
"/v2/globalTags")
899 req.raise_for_status()
900 except requests.RequestException
as e:
901 B2WARNING(f
"Problem connecting to {url}:\n {e}\n Trying next server ...")
905 print(
"TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
909 def enable_debugging():
910 """Enable verbose output of python-requests to be able to debug http connections"""
914 import http.client
as http_client
916 http_client.HTTPConnection.debuglevel = 1
918 logging.basicConfig()
919 logging.getLogger().setLevel(logging.DEBUG)
920 requests_log = logging.getLogger(
"requests.packages.urllib3")
921 requests_log.setLevel(logging.DEBUG)
922 requests_log.propagate =
True