Belle II Software  release-08-00-04
__init__.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 conditions_db
13 -------------
14 
15 Python interface to the ConditionsDB
16 """
17 
18 import os
19 import stat
20 from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING, conditions
21 import requests
22 from requests.packages.urllib3.fields import RequestField
23 from requests.packages.urllib3.filepost import encode_multipart_formdata
24 import json
25 import urllib
26 from versioning import upload_global_tag, jira_global_tag_v2
27 from collections import defaultdict
28 from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
29 import hashlib
30 import itertools
31 from typing import Union # noqa
32 import getpass
33 import tempfile
34 
35 
def encode_name(name):
    """Percent-encode ``name`` so it can be embedded as one segment of an url path.

    No character is treated as safe, so even ``/`` is escaped.
    """
    no_safe_characters = ""
    return urllib.parse.quote(name, safe=no_safe_characters)
39 
40 
def file_checksum(filename):
    """Calculate the md5 hash of a file.

    The file is read in blocks of 1 MiB so that large payload files never have
    to be held in memory completely.

    Arguments:
        filename (str): path of the file to hash

    Returns:
        the md5 checksum as a hexadecimal string
    """
    md5hash = hashlib.md5()
    with open(filename, "rb") as data:
        # read(n) returns b"" at end of file which terminates the loop
        for block in iter(lambda: data.read(1024 * 1024), b""):
            md5hash.update(block)
    return md5hash.hexdigest()
47 
48 
def chunks(container, chunk_size):
    """Split any iterable into consecutive tuples of at most ``chunk_size`` items.

    The last tuple may be shorter; an empty input yields nothing.
    """
    source = iter(container)
    # iter(callable, sentinel) keeps calling until an empty tuple comes back
    yield from iter(lambda: tuple(itertools.islice(source, chunk_size)), ())
57 
58 
def get_cdb_authentication_token(path=None):
    """
    Helper function for correctly retrieving the CDB authentication token (either via file or via issuing server).

    An existing token file is first validated with the issuing server and removed if the server answers
    with status 400. If no token file exists (anymore), the user is asked for the DESY user name and
    password, a new token is requested and stored in a file readable and writable only by the owner.

    :param path: Path to a file containing a CDB authentication token; if None, the function will use
        a default path (``${HOME}/b2cdb_${BELLE2_USER}.token`` or, if ``HOME`` is not set,
        ``b2cdb_${BELLE2_USER}.token`` in the temporary directory) to look for a token.
    :return: the token as a string, or None if a new token could not be obtained
    """
    # if we pass a path, let's use it for getting the token, otherwise use the default one
    if path:
        path_to_token = path
    else:
        path_to_token = os.path.join(os.getenv('HOME', tempfile.gettempdir()), f'b2cdb_{os.getenv("BELLE2_USER", None)}.token')

    # check validity of existing token
    if os.path.isfile(path_to_token):
        # read and close the file before it is possibly removed below
        with open(path_to_token) as token_file:
            existing_token = token_file.read().strip()
        # NOTE(review): TLS certificate verification is disabled for the token server
        response = requests.get('https://token.belle2.org/check', verify=False, params={'token': existing_token})
        if response.status_code == 400:
            B2INFO(f'The file {path_to_token} contains an invalid token, getting a new token...')
            os.unlink(path_to_token)
        elif response.status_code > 400:
            B2WARNING("The validity of the existing token could not be checked. Trying to connect to CDB anyway.")

    # request a token if there is none
    if not os.path.isfile(path_to_token):
        username = os.environ['BELLE2_USER']
        othername = input(f'\nConfirm your DESY user name by pressing enter or type the correct one [{username}]: ')
        if othername != '':
            username = othername
        password = getpass.getpass("Please type your DESY password to request a CDB token: ")
        response = requests.get('https://token.belle2.org', verify=False, auth=(username, password))
        # on success the reply body is the secret token itself, so it is never printed
        if response.status_code != requests.codes.ok:
            B2ERROR(f'Failed to get token: server replied {response.status_code} {response.reason}')
            return None
        # create the file with owner-only permissions from the start so the token
        # is never readable by other users, not even briefly
        fd = os.open(path_to_token, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IRUSR | stat.S_IWUSR)
        with os.fdopen(fd, 'wb') as token_file:
            token_file.write(response.content)

    # read and return token
    with open(path_to_token) as token_file:
        return token_file.read().strip()
102 
103 
def set_cdb_authentication_token(cdb_instance, auth_token=None):
    """
    Install a CDB authentication token on a ``ConditionsDB`` instance.

    :param cdb_instance: An instance of the ``ConditionsDB`` class.
    :param auth_token: The token to use. When ``None``, a token is obtained via
        ``get_cdb_authentication_token`` from the file named in the environment
        variable ``BELLE2_CDB_AUTH_TOKEN`` (or the default location if unset).
    """
    token = auth_token
    if token is None:
        # the helper returns None on any problem; authentication then fails later
        token = get_cdb_authentication_token(os.getenv('BELLE2_CDB_AUTH_TOKEN', default=None))
    cdb_instance.set_authentication_token(token)
118 
119 
class BearerAuth(requests.auth.AuthBase):
    """Simple class to present bearer token instead of username/password"""

    def __init__(self, token):
        """Construct from a token

        Arguments:
            token (str): the token, sent verbatim after the ``Bearer`` prefix
        """
        #: complete header value, computed once and reused for every request
        self._authtoken = f"Bearer {token}"

    def __call__(self, r):
        """Update headers to include token

        Arguments:
            r: the prepared request to modify

        Returns:
            the same request with the ``X-Authorization`` header set
        """
        r.headers["X-Authorization"] = self._authtoken
        return r
132 
133 
class PayloadInformation:
    """Small container class to help compare payload information for efficient
    comparison between globaltags.

    Two instances are equal (and hash equal) if name, checksum and iov agree;
    sorting additionally takes the revision into account.
    """

    @classmethod
    def from_json(cls, payload, iov=None):
        """Set all internal members from the json information of the payload and the iov.

        Arguments:
            payload (dict): json information of the payload as returned by REST api
            iov (dict): json information of the iov as returned by REST api. If
                None, the iov id and all iov boundaries are set to None

        Returns:
            a new instance
        """
        if iov is None:
            iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}

        return cls(
            payload['payloadId'],
            payload['basf2Module']['name'],
            payload['revision'],
            payload['checksum'],
            payload['payloadUrl'],
            payload['baseUrl'],
            iov['payloadIovId'],
            (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
        )

    def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
        """
        Create a new object from the given information
        """
        #: name of the payload
        self.name = name
        #: checksum of the payload file
        self.checksum = checksum
        #: interval of validity as (first exp, first run, final exp, final run)
        self.iov = iov
        #: revision number, only used for sorting
        self.revision = revision
        #: payload id as returned by the REST api
        self.payload_id = payload_id
        #: iov id as returned by the REST api, may be None
        self.iov_id = iov_id
        #: base url of the payload files on the server
        self.base_url = base_url
        #: url of the payload file relative to base_url
        self.payload_url = payload_url

    @property
    def url(self):
        """Return the full url to the payload on the server"""
        return urllib.parse.urljoin(self.base_url + '/', self.payload_url)

    def __hash__(self):
        """Make object hashable"""
        return hash((self.name, self.checksum, self.iov))

    def __eq__(self, other):
        """Check if two payloads are equal"""
        if not isinstance(other, PayloadInformation):
            return NotImplemented
        return (self.name, self.checksum, self.iov) == (other.name, other.checksum, other.iov)

    def __lt__(self, other):
        """Sort payloads by name (case insensitive), iov, revision"""
        if not isinstance(other, PayloadInformation):
            return NotImplemented
        return (self.name.lower(), self.iov, self.revision) < (other.name.lower(), other.iov, other.revision)

    def readable_iov(self):
        """return a human readable name for the IoV"""
        # from_json() stores a tuple of Nones when no iov information is available
        if self.iov is None or all(value is None for value in self.iov):
            return "none"

        if self.iov == (0, 0, -1, -1):
            return "always"

        e1, r1, e2, r2 = self.iov
        if e1 == e2:
            if r1 == 0 and r2 == -1:
                return f"exp {e1}"
            elif r2 == -1:
                return f"exp {e1}, runs {r1}+"
            elif r1 == r2:
                return f"exp {e1}, run {r1}"
            else:
                return f"exp {e1}, runs {r1} - {r2}"
        else:
            if e2 == -1 and r1 == 0:
                return f"exp {e1} - forever"
            elif e2 == -1:
                return f"exp {e1}, run {r1} - forever"
            elif r1 == 0 and r2 == -1:
                return f"exp {e1}-{e2}, all runs"
            elif r2 == -1:
                return f"exp {e1}, run {r1} - exp {e2}, all runs"
            else:
                return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
227 
228 
230  """Class to interface conditions db REST interface"""
231 
232 
233  BASE_URLS = [conditions.default_metadata_provider_url]
234 
235  class RequestError(RuntimeError):
236  """Class to be thrown by request() if there is any error"""
237  pass
238 
239  @staticmethod
240  def get_base_urls(given_url):
241  """Resolve the list of server urls. If a url is given just return it.
242  Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
243  builtin defaults
244 
245  Arguments:
246  given_url (str): Explicit base_url. If this is not None it will be
247  returned as is in a list of length 1
248 
249  Returns:
250  a list of urls to try for database connectivity
251  """
252 
253  base_url_list = ConditionsDB.BASE_URLS[:]
254  base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
255  if given_url is not None:
256  base_url_list = [given_url]
257  elif base_url_env is not None:
258  base_url_list = base_url_env.split()
259  B2INFO("Getting Conditions Database servers from Environment:")
260  for i, url in enumerate(base_url_list, 1):
261  B2INFO(f" {i}. {url}")
262  # try to escalate to https for all given urls
263  full_list = []
264  for url in base_url_list:
265  if url.startswith("http://"):
266  full_list.append("https" + url[4:])
267  # but keep the http in case of connection problems
268  full_list.append(url)
269  return full_list
270 
271  def __init__(self, base_url=None, max_connections=10, retries=3):
272  """
273  Create a new instance of the interface
274 
275  Args:
276  base_url (string): base url of the rest interface
277  max_connections (int): number of connections to keep open, mostly useful for threaded applications
278  retries (int): number of retries in case of connection problems
279  """
280 
281 
282  self._session_session = requests.Session()
283  # and set the connection options we want to have
284  adapter = requests.adapters.HTTPAdapter(
285  pool_connections=max_connections, pool_maxsize=max_connections,
286  max_retries=retries, pool_block=True
287  )
288  self._session_session.mount("http://", adapter)
289  self._session_session.mount("https://", adapter)
290  # and also set the proxy settings
291  if "BELLE2_CONDB_PROXY" in os.environ:
292  self._session_session.proxies = {
293  "http": os.environ.get("BELLE2_CONDB_PROXY"),
294  "https": os.environ.get("BELLE2_CONDB_PROXY"),
295  }
296  # test the given url or try the known defaults
297  base_url_list = ConditionsDB.get_base_urls(base_url)
298 
299  for url in base_url_list:
300 
301  self._base_url_base_url = url.rstrip("/") + "/"
302  try:
303  req = self._session_session.request("HEAD", self._base_url_base_url + "v2/globalTags")
304  req.raise_for_status()
305  except requests.RequestException as e:
306  B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
307  else:
308  break
309  else:
310  B2FATAL("No working database servers configured, giving up")
311 
312  # We have a working server so change the api to return json instead of
313  # xml, much easier in python, also request non-cached replies. We do
314  # this now because for the server check above we're fine with cached
315  # results.
316  self._session_session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
317 
318  def set_authentication_token(self, token):
319  """
320  Set authentication token when talking to the database
321 
322  Args:
323  token (str): JWT to hand to the database. Will not be checked
324  for validity here.
325  """
326  self._session_session.auth = BearerAuth(token)
327 
328  def request(self, method, url, message=None, *args, **argk):
329  """
330  Request function, similar to requests.request but adding the base_url
331 
332  Args:
333  method (str): GET, POST, etc.
334  url (str): url for the request, base_url will be prepended
335  message (str): message to show when starting the request and if it fails
336 
337  All other arguments will be forwarded to requests.request.
338  """
339  if message is not None:
340  B2INFO(message)
341 
342  try:
343  req = self._session_session.request(method, self._base_url_base_url + "v2/" + url.lstrip("/"), *args, **argk)
344  except requests.exceptions.ConnectionError as e:
345  B2FATAL("Could not access '" + self._base_url_base_url + url.lstrip("/") + "': " + str(e))
346 
347  if req.status_code >= 300:
348  # Apparently something is not good. Let's try to decode the json
349  # reply containing reason and message
350  try:
351  response = req.json()
352  message = response.get("message", "")
353  colon = ": " if message.strip() else ""
354  error = "Request {method} {url} returned {code} {reason}{colon}{message}".format(
355  method=method, url=url,
356  code=response["code"],
357  reason=response["reason"],
358  message=message,
359  colon=colon,
360  )
361  except json.JSONDecodeError:
362  # seems the reply was not even json
363  error = "Request {method} {url} returned non JSON response {code}: {content}".format(
364  method=method, url=url,
365  code=req.status_code,
366  content=req.content
367  )
368 
369  if message is not None:
370  raise ConditionsDB.RequestError(f"{message} failed: {error}")
371  else:
372  raise ConditionsDB.RequestError(error)
373 
374  if method != "HEAD" and req.status_code != requests.codes.no_content:
375  try:
376  req.json()
377  except json.JSONDecodeError as e:
378  B2INFO(f"Invalid response: {req.content}")
379  raise ConditionsDB.RequestError("{method} {url} returned invalid JSON response {}"
380  .format(e, method=method, url=url))
381  return req
382 
383  def get_globalTags(self):
384  """Get a list of all globaltags. Returns a dictionary with the globaltag
385  names and the corresponding ids in the database"""
386 
387  try:
388  req = self.requestrequest("GET", "/globalTags")
389  except ConditionsDB.RequestError as e:
390  B2ERROR(f"Could not get the list of globaltags: {e}")
391  return None
392 
393  result = {}
394  for tag in req.json():
395  result[tag["name"]] = tag
396 
397  return result
398 
399  def has_globalTag(self, name):
400  """Check whether the globaltag with the given name exists."""
401 
402  try:
403  self.requestrequest("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
405  return False
406 
407  return True
408 
409  def get_globalTagInfo(self, name):
410  """Get the id of the globaltag with the given name. Returns either the
411  id or None if the tag was not found"""
412 
413  try:
414  req = self.requestrequest("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
415  except ConditionsDB.RequestError as e:
416  B2ERROR(f"Cannot find globaltag '{name}': {e}")
417  return None
418 
419  return req.json()
420 
421  def get_globalTagType(self, name):
422  """
423  Get the dictionary describing the given globaltag type (currently
424  one of DEV or RELEASE). Returns None if tag type was not found.
425  """
426  try:
427  req = self.requestrequest("GET", "/globalTagType")
428  except ConditionsDB.RequestError as e:
429  B2ERROR(f"Could not get list of valid globaltag types: {e}")
430  return None
431 
432  types = {e["name"]: e for e in req.json()}
433 
434  if name in types:
435  return types[name]
436 
437  B2ERROR("Unknown globaltag type: '{}', please use one of {}".format(name, ", ".join(types)))
438  return None
439 
440  def create_globalTag(self, name, description, user):
441  """
442  Create a new globaltag
443  """
444  info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
445  try:
446  req = self.requestrequest("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
447  except ConditionsDB.RequestError as e:
448  B2ERROR(f"Could not create globaltag {name}: {e}")
449  return None
450 
451  return req.json()
452 
453  def get_all_iovs(self, globalTag, exp=None, run=None, message=None):
454  """
455  Return list of all payloads in the given globaltag where each element is
456  a `PayloadInformation` instance
457 
458  Parameters:
459  gobalTag (str): name of the globaltag
460  exp (int): if given limit the list of payloads to the ones valid for
461  the given exp,run combination
462  run (int): if given limit the list of payloads to the ones valid for
463  the given exp,run combination
464  message (str): additional message to show when downloading the
465  payload information. Will be directly appended to
466  "Obtaining lists of iovs for globaltag {globalTag}"
467 
468  Warning:
469  Both, exp and run, need to be given at the same time. Just supplying
470  an experiment or a run number will not work
471  """
472  globalTag = encode_name(globalTag)
473  if message is None:
474  message = ""
475  if exp is not None:
476  msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
477  req = self.requestrequest("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
478  else:
479  msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
480  req = self.requestrequest("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
481  all_iovs = []
482  for item in req.json():
483  payload = item["payload" if 'payload' in item else "payloadId"]
484  if "payloadIov" in item:
485  iovs = [item['payloadIov']]
486  else:
487  iovs = item['payloadIovs']
488 
489  for iov in iovs:
490  all_iovs.append(PayloadInformation.from_json(payload, iov))
491 
492  all_iovs.sort()
493  return all_iovs
494 
495  def get_payloads(self, global_tag=None):
496  """
497  Get a list of all defined payloads (for the given global_tag or by default for all).
498  Returns a dictionary which maps (module, checksum) to the payload id.
499  """
500 
501  try:
502  if global_tag:
503  req = self.requestrequest("GET", "/globalTag/{global_tag}/payloads"
504  .format(global_tag=encode_name(global_tag)))
505  else:
506  req = self.requestrequest("GET", "/payloads")
507  except ConditionsDB.RequestError as e:
508  B2ERROR(f"Cannot get list of payloads: {e}")
509  return {}
510 
511  result = {}
512  for payload in req.json():
513  module = payload["basf2Module"]["name"]
514  checksum = payload["checksum"]
515  result[(module, checksum)] = payload["payloadId"]
516 
517  return result
518 
519  def check_payloads(self, payloads, information="payloadId"):
520  """
521  Check for the existence of payloads in the database.
522 
523  Arguments:
524  payloads (list((str,str))): A list of payloads to check for. Each
525  payload needs to be a tuple of the name of the payload and the
526  md5 checksum of the payload file.
527  information (str): The information to be extracted from the
528  payload dictionary
529 
530  Returns:
531  A dictionary with the payload identifiers (name, checksum) as keys
532  and the requested information as values for all payloads which are already
533  present in the database.
534  """
535 
536  search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
537  try:
538  req = self.requestrequest("POST", "/checkPayloads", json=search_query)
539  except ConditionsDB.RequestError as e:
540  B2ERROR(f"Cannot check for existing payloads: {e}")
541  return {}
542 
543  result = {}
544  for payload in req.json():
545  module = payload["basf2Module"]["name"]
546  checksum = payload["checksum"]
547  result[(module, checksum)] = payload[information]
548 
549  return result
550 
551  def get_revisions(self, entries):
552  """
553  Get the revision numbers of payloads in the database.
554 
555  Arguments:
556  entries (list): A list of payload entries.
557  Each entry must have the attributes module and checksum.
558 
559  Returns:
560  True if successful.
561  """
562 
563  result = self.check_payloadscheck_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
564  if not result:
565  return False
566 
567  for entry in entries:
568  entry.revision = result.get((entry.module, entry.checksum), 0)
569 
570  return True
571 
572  def create_payload(self, module, filename, checksum=None):
573  """
574  Create a new payload
575 
576  Args:
577  module (str): name of the module
578  filename (str): name of the file
579  checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
580  """
581  if checksum is None:
582  checksum = file_checksum(filename)
583 
584  # this is the only complicated request as we have to provide a
585  # multipart/mixed request which is not directly provided by the request
586  # library.
587  files = [
588  (filename, open(filename, "rb").read(), "application/x-root"),
589  ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
590  ]
591  # ok we have the two "files" we want to send, create multipart/mixed
592  # body
593  fields = []
594  for name, contents, mimetype in files:
595  rf = RequestField(name=name, data=contents)
596  rf.make_multipart(content_type=mimetype)
597  fields.append(rf)
598 
599  post_body, content_type = encode_multipart_formdata(fields)
600  content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
601  headers = {'Content-Type': content_type}
602 
603  # now make the request. Note to self: if multipart/form-data would be
604  # accepted this would be so much nicer here. but it works.
605  try:
606  req = self.requestrequest("POST", "/package/dbstore/module/{moduleName}/payload"
607  .format(moduleName=encode_name(module)),
608  data=post_body, headers=headers)
609  except ConditionsDB.RequestError as e:
610  B2ERROR(f"Could not create Payload: {e}")
611  return None
612 
613  return req.json()["payloadId"]
614 
615  def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
616  """
617  Create an iov.
618 
619  Args:
620  globalTagId (int): id of the globaltag, obtain with get_globalTagId()
621  payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
622  firstExp (int): first experiment for which this iov is valid
623  firstRun (int): first run for which this iov is valid
624  finalExp (int): final experiment for which this iov is valid
625  finalRun (int): final run for which this iov is valid
626 
627  Returns:
628  payloadIovId of the created iov, None if creation was not successful
629  """
630  try:
631  # try to convert all arguments except self to integers to make sure they are
632  # valid.
633  local_variables = locals()
634  variables = {e: int(local_variables[e]) for e in
635  ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
636  except ValueError:
637  B2ERROR("create_iov: All parameters need to be integers")
638  return None
639 
640  # try to create the iov
641  try:
642  req = self.requestrequest("POST", "/globalTagPayload/{globalTagId},{payloadId}"
643  "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
644  except ConditionsDB.RequestError as e:
645  B2ERROR(f"Could not create IOV: {e}")
646  return None
647 
648  return req.json()["payloadIovId"]
649 
650  def get_iovs(self, globalTagName, payloadName=None):
651  """Return existing iovs for a given tag name. It returns a dictionary
652  which maps (payloadId, first runId, final runId) to iovId
653 
654  Parameters:
655  globalTagName(str): Global tag name.
656  payloadName(str): Payload name (if None, selection by name is
657  not performed.
658  """
659 
660  try:
661  req = self.requestrequest("GET", "/globalTag/{globalTagName}/globalTagPayloads"
662  .format(globalTagName=encode_name(globalTagName)))
664  # there could be just no iovs so no error
665  return {}
666 
667  result = {}
668  for payload in req.json():
669  payloadId = payload["payloadId"]["payloadId"]
670  if payloadName is not None:
671  if payload["payloadId"]["basf2Module"]["name"] != payloadName:
672  continue
673  for iov in payload["payloadIovs"]:
674  iovId = iov["payloadIovId"]
675  firstExp, firstRun = iov["expStart"], iov["runStart"]
676  finalExp, finalRun = iov["expEnd"], iov["runEnd"]
677  result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
678 
679  return result
680 
681  def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
682  """
683  Upload a testing payload storage to the conditions database.
684 
685  Parameters:
686  filename (str): filename of the testing payload storage file that should be uploaded
687  global_tage (str): name of the globaltag to which the data should be uploaded
688  normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
689  same content, if normalize is a string in addition the file name in the root file metadata will be set to it
690  ignore_existing (bool): if True do not upload payloads that already exist
691  nprocess (int): maximal number of parallel uploads
692  uploaded_entries (list): the list of successfully uploaded entries
693 
694  Returns:
695  True if the upload was successful
696  """
697 
698  # first create a list of payloads
699  from conditions_db.testing_payloads import parse_testing_payloads_file
700  B2INFO(f"Reading payload list from {filename}")
701  entries = parse_testing_payloads_file(filename)
702  if entries is None:
703  B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
704  return False
705 
706  if not entries:
707  B2INFO(f"No payloads found in {filename}, exiting")
708  return True
709 
710  B2INFO(f"Found {len(entries)} iovs to upload")
711 
712  # time to get the id for the globaltag
713  tagId = self.get_globalTagInfoget_globalTagInfo(global_tag)
714  if tagId is None:
715  return False
716  tagId = tagId["globalTagId"]
717 
718  # now we could have more than one payload with the same iov so let's go over
719  # it again and remove duplicates but keep the last one for each
720  entries = sorted(set(reversed(entries)))
721 
722  if normalize:
723  name = normalize if normalize is not True else None
724  for e in entries:
725  e.normalize(name=name)
726 
727  # so let's have a list of all payloads (name, checksum) as some payloads
728  # might have multiple iovs. Each payload gets a list of all of those
729  payloads = defaultdict(list)
730  for e in entries:
731  payloads[(e.module, e.checksum)].append(e)
732 
733  existing_payloads = {}
734  existing_iovs = {}
735 
736  def upload_payload(item):
737  """Upload a payload file if necessary but first check list of existing payloads"""
738  key, entries = item
739  if key in existing_payloads:
740  B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
741  payload_id = existing_payloads[key]
742  else:
743  entry = entries[0]
744  payload_id = self.create_payloadcreate_payload(entry.module, entry.filename, entry.checksum)
745  if payload_id is None:
746  return False
747 
748  B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")
749 
750  for entry in entries:
751  entry.payload = payload_id
752 
753  return True
754 
755  def create_iov(entry):
756  """Create an iov if necessary but first check the list of existing iovs"""
757  if entry.payload is None:
758  return None
759 
760  iov_key = (entry.payload,) + entry.iov_tuple
761  if iov_key in existing_iovs:
762  entry.iov = existing_iovs[iov_key]
763  B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
764  else:
765  entry.payloadIovId = self.create_iovcreate_iov(tagId, entry.payload, *entry.iov_tuple)
766  if entry.payloadIovId is None:
767  return False
768 
769  B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")
770 
771  return entry
772 
773  # multithreading for the win ...
774  with ThreadPoolExecutor(max_workers=nprocess) as pool:
775  # if we want to check for existing payloads/iovs we schedule the download of
776  # the full payload list. And write a message as each completes
777  if not ignore_existing:
778  B2INFO("Downloading information about existing payloads and iovs...")
779  futures = []
780  existing_iovs = {}
781  existing_payloads = {}
782 
783  def create_future(iter, func, callback=None):
784  fn = pool.submit(iter, func)
785  if callback is not None:
786  fn.add_done_callback(callback)
787  futures.append(fn)
788 
789  def update_iovs(iovs):
790  existing_iovs.update(iovs.result())
791  B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")
792 
793  def update_payloads(payloads):
794  existing_payloads.update(payloads.result())
795  B2INFO(f"Found {len(existing_payloads)} existing payloads")
796 
797  create_future(self.get_iovsget_iovs, global_tag, update_iovs)
798  # checking existing payloads should not be done with too many at once
799  for chunk in chunks(payloads.keys(), 1000):
800  create_future(self.check_payloadscheck_payloads, chunk, update_payloads)
801 
802  futures_wait(futures)
803 
804  # upload payloads
805  failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
806  if failed_payloads > 0:
807  B2ERROR(f"{failed_payloads} payloads could not be uploaded")
808 
809  # create IoVs
810  failed_iovs = 0
811  for entry in pool.map(create_iov, entries):
812  if entry:
813  if uploaded_entries is not None:
814  uploaded_entries.append(entry)
815  else:
816  failed_iovs += 1
817  if failed_iovs > 0:
818  B2ERROR(f"{failed_iovs} IoVs could not be created")
819 
820  # update revision numbers
821  if uploaded_entries is not None:
822  self.get_revisionsget_revisions(uploaded_entries)
823 
824  return failed_payloads + failed_iovs == 0
825 
826  def staging_request(self, filename, normalize, data, password):
827  """
828  Upload a testing payload storage to a staging globaltag and create or update a jira issue
829 
830  Parameters:
831  filename (str): filename of the testing payload storage file that should be uploaded
832  normalize (Union[bool, str]): if True the payload root files will be
833  normalized to have the same checksum for the same content, if
834  normalize is a string in addition the file name in the root file
835  metadata will be set to it
836  data (dict): a dictionary with the information provided by the user:
837 
838  * task: category of globaltag, either main, online, prompt, data, mc, or analysis
839  * tag: the globaltag name
840  * request: type of request, either Update, New, or Modification. The latter two imply task == main because
841  if new payload classes are introduced or payload classes are modified then they will first be included in
842  the main globaltag. Here a synchronization of code and payload changes has to be managed.
843  If new or modified payload classes should be included in other globaltags they must already be in a release.
844  * pull-request: number of the pull request containing new or modified payload classes,
845  only for request == New or Modified
846  * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
847  only for request == Modified
848  * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
849  only for request == Modified
850  * release: the required release version
851  * reason: the reason for the request
852  * description: a detailed description for the globaltag manager
853  * issue: identifier of an existing jira issue (optional)
854  * user: name of the user
855  * time: time stamp of the request
856 
857  password: the password for access to jira or the access token and secret for oauth access
858 
859  Returns:
860  True if the upload and jira issue creation/upload was successful
861  """
862 
863  # determine the staging globaltag name
864  data['tag'] = upload_global_tag(data['task'])
865  if data['tag'] is None:
866  data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"
867 
868  # create the staging globaltag if it does not exists yet
869  if not self.has_globalTaghas_globalTag(data['tag']):
870  if not self.create_globalTagcreate_globalTag(data['tag'], data['reason'], data['user']):
871  return False
872 
873  # upload the payloads
874  B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
875  entries = []
876  if not self.uploadupload(filename, data['tag'], normalize, uploaded_entries=entries):
877  return False
878 
879  # get the dictionary for the jira issue creation/update
880  if data['issue']:
881  issue = data['issue']
882  else:
883  issue = jira_global_tag_v2(data['task'])
884  if issue is None:
885  issue = {"components": [{"name": "globaltag"}]}
886 
887  # create jira issue text from provided information
888  if type(issue) is tuple:
889  description = issue[1].format(**data)
890  issue = issue[0]
891  else:
892  description = f"""
893 |*Upload globaltag* | {data['tag']} |
894 |*Request reason* | {data['reason']} |
895 |*Required release* | {data['release']} |
896 |*Type of request* | {data['request']} |
897 """
898  if 'pull-request' in data.keys():
899  description += f"|*Pull request* | \\#{data['pull-request']} |\n"
900  if 'backward-compatibility' in data.keys():
901  description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
902  if 'forward-compatibility' in data.keys():
903  description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
904  description += '|*Details* |' + ''.join(data['details']) + ' |\n'
905  if data['task'] == 'online':
906  description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'
907 
908  # add information about uploaded payloads/IoVs
909  description += '\nPayloads\n||Name||Revision||IoV||\n'
910  for entry in entries:
911  description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
912 
913  # create a new issue
914  if type(issue) is dict:
915  issue["description"] = description
916  if "summary" in issue.keys():
917  issue["summary"] = issue["summary"].format(**data)
918  else:
919  issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
920  if "project" not in issue.keys():
921  issue["project"] = {"key": "BII"}
922  if "issuetype" not in issue.keys():
923  issue["issuetype"] = {"name": "Task"}
924  if data["task"] == "main":
925  issue["labels"] = ["TUPPR"]
926 
927  B2INFO(f"Creating jira issue for {data['task']} globaltag request")
928  if isinstance(password, str):
929  response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
930  json={'fields': issue})
931  else:
932  fields = {'issue': json.dumps(issue)}
933  if 'user' in data.keys():
934  fields['user'] = data['user']
935  if password:
936  fields['token'] = password[0]
937  fields['secret'] = password[1]
938  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
939  if response.status_code in range(200, 210):
940  B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
941  else:
942  B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
943  return False
944 
945  # comment on an existing issue
946  else:
947  # Let's make sure all assignees of new issues are added as watchers
948  # in that case, otherwise they might never find out
949  new_issue_config = jira_global_tag_v2(data['task'])
950  if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
951  user = new_issue_config['assignee'].get('name', None)
952  if user is not None and isinstance(password, str):
953  response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
954  auth=(data['user'], password), json=user)
955  if response.status_code in range(200, 210):
956  B2INFO(f"Added {user} as watcher to {issue}")
957  else:
958  B2WARNING(f"Could not add {user} as watcher to {issue}: {response.status_code}")
959 
960  B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
961  if isinstance(password, str):
962  response = requests.post('https://agira.desy.de/rest/api/latest/issue/%s/comment' % issue,
963  auth=(data['user'], password), json={'body': description})
964  else:
965  fields = {'id': issue, 'user': user, 'comment': description}
966  if password:
967  fields['token'] = password[0]
968  fields['secret'] = password[1]
969  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
970  if response.status_code in range(200, 210):
971  B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
972  else:
973  B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
974  return False
975 
976  return True
977 
978 
def require_database_for_test(timeout=60, base_url=None):
    """Make sure that the database is available and skip the test if not.

    This function should be called in test scripts if they are expected to fail
    if the database is down. It either returns when the database is ok or it
    will signal test_basf2 that the test should be skipped and exit.

    Each server returned by ``ConditionsDB.get_base_urls(base_url)`` is probed
    with a ``HEAD`` request to ``<url>/v2/globalTags``. The first server that
    answers without an HTTP error ends the check and the function returns.
    If no server answers, ``TEST SKIPPED: ...`` is printed to stderr and the
    process exits with status 1.

    Parameters:
      timeout (float): timeout in seconds handed to ``requests`` for every
        probe (used for both the connect and the read timeout), so that an
        unresponsive server cannot block the test indefinitely.
      base_url: forwarded to ``ConditionsDB.get_base_urls`` to determine the
        list of servers to try.

    Raises:
      Exception: if access to the database is disabled by setting the
        ``BELLE2_CONDB_GLOBALTAG`` environment variable to an empty string.
    """
    import sys
    if os.environ.get("BELLE2_CONDB_GLOBALTAG", None) == "":
        raise Exception("Access to the Database is disabled")
    base_url_list = ConditionsDB.get_base_urls(base_url)
    for url in base_url_list:
        try:
            req = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags", timeout=timeout)
            req.raise_for_status()
        except requests.RequestException as e:
            # requests.Timeout derives from RequestException, so a server that
            # does not answer in time is treated like any other failing server
            B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
        else:
            # this server works: the database is available
            break
    else:
        # loop finished without break: no server could be reached
        print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
        sys.exit(1)
1001 
1002 
def enable_debugging():
    """Turn on verbose logging of the HTTP traffic made through python-requests.

    Raises the ``http.client`` connection debug level so the outgoing request
    (including headers and data) and the response headers are shown; the
    response body is not logged. In addition, logging is initialised and both
    the root logger and the ``requests.packages.urllib3`` logger are set to
    DEBUG level, with the latter propagating to the root logger.
    """
    import logging
    from http import client as httplib

    # requests -> urllib3 -> http.client: a non-zero debug level makes
    # http.client emit the raw request/response exchange
    httplib.HTTPConnection.debuglevel = 1

    # logging needs to be initialised, otherwise the DEBUG output is not shown
    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_logger.setLevel(logging.DEBUG)
    urllib3_logger.propagate = True
def __call__(self, r)
Definition: __init__.py:128
_authtoken
Authorization header to send with each request.
Definition: __init__.py:126
def __init__(self, token)
Definition: __init__.py:123
def get_payloads(self, global_tag=None)
Definition: __init__.py:495
def get_iovs(self, globalTagName, payloadName=None)
Definition: __init__.py:650
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
Definition: __init__.py:681
def set_authentication_token(self, token)
Definition: __init__.py:318
def get_globalTagType(self, name)
Definition: __init__.py:421
def check_payloads(self, payloads, information="payloadId")
Definition: __init__.py:519
def has_globalTag(self, name)
Definition: __init__.py:399
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:615
def request(self, method, url, message=None, *args, **argk)
Definition: __init__.py:328
def __init__(self, base_url=None, max_connections=10, retries=3)
Definition: __init__.py:271
def get_revisions(self, entries)
Definition: __init__.py:551
def create_globalTag(self, name, description, user)
Definition: __init__.py:440
def get_globalTagInfo(self, name)
Definition: __init__.py:409
_session
session object to get keep-alive support and connection pooling
Definition: __init__.py:282
def get_all_iovs(self, globalTag, exp=None, run=None, message=None)
Definition: __init__.py:453
def get_base_urls(given_url)
Definition: __init__.py:240
def staging_request(self, filename, normalize, data, password)
Definition: __init__.py:826
def create_payload(self, module, filename, checksum=None)
Definition: __init__.py:572
_base_url
base url to be prepended to all requests
Definition: __init__.py:301
payload_id
payload id in CDB, not used for comparisons
Definition: __init__.py:173
iov_id
iov id in CDB, not used for comparisons
Definition: __init__.py:175
name
name of the payload
Definition: __init__.py:165
checksum
checksum of the payload
Definition: __init__.py:167
def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None)
Definition: __init__.py:160
def from_json(cls, payload, iov=None)
Definition: __init__.py:139
revision
revision, not used for comparisons
Definition: __init__.py:171
iov
interval of validity
Definition: __init__.py:169