Belle II Software  release-08-01-10
__init__.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 conditions_db
13 -------------
14 
15 Python interface to the ConditionsDB
16 """
17 
18 import os
19 import stat
20 from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING, conditions
21 import requests
22 from requests.packages.urllib3.fields import RequestField
23 from requests.packages.urllib3.filepost import encode_multipart_formdata
24 import json
25 import urllib
26 from versioning import upload_global_tag, jira_global_tag_v2
27 from collections import defaultdict
28 from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
29 import hashlib
30 import itertools
31 from typing import Union # noqa
32 import getpass
33 import tempfile
34 from conditions_db.iov import IntervalOfValidity
35 
36 
def encode_name(name):
    """Escape name to be used in an url.

    All reserved characters are percent-encoded, including ``/`` (``safe=""``).

    Arguments:
        name (str): name to be escaped

    Returns:
        the escaped name
    """
    return urllib.parse.quote(name, safe="")
40 
41 
def file_checksum(filename):
    """Calculate md5 hash of file.

    The file is read in blocks of 1 MiB so that large payload files do not have
    to be loaded into memory as a whole.

    Arguments:
        filename (str): name of the file to be hashed

    Returns:
        the md5 hexdigest of the file contents
    """
    md5hash = hashlib.md5()
    with open(filename, "rb") as data:
        # iter() with a sentinel stops as soon as read() returns b"" (end of file)
        for block in iter(lambda: data.read(1 << 20), b""):
            md5hash.update(block)
    return md5hash.hexdigest()
48 
49 
def chunks(container, chunk_size):
    """Cut a container in chunks of max. chunk_size.

    This is a generator: it yields tuples with ``chunk_size`` elements each,
    the last one may be shorter. Nothing is yielded for an empty container.

    Arguments:
        container: any iterable
        chunk_size (int): maximal number of elements per chunk
    """
    it = iter(container)
    while True:
        chunk = tuple(itertools.islice(it, chunk_size))
        if not chunk:
            # iterator is exhausted
            return
        yield chunk
58 
59 
def get_cdb_authentication_token(path=None):
    """
    Helper function for correctly retrieving the CDB authentication token (either via file or via issuing server).

    If a token file exists its content is checked against the token server: on status code 400 the file
    is removed and a new token is requested, on any higher status code a warning is shown and the
    existing token is used anyway. If no token file exists the user is asked on the terminal for the
    user name and password, and the token returned by the server is stored in the token file.

    :param path: Path to a file containing a CDB authentication token; if None, the function will use
        a default path (``${HOME}/b2cdb_${BELLE2_USER}.token`` or ``/tmp/b2cdb_${BELLE2_USER}.token``) to look for a token.
    :return: The content of the token file, or None if no token could be obtained from the server.
    """
    # if we pass a path, let's use it for getting the token, otherwise use the default one
    if path:
        path_to_token = path
    else:
        path_to_token = os.path.join(os.getenv('HOME', tempfile.gettempdir()), f'b2cdb_{os.getenv("BELLE2_USER", None)}.token')

    # NOTE(review): both requests below are made with verify=False, i.e. the certificate of the
    # token server is not verified. Confirm that this is still needed.

    # check validity of existing token
    if os.path.isfile(path_to_token):
        with open(path_to_token) as token_file:
            existing_token = token_file.read().strip()
        response = requests.get('https://token.belle2.org/check', verify=False, params={'token': existing_token})
        if response.status_code == 400:
            B2INFO(f'The file {path_to_token} contains an invalid token, getting a new token...')
            os.unlink(path_to_token)
        elif response.status_code > 400:
            B2WARNING("The validity of the existing token could not be checked. Trying to connect to CDB anyway.")

    # request a token if there is none
    if not os.path.isfile(path_to_token):
        username = os.environ['BELLE2_USER']
        othername = input(f'\nConfirm your DESY user name by pressing enter or type the correct one [{username}]: ')
        if othername != '':
            username = othername
        password = getpass.getpass("Please type your DESY password to request a CDB token: ")
        response = requests.get('https://token.belle2.org', verify=False, auth=(username, password))
        # NOTE(review): on success the reply is the token itself, so this echoes the token to stdout
        print(response.content)
        if response.status_code == requests.codes.ok:
            owner_only = stat.S_IRUSR | stat.S_IWUSR
            # create the file readable and writable by the owner only right from the start instead
            # of creating it with the default permissions and restricting them afterwards
            fd = os.open(path_to_token, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
            with os.fdopen(fd, 'wb') as token_file:
                # the mode given to os.open() only applies to newly created files
                os.chmod(path_to_token, owner_only)
                token_file.write(response.content)
        else:
            B2ERROR('Failed to get token')
            return None

    # read and return token
    with open(path_to_token) as token_file:
        return token_file.read().strip()
103 
104 
def set_cdb_authentication_token(cdb_instance, auth_token=None):
    """
    Helper function for setting the CDB authentication token.

    :param cdb_instance: An instance of the ``ConditionsDB`` class.
    :param auth_token: A CDB authentication token: if ``None``, it is automatically retrieved via
        ``get_cdb_authentication_token()``, using the file named by the environment variable
        ``BELLE2_CDB_AUTH_TOKEN`` if that is set, and then set
    """
    if auth_token is None:
        # If something goes wrong with the auth. token, the function returns None and the authentication will fail
        auth_token = get_cdb_authentication_token(os.getenv('BELLE2_CDB_AUTH_TOKEN', default=None))
    cdb_instance.set_authentication_token(auth_token)
119 
120 
class BearerAuth(requests.auth.AuthBase):
    """Simple class to present bearer token instead of username/password"""

    def __init__(self, token):
        """Construct from a token"""
        # value of the header to be sent with each request
        self._authtoken = f"Bearer {token}"

    def __call__(self, r):
        """Update headers to include token

        Arguments:
            r: the request object, its ``X-Authorization`` header is set

        Returns:
            the modified request object
        """
        r.headers["X-Authorization"] = self._authtoken
        return r
133 
134 
class PayloadInformation:
    """Small container class to help compare payload information for efficient
    comparison between globaltags"""

    @classmethod
    def from_json(cls, payload, iov=None):
        """Set all internal members from the json information of the payload and the iov.

        Arguments:
            payload (dict): json information of the payload as returned by REST api
            iov (dict): json information of the iov as returned by REST api
        """
        if iov is None:
            iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}

        return cls(
            payload['payloadId'],
            payload['basf2Module']['name'],
            payload['revision'],
            payload['checksum'],
            payload['payloadUrl'],
            payload['baseUrl'],
            iov['payloadIovId'],
            (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
        )

    def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
        """
        Create a new object from the given information
        """
        # name of the payload
        self.name = name
        # checksum of the payload
        self.checksum = checksum
        # interval of validity, either None or a tuple (expStart, runStart, expEnd, runEnd)
        self.iov = iov
        # revision of the payload
        self.revision = revision
        # id of the payload in the database
        self.payload_id = payload_id
        # id of the iov in the database
        self.iov_id = iov_id
        # base url the payload url is relative to
        self.base_url = base_url
        # url of the payload relative to the base url
        self.payload_url = payload_url

    @property
    def url(self):
        """Return the full url to the payload on the server"""
        return urllib.parse.urljoin(self.base_url + '/', self.payload_url)

    def __hash__(self):
        """Make object hashable"""
        return hash((self.name, self.checksum, self.iov))

    def __eq__(self, other):
        """Check if two payloads are equal, i.e. have the same name, checksum and iov"""
        return (self.name, self.checksum, self.iov) == (other.name, other.checksum, other.iov)

    def __lt__(self, other):
        """Sort payloads by name, iov, revision"""
        return (self.name.lower(), self.iov, self.revision) < (other.name.lower(), other.iov, other.revision)

    def readable_iov(self):
        """return a human readable name for the IoV"""
        if self.iov is None:
            return "none"

        if self.iov == (0, 0, -1, -1):
            return "always"

        e1, r1, e2, r2 = self.iov
        if e1 == e2:
            # validity within one experiment
            if r1 == 0 and r2 == -1:
                return f"exp {e1}"
            elif r2 == -1:
                return f"exp {e1}, runs {r1}+"
            elif r1 == r2:
                return f"exp {e1}, run {r1}"
            else:
                return f"exp {e1}, runs {r1} - {r2}"
        else:
            # validity spanning more than one experiment
            if e2 == -1 and r1 == 0:
                return f"exp {e1} - forever"
            elif e2 == -1:
                return f"exp {e1}, run {r1} - forever"
            elif r1 == 0 and r2 == -1:
                return f"exp {e1}-{e2}, all runs"
            elif r2 == -1:
                return f"exp {e1}, run {r1} - exp {e2}, all runs"
            else:
                return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
228 
229 
class ConditionsDB:
    """Class to interface conditions db REST interface"""

    # base urls to be tried if neither an explicit url nor BELLE2_CONDB_SERVERLIST is given
    BASE_URLS = [conditions.default_metadata_provider_url]

    class RequestError(RuntimeError):
        """Class to be thrown by request() if there is any error"""
        pass
239 
@staticmethod
def get_base_urls(given_url):
    """Resolve the list of server urls. If a url is given just return it.
    Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
    builtin defaults

    Every url starting with ``http://`` is preceded in the result by the
    corresponding ``https://`` url.

    Arguments:
        given_url (str): Explicit base_url. If this is not None it is the
            only url considered

    Returns:
        a list of urls to try for database connectivity
    """
    if given_url is not None:
        base_url_list = [given_url]
    else:
        base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
        if base_url_env is not None:
            base_url_list = base_url_env.split()
            B2INFO("Getting Conditions Database servers from Environment:")
            for i, url in enumerate(base_url_list, 1):
                B2INFO(f" {i}. {url}")
        else:
            # work on a copy so that the class level defaults are never modified
            base_url_list = ConditionsDB.BASE_URLS[:]

    # try to escalate to https for all given urls
    full_list = []
    for url in base_url_list:
        if url.startswith("http://"):
            full_list.append("https" + url[4:])
        # but keep the http in case of connection problems
        full_list.append(url)
    return full_list
271 
def __init__(self, base_url=None, max_connections=10, retries=3):
    """
    Create a new instance of the interface

    The candidate urls are tried in order with a HEAD request and the first
    one which answers without error is used. If none of them works B2FATAL
    is called.

    Args:
        base_url (string): base url of the rest interface
        max_connections (int): number of connections to keep open, mostly useful for threaded applications
        retries (int): number of retries in case of connection problems
    """
    # session object used for all requests to the database
    self._session_session = requests.Session()
    # and set the connection options we want to have
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=max_connections, pool_maxsize=max_connections,
        max_retries=retries, pool_block=True
    )
    self._session_session.mount("http://", adapter)
    self._session_session.mount("https://", adapter)
    # and also set the proxy settings
    if "BELLE2_CONDB_PROXY" in os.environ:
        self._session_session.proxies = {
            "http": os.environ.get("BELLE2_CONDB_PROXY"),
            "https": os.environ.get("BELLE2_CONDB_PROXY"),
        }
    # test the given url or try the known defaults
    base_url_list = ConditionsDB.get_base_urls(base_url)

    for url in base_url_list:
        # base url to be prepended to all requests, always ends with a slash
        self._base_url_base_url = url.rstrip("/") + "/"
        try:
            req = self._session_session.request("HEAD", self._base_url_base_url + "v2/globalTags")
            req.raise_for_status()
        except requests.RequestException as e:
            B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
        else:
            # this server works, keep it
            break
    else:
        # the loop finished without break: none of the servers could be used
        B2FATAL("No working database servers configured, giving up")

    # We have a working server so change the api to return json instead of
    # xml, much easier in python, also request non-cached replies. We do
    # this now because for the server check above we're fine with cached
    # results.
    self._session_session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
318 
def set_authentication_token(self, token):
    """
    Set authentication token when talking to the database

    The token is wrapped in a ``BearerAuth`` object which is used for all
    following requests of the session.

    Args:
        token (str): JWT to hand to the database. Will not be checked
            for validity here.
    """
    self._session_session.auth = BearerAuth(token)
328 
def request(self, method, url, message=None, *args, **argk):
    """
    Request function, similar to requests.request but adding the base_url

    Args:
        method (str): GET, POST, etc.
        url (str): url for the request, base_url will be prepended
        message (str): message to show when starting the request and if it fails

    All other arguments will be forwarded to requests.request.

    Returns:
        the response object of the request

    Raises:
        ConditionsDB.RequestError: if the status code of the reply is 300 or
            higher, or if the reply to anything but a HEAD request is neither
            empty (204) nor valid json
    """
    if message is not None:
        B2INFO(message)

    full_url = self._base_url_base_url + "v2/" + url.lstrip("/")
    try:
        req = self._session_session.request(method, full_url, *args, **argk)
    except requests.exceptions.ConnectionError as e:
        # report the url which was actually requested, including the api version
        B2FATAL("Could not access '" + full_url + "': " + str(e))

    if req.status_code >= 300:
        # Apparently something is not good. Let's try to decode the json
        # reply containing reason and message
        try:
            response = req.json()
            if not isinstance(response, dict):
                response = {}
            # keep the message of the server separate from the message given by the caller
            server_message = response.get("message") or ""
            colon = ": " if server_message.strip() else ""
            error = "Request {method} {url} returned {code} {reason}{colon}{message}".format(
                method=method, url=url,
                # fall back to the values of the http reply if the json does not contain them
                code=response.get("code", req.status_code),
                reason=response.get("reason", req.reason),
                message=server_message,
                colon=colon,
            )
        except ValueError:
            # seems the reply was not even json (json decoding errors are ValueErrors)
            error = "Request {method} {url} returned non JSON response {code}: {content}".format(
                method=method, url=url,
                code=req.status_code,
                content=req.content
            )

        if message is not None:
            raise ConditionsDB.RequestError(f"{message} failed: {error}")
        else:
            raise ConditionsDB.RequestError(error)

    if method != "HEAD" and req.status_code != requests.codes.no_content:
        # make sure callers can rely on req.json() to work
        try:
            req.json()
        except ValueError as e:
            B2INFO(f"Invalid response: {req.content}")
            raise ConditionsDB.RequestError("{method} {url} returned invalid JSON response {}"
                                            .format(e, method=method, url=url)) from e
    return req
383 
def get_globalTags(self):
    """Get a list of all globaltags. Returns a dictionary which maps the
    globaltag names to the corresponding information returned by the
    database, or None if the list could not be obtained"""

    try:
        req = self.request("GET", "/globalTags")
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not get the list of globaltags: {e}")
        return None

    return {tag["name"]: tag for tag in req.json()}
399 
def has_globalTag(self, name):
    """Check whether the globaltag with the given name exists.

    Any failure of the request is treated as "does not exist".

    Arguments:
        name (str): name of the globaltag

    Returns:
        True if the request for the globaltag succeeded, False otherwise
    """

    try:
        self.request("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
    except ConditionsDB.RequestError:
        return False

    return True
409 
def get_globalTagInfo(self, name):
    """Get the information of the globaltag with the given name. Returns either
    the decoded json reply of the database or None if the tag was not found

    Arguments:
        name (str): name of the globaltag
    """

    try:
        req = self.request("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Cannot find globaltag '{name}': {e}")
        return None

    return req.json()
421 
def get_globalTagType(self, name):
    """
    Get the dictionary describing the given globaltag type (currently
    one of DEV or RELEASE). Returns None if tag type was not found.

    Arguments:
        name (str): name of the globaltag type
    """
    try:
        req = self.request("GET", "/globalTagType")
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not get list of valid globaltag types: {e}")
        return None

    # all types known to the database by name
    known_types = {e["name"]: e for e in req.json()}

    if name in known_types:
        return known_types[name]

    B2ERROR("Unknown globaltag type: '{}', please use one of {}".format(name, ", ".join(known_types)))
    return None
440 
def create_globalTag(self, name, description, user):
    """
    Create a new globaltag

    Arguments:
        name (str): name of the globaltag
        description (str): description of the globaltag
        user (str): name of the user, sent as ``modifiedBy``

    Returns:
        the decoded json reply of the database, None if the creation failed
    """
    info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
    try:
        req = self.request("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not create globaltag {name}: {e}")
        return None

    return req.json()
453 
def get_all_iovs(self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False):
    """
    Return list of all payloads in the given globaltag where each element is
    a `PayloadInformation` instance

    Parameters:
        globalTag (str): name of the globaltag
        exp (int): if given limit the list of payloads to the ones valid for
            the given exp,run combination
        run (int): if given limit the list of payloads to the ones valid for
            the given exp,run combination
        message (str): additional message to show when downloading the
            payload information. Will be directly appended to
            "Obtaining lists of iovs for globaltag {globalTag}"
        run_range (tuple): if given limit the list of payloads to the ones
            overlapping with the given run range
        fully_contained (bool): if True and the run_range is not None it limits
            the list of payloads to the ones fully contained in the given run range

    Returns:
        the sorted list of `PayloadInformation` instances

    Raises:
        ConditionsDB.RequestError: errors raised by request() are not handled here

    Warning:
        Both, exp and run, need to be given at the same time. Just supplying
        an experiment or a run number will not work
    """
    if message is None:
        message = ""
    if run_range is not None:
        if fully_contained:
            message += f" [fully contained in {tuple(run_range)}]"
        else:
            message += f" [valid in {tuple(run_range)}]"
        run_range = IntervalOfValidity(run_range)

    if exp is not None:
        msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
        # query parameters are escaped by requests itself, so the plain name has to be
        # passed here: an already escaped name would be escaped a second time
        req = self.request("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
    else:
        msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
        # the name is part of the url path here, so it needs to be escaped
        req = self.request("GET", f"/globalTag/{encode_name(globalTag)}/globalTagPayloads", msg)

    all_iovs = []
    for item in req.json():
        # the two requests above name the payload and iov entries differently
        payload = item["payload" if 'payload' in item else "payloadId"]
        if "payloadIov" in item:
            iovs = [item['payloadIov']]
        else:
            iovs = item['payloadIovs']

        for iov in iovs:
            if run_range is not None:
                iov_ = IntervalOfValidity(iov['expStart'], iov['runStart'], iov['expEnd'], iov['runEnd'])
                if fully_contained:
                    # skip the iov unless its intersection with the run range is the iov itself
                    if not iov_ & run_range == iov_:
                        continue
                else:
                    # skip the iov if it has no intersection with the run range
                    if iov_ & run_range is None:
                        continue
            all_iovs.append(PayloadInformation.from_json(payload, iov))

    all_iovs.sort()
    return all_iovs
514 
def get_payloads(self, global_tag=None):
    """
    Get a list of all defined payloads (for the given global_tag or by default for all).
    Returns a dictionary which maps (module, checksum) to the payload id.
    An empty dictionary is returned if the request fails.
    """

    try:
        if global_tag:
            req = self.request("GET", "/globalTag/{global_tag}/payloads"
                               .format(global_tag=encode_name(global_tag)))
        else:
            req = self.request("GET", "/payloads")
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Cannot get list of payloads: {e}")
        return {}

    return {
        (payload["basf2Module"]["name"], payload["checksum"]): payload["payloadId"]
        for payload in req.json()
    }
538 
def check_payloads(self, payloads, information="payloadId"):
    """
    Check for the existence of payloads in the database.

    Arguments:
        payloads (list((str,str))): A list of payloads to check for. Each
            payload needs to be a tuple of the name of the payload and the
            md5 checksum of the payload file.
        information (str): The information to be extracted from the
            payload dictionary

    Returns:
        A dictionary with the payload identifiers (name, checksum) as keys
        and the requested information as values for all payloads which are already
        present in the database. An empty dictionary is returned if the request fails.
    """

    search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
    try:
        req = self.request("POST", "/checkPayloads", json=search_query)
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Cannot check for existing payloads: {e}")
        return {}

    return {
        (payload["basf2Module"]["name"], payload["checksum"]): payload[information]
        for payload in req.json()
    }
570 
def get_revisions(self, entries):
    """
    Get the revision numbers of payloads in the database.

    The ``revision`` attribute of each entry is set to the revision found in
    the database, or to 0 if the payload was not found.

    Arguments:
        entries (list): A list of payload entries.
            Each entry must have the attributes module and checksum.

    Returns:
        True if successful, False if check_payloads() did not return any
        payload (the entries are not modified in this case).
    """

    result = self.check_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
    if not result:
        return False

    for entry in entries:
        entry.revision = result.get((entry.module, entry.checksum), 0)

    return True
591 
def create_payload(self, module, filename, checksum=None):
    """
    Create a new payload

    Args:
        module (str): name of the module
        filename (str): name of the file
        checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given

    Returns:
        the payloadId of the created payload, None if the creation failed
    """
    if checksum is None:
        checksum = file_checksum(filename)

    # read the file with a context manager so that it is closed again right away
    with open(filename, "rb") as payload_file:
        file_contents = payload_file.read()

    # this is the only complicated request as we have to provide a
    # multipart/mixed request which is not directly provided by the request
    # library.
    files = [
        (filename, file_contents, "application/x-root"),
        ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
    ]
    # ok we have the two "files" we want to send, create multipart/mixed
    # body
    fields = []
    for name, contents, mimetype in files:
        rf = RequestField(name=name, data=contents)
        rf.make_multipart(content_type=mimetype)
        fields.append(rf)

    post_body, content_type = encode_multipart_formdata(fields)
    # replace the mime type in front of the first ";" but keep the rest (the boundary)
    content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
    headers = {'Content-Type': content_type}

    # now make the request. Note to self: if multipart/form-data would be
    # accepted this would be so much nicer here. but it works.
    try:
        req = self.request("POST", "/package/dbstore/module/{moduleName}/payload"
                           .format(moduleName=encode_name(module)),
                           data=post_body, headers=headers)
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not create Payload: {e}")
        return None

    return req.json()["payloadId"]
634 
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
    """
    Create an iov.

    Args:
        globalTagId (int): id of the globaltag, obtain with get_globalTagInfo()
        payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
        firstExp (int): first experiment for which this iov is valid
        firstRun (int): first run for which this iov is valid
        finalExp (int): final experiment for which this iov is valid
        finalRun (int): final run for which this iov is valid

    Returns:
        payloadIovId of the created iov, None if creation was not successful
    """
    try:
        # try to convert all arguments except self to integers to make sure they are
        # valid. int() raises TypeError instead of ValueError for values like None
        variables = {
            "globalTagId": int(globalTagId),
            "payloadId": int(payloadId),
            "firstExp": int(firstExp),
            "firstRun": int(firstRun),
            "finalExp": int(finalExp),
            "finalRun": int(finalRun),
        }
    except (ValueError, TypeError):
        B2ERROR("create_iov: All parameters need to be integers")
        return None

    # try to create the iov
    try:
        req = self.request("POST", "/globalTagPayload/{globalTagId},{payloadId}"
                           "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not create IOV: {e}")
        return None

    return req.json()["payloadIovId"]
669 
def delete_iov(self, iovId):
    """Delete an iov

    A failure of the request is reported with B2ERROR, nothing is returned.

    Args:
        iovId (int): id of the iov to be deleted
    """
    try:
        self.request("DELETE", f"/payloadIov/{iovId}")
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not delete IOV: {e}")
680 
def modify_iov(self, iovId, firstExp, firstRun, finalExp, finalRun):
    """Modify the validity range of a given iov

    A failure of the request is reported with B2ERROR, nothing is returned.

    Args:
        iovId (int): id of the iov to be modified
        firstExp (int): first experiment for which this iov is valid
        firstRun (int): first run for which this iov is valid
        finalExp (int): final experiment for which this iov is valid
        finalRun (int): final run for which this iov is valid
    """
    # the new validity is sent as query parameters of the request
    querystring = {"expStart": str(firstExp), "runStart": str(firstRun), "expEnd": str(finalExp), "runEnd": str(finalRun)}
    try:
        self.request("PUT", f"/payloadIov/{iovId}", params=querystring)
    except ConditionsDB.RequestError as e:
        B2ERROR(f"Could not modify IOV: {e}")
696 
def get_iovs(self, globalTagName, payloadName=None):
    """Return existing iovs for a given tag name. It returns a dictionary
    which maps (payloadId, first exp, first run, final exp, final run) to iovId

    Parameters:
        globalTagName(str): Global tag name.
        payloadName(str): Payload name (if None, selection by name is
            not performed).
    """

    try:
        req = self.request("GET", "/globalTag/{globalTagName}/globalTagPayloads"
                           .format(globalTagName=encode_name(globalTagName)))
    except ConditionsDB.RequestError:
        # there could be just no iovs so no error
        return {}

    result = {}
    for payload in req.json():
        payloadId = payload["payloadId"]["payloadId"]
        if payloadName is not None:
            if payload["payloadId"]["basf2Module"]["name"] != payloadName:
                continue
        for iov in payload["payloadIovs"]:
            iovId = iov["payloadIovId"]
            firstExp, firstRun = iov["expStart"], iov["runStart"]
            finalExp, finalRun = iov["expEnd"], iov["runEnd"]
            result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId

    return result
727 
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
    """
    Upload a testing payload storage to the conditions database.

    Parameters:
        filename (str): filename of the testing payload storage file that should be uploaded
        global_tag (str): name of the globaltag to which the data should be uploaded
        normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
            same content, if normalize is a string in addition the file name in the root file metadata will be set to it
        ignore_existing (bool): if False (the default) the existing payloads and iovs are downloaded
            first and entries already present in the database are skipped, if True this check is omitted
        nprocess (int): maximal number of parallel uploads
        uploaded_entries (list): if not None, the successfully uploaded entries are appended to this list

    Returns:
        True if the upload was successful
    """

    # first create a list of payloads
    from conditions_db.testing_payloads import parse_testing_payloads_file
    B2INFO(f"Reading payload list from {filename}")
    entries = parse_testing_payloads_file(filename)
    if entries is None:
        B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
        return False

    if not entries:
        B2INFO(f"No payloads found in {filename}, exiting")
        return True

    B2INFO(f"Found {len(entries)} iovs to upload")

    # time to get the id for the globaltag
    tagId = self.get_globalTagInfo(global_tag)
    if tagId is None:
        return False
    tagId = tagId["globalTagId"]

    # now we could have more than one payload with the same iov so let's go over
    # it again and remove duplicates but keep the last one for each
    entries = sorted(set(reversed(entries)))

    if normalize:
        name = normalize if normalize is not True else None
        for e in entries:
            e.normalize(name=name)

    # so let's have a list of all payloads (name, checksum) as some payloads
    # might have multiple iovs. Each payload gets a list of all of those
    payloads = defaultdict(list)
    for e in entries:
        payloads[(e.module, e.checksum)].append(e)

    # both stay empty unless the existing information is downloaded below
    existing_payloads = {}
    existing_iovs = {}

    def upload_payload(item):
        """Upload a payload file if necessary but first check list of existing payloads"""
        key, entries = item
        if key in existing_payloads:
            B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
            payload_id = existing_payloads[key]
        else:
            entry = entries[0]
            payload_id = self.create_payload(entry.module, entry.filename, entry.checksum)
            if payload_id is None:
                return False

            B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")

        # all entries with the same (module, checksum) share the payload
        for entry in entries:
            entry.payload = payload_id

        return True

    def create_iov(entry):
        """Create an iov if necessary but first check the list of existing iovs"""
        if entry.payload is None:
            return None

        iov_key = (entry.payload,) + entry.iov_tuple
        if iov_key in existing_iovs:
            entry.iov = existing_iovs[iov_key]
            B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
        else:
            entry.payloadIovId = self.create_iov(tagId, entry.payload, *entry.iov_tuple)
            if entry.payloadIovId is None:
                return False

            B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")

        return entry

    # multithreading for the win ...
    with ThreadPoolExecutor(max_workers=nprocess) as pool:
        # if we want to check for existing payloads/iovs we schedule the download of
        # the full payload list. And write a message as each completes
        if not ignore_existing:
            B2INFO("Downloading information about existing payloads and iovs...")
            futures = []

            def create_future(func, arg, callback=None):
                """Submit func(arg) to the pool and remember the future"""
                fn = pool.submit(func, arg)
                if callback is not None:
                    fn.add_done_callback(callback)
                futures.append(fn)

            def update_iovs(future):
                """Merge the downloaded iovs into the list of existing iovs"""
                existing_iovs.update(future.result())
                B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")

            def update_payloads(future):
                """Merge the downloaded payloads into the list of existing payloads"""
                existing_payloads.update(future.result())
                B2INFO(f"Found {len(existing_payloads)} existing payloads")

            create_future(self.get_iovs, global_tag, update_iovs)
            # checking existing payloads should not be done with too many at once
            for chunk in chunks(payloads.keys(), 1000):
                create_future(self.check_payloads, chunk, update_payloads)

            futures_wait(futures)

        # upload payloads
        failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
        if failed_payloads > 0:
            B2ERROR(f"{failed_payloads} payloads could not be uploaded")

        # create IoVs
        failed_iovs = 0
        for entry in pool.map(create_iov, entries):
            if entry:
                if uploaded_entries is not None:
                    uploaded_entries.append(entry)
            else:
                failed_iovs += 1
        if failed_iovs > 0:
            B2ERROR(f"{failed_iovs} IoVs could not be created")

    # update revision numbers
    if uploaded_entries is not None:
        self.get_revisions(uploaded_entries)

    return failed_payloads + failed_iovs == 0
872 
def staging_request(self, filename, normalize, data, password):
    """
    Upload a testing payload storage to a staging globaltag and create or update a jira issue

    Parameters:
      filename (str): filename of the testing payload storage file that should be uploaded
      normalize (Union[bool, str]): if True the payload root files will be
        normalized to have the same checksum for the same content, if
        normalize is a string in addition the file name in the root file
        metadata will be set to it
      data (dict): a dictionary with the information provided by the user:

        * task: category of globaltag, either main, online, prompt, data, mc, or analysis
        * tag: the globaltag name; this entry is overwritten with the name of the
          staging globaltag that is actually used for the upload
        * request: type of request, either Update, New, or Modification. The latter two imply task == main because
          if new payload classes are introduced or payload classes are modified then they will first be included in
          the main globaltag. Here a synchronization of code and payload changes has to be managed.
          If new or modified payload classes should be included in other globaltags they must already be in a release.
        * pull-request: number of the pull request containing new or modified payload classes,
          only for request == New or Modified
        * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
          only for request == Modified
        * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
          only for request == Modified
        * release: the required release version
        * reason: the reason for the request
        * details: a detailed description for the globaltag manager (a string or a
          sequence of strings, it is concatenated into the issue text)
        * data_taking: impact on data taking (same format as details), only read for task == online
        * issue: identifier of an existing jira issue (optional)
        * user: name of the user
        * time: time stamp of the request

      password: the password for access to jira or the access token and secret for oauth access

    Returns:
      True if the upload and jira issue creation/upload was successful
    """

    def status_name(code):
        # requests only knows names for the registered status codes, fall
        # back to the plain number instead of failing while reporting an error
        return requests.status_codes._codes.get(code, (str(code),))[0]

    # determine the staging globaltag name
    data['tag'] = upload_global_tag(data['task'])
    if data['tag'] is None:
        data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"

    # create the staging globaltag if it does not exist yet
    if not self.has_globalTag(data['tag']):
        if not self.create_globalTag(data['tag'], data['reason'], data['user']):
            return False

    # upload the payloads
    B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
    entries = []
    if not self.upload(filename, data['tag'], normalize, uploaded_entries=entries):
        return False

    # get the dictionary for the jira issue creation/update,
    # the 'issue' entry is optional so a missing key means "create a new issue"
    issue = data.get('issue')
    if not issue:
        issue = jira_global_tag_v2(data['task'])
        if issue is None:
            issue = {"components": [{"name": "globaltag"}]}

    # create jira issue text from provided information
    if isinstance(issue, tuple):
        # (issue dictionary, description template) pair
        description = issue[1].format(**data)
        issue = issue[0]
    else:
        description = f"""
|*Upload globaltag* | {data['tag']} |
|*Request reason* | {data['reason']} |
|*Required release* | {data['release']} |
|*Type of request* | {data['request']} |
"""
        if 'pull-request' in data:
            description += f"|*Pull request* | \\#{data['pull-request']} |\n"
        # NOTE(review): the '#' prefix on the two compatibility rows looks like a
        # copy of the pull request row - confirm that it is intended
        if 'backward-compatibility' in data:
            description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
        if 'forward-compatibility' in data:
            description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
        description += '|*Details* |' + ''.join(data['details']) + ' |\n'
        if data['task'] == 'online':
            description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'

    # add information about uploaded payloads/IoVs
    description += '\nPayloads\n||Name||Revision||IoV||\n'
    for entry in entries:
        description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"

    # create a new issue
    if isinstance(issue, dict):
        issue["description"] = description
        if "summary" in issue:
            issue["summary"] = issue["summary"].format(**data)
        else:
            issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
        if "project" not in issue:
            issue["project"] = {"key": "BII"}
        if "issuetype" not in issue:
            issue["issuetype"] = {"name": "Task"}
        if data["task"] == "main":
            issue["labels"] = ["TUPPR"]

        B2INFO(f"Creating jira issue for {data['task']} globaltag request")
        if isinstance(password, str):
            # direct access to jira with user name and password
            response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
                                     json={'fields': issue})
        else:
            # access via the server side script, optionally with oauth token and secret
            fields = {'issue': json.dumps(issue)}
            if 'user' in data:
                fields['user'] = data['user']
            if password:
                fields['token'] = password[0]
                fields['secret'] = password[1]
            response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
        if response.status_code in range(200, 210):
            B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
        else:
            B2ERROR('The creation of the issue failed: ' + status_name(response.status_code))
            return False

    # comment on an existing issue
    else:
        # Let's make sure all assignees of new issues are added as watchers
        # in that case, otherwise they might never find out
        new_issue_config = jira_global_tag_v2(data['task'])
        if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
            watcher = new_issue_config['assignee'].get('name', None)
            if watcher is not None and isinstance(password, str):
                response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
                                         auth=(data['user'], password), json=watcher)
                if response.status_code in range(200, 210):
                    B2INFO(f"Added {watcher} as watcher to {issue}")
                else:
                    B2WARNING(f"Could not add {watcher} as watcher to {issue}: {response.status_code}")

        B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
        if isinstance(password, str):
            response = requests.post('https://agira.desy.de/rest/api/latest/issue/%s/comment' % issue,
                                     auth=(data['user'], password), json={'body': description})
        else:
            # send the requesting user like in the issue creation above; the
            # watcher name is not available if no assignee is configured
            fields = {'id': issue, 'user': data['user'], 'comment': description}
            if password:
                fields['token'] = password[0]
                fields['secret'] = password[1]
            response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
        if response.status_code in range(200, 210):
            B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
        else:
            B2ERROR('The commenting of the issue failed: ' + status_name(response.status_code))
            return False

    return True
1024 
1025 
def require_database_for_test(timeout=60, base_url=None):
    """Make sure that the database is available and skip the test if not.

    This function should be called in test scripts if they are expected to fail
    if the database is down. It either returns when the database is ok or it
    will signal test_basf2 that the test should be skipped and exit

    Parameters:
      timeout (float): timeout in seconds for the availability check of each server
      base_url (str): passed on to `ConditionsDB.get_base_urls` to obtain the
        list of server urls that are tried in order

    Raises:
      Exception: if the environment variable BELLE2_CONDB_GLOBALTAG is set to an empty string
    """
    import sys

    if os.environ.get("BELLE2_CONDB_GLOBALTAG") == "":
        raise Exception("Access to the Database is disabled")

    for url in ConditionsDB.get_base_urls(base_url):
        try:
            # without a timeout an unresponsive server would block the test
            # forever; a timeout is reported as a RequestException as well
            req = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags", timeout=timeout)
            req.raise_for_status()
        except requests.RequestException as e:
            B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
        else:
            # first working server is enough
            break
    else:
        # loop finished without break: none of the servers answered properly
        print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
        sys.exit(1)
1048 
1049 
def enable_debugging():
    """Switch on verbose python-requests output so that http connections can be debugged"""
    import logging
    from http.client import HTTPConnection

    # Debugging at the httplib level (requests->urllib3->http.client): shows
    # the REQUEST with HEADERS and DATA and the RESPONSE with HEADERS, only
    # the response body is not logged.
    HTTPConnection.debuglevel = 1

    # Without initialized logging no debug output would be visible.
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)

    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_logger.setLevel(logging.DEBUG)
    urllib3_logger.propagate = True
def __call__(self, r)
Definition: __init__.py:129
_authtoken
Authorization header to send with each request.
Definition: __init__.py:127
def __init__(self, token)
Definition: __init__.py:124
def get_payloads(self, global_tag=None)
Definition: __init__.py:515
def get_iovs(self, globalTagName, payloadName=None)
Definition: __init__.py:697
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
Definition: __init__.py:728
def set_authentication_token(self, token)
Definition: __init__.py:319
def delete_iov(self, iovId)
Definition: __init__.py:670
def get_all_iovs(self, globalTag, exp=None, run=None, message=None, run_range=None, fully_contained=False)
Definition: __init__.py:454
def get_globalTagType(self, name)
Definition: __init__.py:422
def check_payloads(self, payloads, information="payloadId")
Definition: __init__.py:539
def has_globalTag(self, name)
Definition: __init__.py:400
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:635
def request(self, method, url, message=None, *args, **argk)
Definition: __init__.py:329
def __init__(self, base_url=None, max_connections=10, retries=3)
Definition: __init__.py:272
def get_revisions(self, entries)
Definition: __init__.py:571
def create_globalTag(self, name, description, user)
Definition: __init__.py:441
def get_globalTagInfo(self, name)
Definition: __init__.py:410
_session
session object to get keep-alive support and connection pooling
Definition: __init__.py:283
def get_base_urls(given_url)
Definition: __init__.py:241
def modify_iov(self, iovId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:681
def staging_request(self, filename, normalize, data, password)
Definition: __init__.py:873
def create_payload(self, module, filename, checksum=None)
Definition: __init__.py:592
_base_url
base url to be prepended to all requests
Definition: __init__.py:302
payload_id
payload id in CDB, not used for comparisons
Definition: __init__.py:174
iov_id
iov id in CDB, not used for comparisons
Definition: __init__.py:176
name
name of the payload
Definition: __init__.py:166
checksum
checksum of the payload
Definition: __init__.py:168
def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None)
Definition: __init__.py:161
def from_json(cls, payload, iov=None)
Definition: __init__.py:140
revision
revision, not used for comparisons
Definition: __init__.py:172
iov
interval of validity
Definition: __init__.py:170