def __init__(self, base_url=None, max_connections=10, retries=3):
    """
    Create a new instance of the interface

    Args:
        base_url (string): base url of the rest interface
        max_connections (int): number of connections to keep open, mostly useful for threaded applications
        retries (int): number of retries in case of connection problems
    """
    ## session object to get keep-alive support and connection pooling
    self._session = requests.Session()
    # and set the connection options we want to have
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=max_connections, pool_maxsize=max_connections,
        max_retries=retries, pool_block=True
    )
    self._session.mount("http://", adapter)
    self._session.mount("https://", adapter)
    # and also set the proxy settings: look the variable up once instead of
    # testing membership and then fetching it twice
    proxy = os.environ.get("BELLE2_CONDB_PROXY")
    if proxy is not None:
        self._session.proxies = {
            "http": proxy,
            "https": proxy,
        }
    # test the given url or try the known defaults
    base_url_list = ConditionsDB.get_base_urls(base_url)

    for url in base_url_list:
        ## base url to be prepended to all requests
        self._base_url = url.rstrip("/") + "/"
        try:
            # cheap HEAD request just to verify the server responds
            req = self._session.request("HEAD", self._base_url + "v2/globalTagStatus")
            req.raise_for_status()
            break
        except requests.RequestException as e:
            B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
    else:
        # for/else: no server in the list answered successfully
        B2FATAL("No working database servers configured, giving up")

    # We have a working server so change the api to return json instead of
    # xml, much easier in python, also request non-cached replies. We do
    # this now because for the server check above we're fine with cached
    # results.
    self._session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
def request(self, method, url, message=None, *args, **argk):
    """
    Request function, similar to requests.request but adding the base_url

    Args:
        method (str): GET, POST, etc.
        url (str): url for the request, base_url will be prepended
        message (str): message to show when starting the request and if it fails

    All other arguments will be forwarded to requests.request.

    Raises:
        ConditionsDB.RequestError: if the server returns an error status or
            a non-JSON body where JSON is expected.
    """
    if message is not None:
        B2INFO(message)

    # build the full url once so the request and any error message agree
    full_url = self._base_url + "v2/" + url.lstrip("/")
    try:
        req = self._session.request(method, full_url, *args, **argk)
    except requests.exceptions.ConnectionError as e:
        # bugfix: the error message previously omitted the "v2/" prefix that
        # was actually used for the request
        B2FATAL("Could not access '" + full_url + "': " + str(e))

    if req.status_code >= 300:
        # Apparently something is not good. Let's try to decode the json
        # reply containing reason and message. Use a separate name for the
        # server-provided message so the caller's `message` parameter is not
        # clobbered (bugfix: previously the reply overwrote `message`).
        try:
            response = req.json()
            server_message = response.get("message", "")
            colon = ": " if server_message.strip() else ""
            error = f"Request {method} {url} returned {response['code']} {response['reason']}{colon}{server_message}"
        except json.JSONDecodeError:
            # seems the reply was not even json
            error = f"Request {method} {url} returned non JSON response {req.status_code}: {req.content}"

        if message is not None:
            raise ConditionsDB.RequestError(f"{message} failed: {error}")

        raise ConditionsDB.RequestError(error)

    # for anything but HEAD/204 we expect a JSON body; validate it up front
    if method != "HEAD" and req.status_code != requests.codes.no_content:
        try:
            req.json()
        except json.JSONDecodeError as e:
            B2INFO(f"Invalid response: {req.content}")
            raise ConditionsDB.RequestError(f"{method} {url} returned invalid JSON response {e}")

    return req
def get_all_iovs(self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False):
    """
    Return list of all payloads in the given globaltag where each element is
    a `PayloadInformation` instance

    Parameters:
        globalTag (str): name of the globaltag
        exp (int): if given limit the list of payloads to the ones valid for
            the given exp,run combination
        run (int): if given limit the list of payloads to the ones valid for
            the given exp,run combination
        message (str): additional message to show when downloading the
            payload information. Will be directly appended to
            "Obtaining lists of iovs for globaltag {globalTag}"
        run_range (tuple): if given limit the list of payloads to the ones
            overlapping with the given run range, if fully_contained is not set
        fully_contained (bool): if True and the run_range is not None it limits
            the list of payloads to the ones fully contained in the given run range

    Warning:
        Both, exp and run, need to be given at the same time. Just supplying
        an experiment or a run number will not work
    """
    globalTag = encode_name(globalTag)
    if message is None:
        message = ""
    if run_range is not None:
        if fully_contained:
            message += f" [fully contained in {tuple(run_range)}]"
        else:
            message += f" [valid in {tuple(run_range)}]"
        run_range = IntervalOfValidity(run_range)

    all_iovs = []
    if exp is not None:
        # restricted query: only payloads valid for one exp/run combination
        msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
        req = self.request("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
    else:
        msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
        req = self.request("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)

    for item in req.json():
        # the two endpoints name the payload key differently
        payload = item["payload" if 'payload' in item else "payloadId"]
        if "payloadIov" in item:
            iovs = [item['payloadIov']]
        else:
            iovs = item['payloadIovs']

        for iov in iovs:
            if run_range is not None:
                iov_ = IntervalOfValidity(iov['expStart'], iov['runStart'], iov['expEnd'], iov['runEnd'])
                if fully_contained:
                    # keep only iovs completely inside run_range
                    if not (iov_ & run_range) == iov_:
                        continue
                else:
                    # keep only iovs with any overlap with run_range
                    if (iov_ & run_range) is None:
                        continue
            all_iovs.append(PayloadInformation.from_json(payload, iov))

    return all_iovs
def create_payload(self, module, filename, checksum=None):
    """
    Create a new payload in the database

    Args:
        module (str): name of the module
        filename (str): name of the file
        checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given

    Returns:
        payloadId of the created payload, None if creation failed
    """
    if checksum is None:
        checksum = file_checksum(filename)

    # this is the only complicated request as we have to provide a
    # multipart/mixed request which is not directly provided by the request
    # library.
    # bugfix: read the payload with a context manager instead of leaking the
    # file handle from open(...).read()
    with open(filename, "rb") as payload_file:
        payload_contents = payload_file.read()

    files = [
        (filename, payload_contents, "application/x-root"),
        ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
    ]
    # ok we have the two "files" we want to send, create multipart/mixed
    # body
    fields = []
    for field_name, field_data, mimetype in files:
        rf = RequestField(name=field_name, data=field_data)
        rf.make_multipart(content_type=mimetype)
        fields.append(rf)

    post_body, content_type = encode_multipart_formdata(fields)
    # the encoder produces "multipart/form-data; boundary=..."; keep the
    # boundary but switch the type to multipart/mixed as the server expects
    content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
    headers = {'Content-Type': content_type}

    # now make the request. Note to self: if multipart/form-data would be
    # accepted this would be so much nicer here. but it works.
    try:
        req = self.request("POST", f"/package/dbstore/module/{encode_name(module)}/payload", data=post_body, headers=headers)
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not create Payload: {e}")
        return None

    return req.json()["payloadId"]
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
    """
    Create an iov for a payload in a globaltag

    Args:
        globalTagId (int): id of the globaltag, obtain with get_globalTagId()
        payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
        firstExp (int): first experiment for which this iov is valid
        firstRun (int): first run for which this iov is valid
        finalExp (int): final experiment for which this iov is valid
        finalRun (int): final run for which this iov is valid

    Returns:
        payloadIovId of the created iov, None if creation was not successful
    """
    try:
        # convert all parameters to integers to make sure they are valid ids
        # and run numbers. TypeError is included so e.g. None is reported as
        # an error instead of raising out of this method.
        globalTagId = int(globalTagId)
        payloadId = int(payloadId)
        firstExp, firstRun = int(firstExp), int(firstRun)
        finalExp, finalRun = int(finalExp), int(finalRun)
    except (ValueError, TypeError):
        B2ERROR("create_iov: All parameters need to be integers")
        return None

    # try to create the iov
    try:
        req = self.request("POST", f"/globalTagPayload/{globalTagId},{payloadId}"
                           f"/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}")
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not create IOV: {e}")
        return None

    return req.json()["payloadIovId"]
def get_iovs(self, globalTagName, payloadName=None):
    """Return existing iovs for a given tag name. It returns a dictionary
    which maps (payloadId, firstExp, firstRun, finalExp, finalRun) to iovId

    Parameters:
        globalTagName(str): Global tag name.
        payloadName(str): Payload name (if None, selection by payload name is
            not performed).

    Returns:
        dict: mapping of 5-tuples to iov ids, empty if the tag has no iovs
    """
    try:
        req = self.request("GET", f"/globalTag/{encode_name(globalTagName)}/globalTagPayloads")
    except ConditionsDB.RequestError:
        # there could be just no iovs so no error
        return {}

    result = {}
    for payload in req.json():
        payloadId = payload["payloadId"]["payloadId"]
        if payloadName is not None:
            # skip payloads with a different module name
            if payload["payloadId"]["basf2Module"]["name"] != payloadName:
                continue
        for iov in payload["payloadIovs"]:
            iovId = iov["payloadIovId"]
            firstExp, firstRun = iov["expStart"], iov["runStart"]
            finalExp, finalRun = iov["expEnd"], iov["runEnd"]
            result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId

    return result
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
    """
    Upload a testing payload storage to the conditions database.

    Parameters:
        filename (str): filename of the testing payload storage file that should be uploaded
        global_tag (str): name of the globaltag to which the data should be uploaded
        normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
            same content, if normalize is a string in addition the file name in the root file metadata will be set to it
        ignore_existing (bool): if True do not upload payloads that already exist
        nprocess (int): maximal number of parallel uploads
        uploaded_entries (list): the list of successfully uploaded entries

    Returns:
        True if the upload was successful
    """
    # first create a list of payloads
    from conditions_db.testing_payloads import parse_testing_payloads_file
    B2INFO(f"Reading payload list from {filename}")
    entries = parse_testing_payloads_file(filename)
    if entries is None:
        B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
        return False

    if not entries:
        B2INFO(f"No payloads found in {filename}, exiting")
        return True

    B2INFO(f"Found {len(entries)} iovs to upload")

    # time to get the id for the globaltag
    tagId = self.get_globalTagInfo(global_tag)
    if tagId is None:
        return False
    tagId = tagId["globalTagId"]

    # now we could have more than one payload with the same iov so let's go over
    # it again and remove duplicates but keep the last one for each
    entries = sorted(set(reversed(entries)))

    if normalize:
        # if normalize is a string it doubles as the file name to set
        name = normalize if normalize is not True else None
        for e in entries:
            e.normalize(name=name)

    # so let's have a list of all payloads (name, checksum) as some payloads
    # might have multiple iovs. Each payload gets a list of all of those
    payloads = defaultdict(list)
    for e in entries:
        payloads[(e.module, e.checksum)].append(e)

    existing_payloads = {}
    existing_iovs = {}

    def upload_payload(item):
        """Upload a payload file if necessary but first check list of existing payloads"""
        key, payload_entries = item
        if key in existing_payloads:
            B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
            payload_id = existing_payloads[key]
        else:
            entry = payload_entries[0]
            payload_id = self.create_payload(entry.module, entry.filename, entry.checksum)
            if payload_id is None:
                return False

            B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")

        # remember the payload id on every entry sharing this payload
        for entry in payload_entries:
            entry.payload = payload_id

        return True

    def create_iov(entry):
        """Create an iov if necessary but first check the list of existing iovs"""
        if entry.payload is None:
            return None

        iov_key = (entry.payload,) + entry.iov_tuple
        if iov_key in existing_iovs:
            entry.iov = existing_iovs[iov_key]
            B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
        else:
            entry.payloadIovId = self.create_iov(tagId, entry.payload, *entry.iov_tuple)
            if entry.payloadIovId is None:
                return None

            B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")

        return entry

    # multithreading for the win ...
    with ThreadPoolExecutor(max_workers=nprocess) as pool:
        # if we want to check for existing payloads/iovs we schedule the download of
        # the full payload list. And write a message as each completes
        if not ignore_existing:
            B2INFO("Downloading information about existing payloads and iovs...")
            futures = []
            existing_iovs = {}
            existing_payloads = {}

            # note: renamed the helper parameters, previously the first one
            # shadowed the builtin `iter` and the names were swapped
            def create_future(function, argument, callback=None):
                """Schedule function(argument) on the pool, track the future"""
                fn = pool.submit(function, argument)
                if callback is not None:
                    fn.add_done_callback(callback)
                futures.append(fn)

            def update_iovs(iovs):
                existing_iovs.update(iovs.result())
                B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")

            def update_payloads(payloads):
                existing_payloads.update(payloads.result())
                B2INFO(f"Found {len(existing_payloads)} existing payloads")

            create_future(self.get_iovs, global_tag, update_iovs)
            # checking existing payloads should not be done with too many at once
            for chunk in chunks(payloads.keys(), 1000):
                create_future(self.check_payloads, chunk, update_payloads)

            futures_wait(futures)

        # upload all payloads in parallel and count the failures
        failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
        if failed_payloads > 0:
            B2ERROR(f"{failed_payloads} payloads could not be uploaded")

        # and then create all the iovs
        failed_iovs = 0
        for entry in pool.map(create_iov, entries):
            if entry is None:
                failed_iovs += 1
            elif uploaded_entries is not None:
                uploaded_entries.append(entry)
        if failed_iovs > 0:
            B2ERROR(f"{failed_iovs} IoVs could not be created")

    # update revision numbers
    if uploaded_entries is not None:
        self.get_revisions(uploaded_entries)

    return failed_payloads + failed_iovs == 0
def staging_request(self, filename, normalize, data, password):
    """
    Upload a testing payload storage to a staging globaltag and create or update a jira issue

    Parameters:
        filename (str): filename of the testing payload storage file that should be uploaded
        normalize (Union[bool, str]): if True the payload root files will be
            normalized to have the same checksum for the same content, if
            normalize is a string in addition the file name in the root file
            metadata will be set to it
        data (dict): a dictionary with the information provided by the user:

            * task: category of globaltag, either main, online, prompt, data, mc, or analysis
            * tag: the globaltag name
            * request: type of request, either Update, New, or Modification. The latter two imply task == main because
              if new payload classes are introduced or payload classes are modified then they will first be included in
              the main globaltag. Here a synchronization of code and payload changes has to be managed.
              If new or modified payload classes should be included in other globaltags they must already be in a release.
            * pull-request: number of the pull request containing new or modified payload classes,
              only for request == New or Modified
            * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
              only for request == Modified
            * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
              only for request == Modified
            * release: the required release version
            * reason: the reason for the request
            * description: a detailed description for the globaltag manager
            * issue: identifier of an existing jira issue (optional)
            * user: name of the user
            * time: time stamp of the request

        password: the password for access to jira or the access token and secret for oauth access

    Returns:
        True if the upload and jira issue creation/upload was successful
    """
    # determine the staging globaltag name
    data['tag'] = upload_global_tag(data['task'])
    if data['tag'] is None:
        data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"

    # create the staging globaltag if it does not exists yet
    if not self.has_globalTag(data['tag']):
        if not self.create_globalTag(data['tag'], data['reason'], data['user']):
            return False

    # upload the payloads
    B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
    entries = []
    if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
        return False

    # get the dictionary for the jira issue creation/update
    if data.get('issue'):
        issue = data['issue']
    else:
        issue = jira_global_tag_v2(data['task'])
    if issue is None:
        issue = {"components": [{"name": "globaltag"}]}

    # create jira issue text from provided information
    if type(issue) is tuple:
        # (issue config, description template) pair
        description = issue[1].format(**data)
        issue = issue[0]
    else:
        description = f"""
|*Upload globaltag* | {data['tag']} |
|*Request reason* | {data['reason']} |
|*Required release* | {data['release']} |
|*Type of request* | {data['request']} |
"""
    if 'pull-request' in data.keys():
        description += f"|*Pull request* | \\#{data['pull-request']} |\n"
    if 'backward-compatibility' in data.keys():
        description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
    if 'forward-compatibility' in data.keys():
        description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
    description += '|*Details* |' + ''.join(data['details']) + ' |\n'
    if data['task'] == 'online':
        description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'

    # add information about uploaded payloads/IoVs
    description += '\nPayloads\n||Name||Revision||IoV||\n'
    for entry in entries:
        description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"

    # a dict means: create a new issue
    if type(issue) is dict:
        issue["description"] = description
        if "summary" in issue.keys():
            issue["summary"] = issue["summary"].format(**data)
        else:
            issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
        if "project" not in issue.keys():
            issue["project"] = {"key": "BII"}
        if "issuetype" not in issue.keys():
            issue["issuetype"] = {"name": "Task"}
        if data["task"] == "main":
            issue["labels"] = ["TUPPR"]

        B2INFO(f"Creating jira issue for {data['task']} globaltag request")
        if isinstance(password, str):
            response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
                                     json={'fields': issue})
        else:
            # oauth access goes through the proxy script
            fields = {'issue': json.dumps(issue)}
            if 'user' in data.keys():
                fields['user'] = data['user']
            if password:
                fields['token'] = password[0]
                fields['secret'] = password[1]
            response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
        if response.status_code in range(200, 210):
            B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
        else:
            B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
            return False

    # otherwise comment on an existing issue
    else:
        # Let's make sure all assignees of new issues are added as watchers
        # in that case, otherwise they might never find out
        new_issue_config = jira_global_tag_v2(data['task'])
        if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
            watcher = new_issue_config['assignee'].get('name', None)
            if watcher is not None and isinstance(password, str):
                response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
                                         auth=(data['user'], password), json=watcher)
                if response.status_code in range(200, 210):
                    B2INFO(f"Added {watcher} as watcher to {issue}")
                else:
                    B2WARNING(f"Could not add {watcher} as watcher to {issue}: {response.status_code}")

        B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
        if isinstance(password, str):
            response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/comment',
                                     auth=(data['user'], password), json={'body': description})
        else:
            # bugfix: use the user from `data` here; the previous code read a
            # local `user` that is only bound when the watcher branch above ran
            fields = {'id': issue, 'user': data.get('user'), 'comment': description}
            if password:
                fields['token'] = password[0]
                fields['secret'] = password[1]
            response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
        if response.status_code in range(200, 210):
            B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
        else:
            B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
            return False

    return True