Belle II Software  light-2309-munchkin
__init__.py
1 #!/usr/bin/env python3
2 
3 
10 
11 """
12 conditions_db
13 -------------
14 
15 Python interface to the ConditionsDB
16 """
17 
18 import os
19 import stat
20 from basf2 import B2FATAL, B2ERROR, B2INFO, B2WARNING, conditions
21 import requests
22 from requests.packages.urllib3.fields import RequestField
23 from requests.packages.urllib3.filepost import encode_multipart_formdata
24 import json
25 import urllib
26 from versioning import upload_global_tag, jira_global_tag_v2
27 from collections import defaultdict
28 from concurrent.futures import ThreadPoolExecutor, wait as futures_wait
29 import hashlib
30 import itertools
31 from typing import Union # noqa
32 import getpass
33 
34 
def encode_name(name):
    """Percent-encode ``name`` so that it can be used as one url path component.

    Every reserved character, including ``/``, is escaped.
    """
    escaped = urllib.parse.quote(name, safe="")
    return escaped
38 
39 
def file_checksum(filename, chunk_size=1 << 20):
    """Calculate the md5 hash of a file.

    The file is read in blocks of ``chunk_size`` bytes so that large payload
    files do not have to be held in memory completely.

    Args:
        filename (str): path of the file to hash
        chunk_size (int): number of bytes read per step; a value <= 0 reads
            the whole file at once

    Returns:
        the md5 hex digest of the file content
    """
    # read(0) would return b"" immediately and end the loop with a wrong
    # digest, so fall back to reading everything in one go
    if chunk_size <= 0:
        chunk_size = -1
    md5hash = hashlib.md5()
    with open(filename, "rb") as data:
        for block in iter(lambda: data.read(chunk_size), b""):
            md5hash.update(block)
    return md5hash.hexdigest()
46 
47 
def chunks(container, chunk_size):
    """Yield consecutive tuples of at most ``chunk_size`` elements of ``container``.

    The final tuple may be shorter; nothing is yielded for an empty container.
    """
    source = iter(container)
    # iter(callable, sentinel) keeps calling until the empty tuple comes back
    yield from iter(lambda: tuple(itertools.islice(source, chunk_size)), ())
56 
57 
def get_cdb_authentication_token(path=None):
    """
    Helper function for correctly retrieving the CDB authentication token (either via file or via issuing server).

    An existing token file is first checked with the token server; if the
    server reports it as invalid the file is deleted. If there is no token
    file (anymore) the user is asked for the DESY credentials, a new token is
    requested and stored in a file readable and writable only by its owner.

    :param path: Path to a file containing a CDB authentication token; if None, the function will use
        a default path (``${HOME}/b2cdb_${BELLE2_USER}.token`` or ``/tmp/b2cdb_${BELLE2_USER}.token``) to look for a token.
    :return: the token as a string, or None if no new token could be obtained
    """
    # if we pass a path, let's use it for getting the token, otherwise use the default one
    if path:
        path_to_token = path
    else:
        path_to_token = os.path.join(os.getenv('HOME', '/tmp'), f'b2cdb_{os.getenv("BELLE2_USER", None)}.token')

    # check validity of existing token
    if os.path.isfile(path_to_token):
        with open(path_to_token) as token_file:
            # NOTE(review): TLS certificate verification is disabled for the token
            # server (here and below, where the DESY password is sent); consider enabling it.
            response = requests.get('https://token.belle2.org/check', verify=False,
                                    params={'token': token_file.read().strip()})
        if response.status_code == 400:
            B2INFO(f'The file {path_to_token} contains an invalid token, getting a new token...')
            os.unlink(path_to_token)
        elif response.status_code > 400:
            B2WARNING("The validity of the existing token could not be checked. Trying to connect to CDB anyway.")

    # request a token if there is none
    if not os.path.isfile(path_to_token):
        # fall back to the local user name instead of failing with KeyError
        username = os.environ.get('BELLE2_USER') or getpass.getuser()
        othername = input(f'\nConfirm your DESY user name by pressing enter or type the correct one [{username}]: ')
        if othername != '':
            username = othername
        password = getpass.getpass("Please type your DESY password to request a CDB token: ")
        response = requests.get('https://token.belle2.org', verify=False, auth=(username, password))
        # the reply body is the token itself, so it must never be printed
        if response.status_code == requests.codes.ok:
            # create the file with owner-only permissions from the start so the
            # token is never readable by others, not even briefly
            owner_rw = stat.S_IRUSR | stat.S_IWUSR
            fd = os.open(path_to_token, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_rw)
            with os.fdopen(fd, 'wb') as token_file:
                os.chmod(path_to_token, owner_rw)
                token_file.write(response.content)
        else:
            B2ERROR(f'Failed to get token (HTTP status {response.status_code})')
            return None

    # read and return token
    with open(path_to_token) as token_file:
        return token_file.read().strip()
101 
102 
def set_cdb_authentication_token(cdb_instance, auth_token=None):
    """
    Helper function for setting the CDB authentication token.

    :param cdb_instance: An instance of the ``ConditionsDB`` class.
    :param auth_token: A CDB authentication token: if ``None``, it is obtained via
        ``get_cdb_authentication_token`` (using the path in ``BELLE2_CDB_AUTH_TOKEN``
        if that variable is set) and then set
    """
    if auth_token is None:
        # a failed retrieval yields None, in which case authentication will fail later
        auth_token = get_cdb_authentication_token(os.getenv('BELLE2_CDB_AUTH_TOKEN', default=None))
    cdb_instance.set_authentication_token(auth_token)
117 
118 
class BearerAuth(requests.auth.AuthBase):
    """Authentication hook for requests that presents a bearer token instead
    of a username/password pair."""

    def __init__(self, token):
        """Remember the header value ``Bearer <token>`` for later requests"""
        # complete value of the X-Authorization header
        self._authtoken_authtoken = "Bearer {}".format(token)

    def __call__(self, r):
        """Attach the stored token to the outgoing request ``r`` and return it"""
        headers = r.headers
        headers["X-Authorization"] = self._authtoken_authtoken
        return r
131 
132 
134  """Small container class to help compare payload information for efficient
135  comparison between globaltags"""
136 
137  @classmethod
138  def from_json(cls, payload, iov=None):
139  """Set all internal members from the json information of the payload and the iov.
140 
141  Arguments:
142  payload (dict): json information of the payload as returned by REST api
143  iov (dict): json information of the iov as returned by REST api
144  """
145  if iov is None:
146  iov = {"payloadIovId": None, "expStart": None, "runStart": None, "expEnd": None, "runEnd": None}
147 
148  return cls(
149  payload['payloadId'],
150  payload['basf2Module']['name'],
151  payload['revision'],
152  payload['checksum'],
153  payload['payloadUrl'],
154  payload['baseUrl'],
155  iov['payloadIovId'],
156  (iov["expStart"], iov["runStart"], iov["expEnd"], iov["runEnd"]),
157  )
158 
159  def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None):
160  """
161  Create a new object from the given information
162  """
163 
164  self.namename = name
165 
166  self.checksumchecksum = checksum
167 
168  self.ioviov = iov
169 
170  self.revisionrevision = revision
171 
172  self.payload_idpayload_id = payload_id
173 
174  self.iov_idiov_id = iov_id
175 
176  self.base_urlbase_url = base_url
177 
178  self.payload_urlpayload_url = payload_url
179 
180  @property
181  def url(self):
182  """Return the full url to the payload on the server"""
183  return urllib.parse.urljoin(self.base_urlbase_url + '/', self.payload_urlpayload_url)
184 
185  def __hash__(self):
186  """Make object hashable"""
187  return hash((self.namename, self.checksumchecksum, self.ioviov))
188 
189  def __eq__(self, other):
190  """Check if two payloads are equal"""
191  return (self.namename, self.checksumchecksum, self.ioviov) == (other.name, other.checksum, other.iov)
192 
193  def __lt__(self, other):
194  """Sort payloads by name, iov, revision"""
195  return (self.namename.lower(), self.ioviov, self.revisionrevision) < (other.name.lower(), other.iov, other.revision)
196 
197  def readable_iov(self):
198  """return a human readable name for the IoV"""
199  if self.ioviov is None:
200  return "none"
201 
202  if self.ioviov == (0, 0, -1, -1):
203  return "always"
204 
205  e1, r1, e2, r2 = self.ioviov
206  if e1 == e2:
207  if r1 == 0 and r2 == -1:
208  return f"exp {e1}"
209  elif r2 == -1:
210  return f"exp {e1}, runs {r1}+"
211  elif r1 == r2:
212  return f"exp {e1}, run {r1}"
213  else:
214  return f"exp {e1}, runs {r1} - {r2}"
215  else:
216  if e2 == -1 and r1 == 0:
217  return f"exp {e1} - forever"
218  elif e2 == -1:
219  return f"exp {e1}, run {r1} - forever"
220  elif r1 == 0 and r2 == -1:
221  return f"exp {e1}-{e2}, all runs"
222  elif r2 == -1:
223  return f"exp {e1}, run {r1} - exp {e2}, all runs"
224  else:
225  return f"exp {e1}, run {r1} - exp {e2}, run {r2}"
226 
227 
229  """Class to interface conditions db REST interface"""
230 
231 
232  BASE_URLS = [conditions.default_metadata_provider_url]
233 
234  class RequestError(RuntimeError):
235  """Class to be thrown by request() if there is any error"""
236  pass
237 
238  @staticmethod
239  def get_base_urls(given_url):
240  """Resolve the list of server urls. If a url is given just return it.
241  Otherwise return servers listed in BELLE2_CONDB_SERVERLIST or the
242  builtin defaults
243 
244  Arguments:
245  given_url (str): Explicit base_url. If this is not None it will be
246  returned as is in a list of length 1
247 
248  Returns:
249  a list of urls to try for database connectivity
250  """
251 
252  base_url_list = ConditionsDB.BASE_URLS[:]
253  base_url_env = os.environ.get("BELLE2_CONDB_SERVERLIST", None)
254  if given_url is not None:
255  base_url_list = [given_url]
256  elif base_url_env is not None:
257  base_url_list = base_url_env.split()
258  B2INFO("Getting Conditions Database servers from Environment:")
259  for i, url in enumerate(base_url_list, 1):
260  B2INFO(f" {i}. {url}")
261  # try to escalate to https for all given urls
262  full_list = []
263  for url in base_url_list:
264  if url.startswith("http://"):
265  full_list.append("https" + url[4:])
266  # but keep the http in case of connection problems
267  full_list.append(url)
268  return full_list
269 
270  def __init__(self, base_url=None, max_connections=10, retries=3):
271  """
272  Create a new instance of the interface
273 
274  Args:
275  base_url (string): base url of the rest interface
276  max_connections (int): number of connections to keep open, mostly useful for threaded applications
277  retries (int): number of retries in case of connection problems
278  """
279 
280 
281  self._session_session = requests.Session()
282  # and set the connection options we want to have
283  adapter = requests.adapters.HTTPAdapter(
284  pool_connections=max_connections, pool_maxsize=max_connections,
285  max_retries=retries, pool_block=True
286  )
287  self._session_session.mount("http://", adapter)
288  self._session_session.mount("https://", adapter)
289  # and also set the proxy settings
290  if "BELLE2_CONDB_PROXY" in os.environ:
291  self._session_session.proxies = {
292  "http": os.environ.get("BELLE2_CONDB_PROXY"),
293  "https": os.environ.get("BELLE2_CONDB_PROXY"),
294  }
295  # test the given url or try the known defaults
296  base_url_list = ConditionsDB.get_base_urls(base_url)
297 
298  for url in base_url_list:
299 
300  self._base_url_base_url = url.rstrip("/") + "/"
301  try:
302  req = self._session_session.request("HEAD", self._base_url_base_url + "v2/globalTags")
303  req.raise_for_status()
304  except requests.RequestException as e:
305  B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
306  else:
307  break
308  else:
309  B2FATAL("No working database servers configured, giving up")
310 
311  # We have a working server so change the api to return json instead of
312  # xml, much easier in python, also request non-cached replies. We do
313  # this now because for the server check above we're fine with cached
314  # results.
315  self._session_session.headers.update({"Accept": "application/json", "Cache-Control": "no-cache"})
316 
317  def set_authentication_token(self, token):
318  """
319  Set authentication token when talking to the database
320 
321  Args:
322  token (str): JWT to hand to the database. Will not be checked
323  for validity here.
324  """
325  self._session_session.auth = BearerAuth(token)
326 
327  def request(self, method, url, message=None, *args, **argk):
328  """
329  Request function, similar to requests.request but adding the base_url
330 
331  Args:
332  method (str): GET, POST, etc.
333  url (str): url for the request, base_url will be prepended
334  message (str): message to show when starting the request and if it fails
335 
336  All other arguments will be forwarded to requests.request.
337  """
338  if message is not None:
339  B2INFO(message)
340 
341  try:
342  req = self._session_session.request(method, self._base_url_base_url + "v2/" + url.lstrip("/"), *args, **argk)
343  except requests.exceptions.ConnectionError as e:
344  B2FATAL("Could not access '" + self._base_url_base_url + url.lstrip("/") + "': " + str(e))
345 
346  if req.status_code >= 300:
347  # Apparently something is not good. Let's try to decode the json
348  # reply containing reason and message
349  try:
350  response = req.json()
351  message = response.get("message", "")
352  colon = ": " if message.strip() else ""
353  error = "Request {method} {url} returned {code} {reason}{colon}{message}".format(
354  method=method, url=url,
355  code=response["code"],
356  reason=response["reason"],
357  message=message,
358  colon=colon,
359  )
360  except json.JSONDecodeError:
361  # seems the reply was not even json
362  error = "Request {method} {url} returned non JSON response {code}: {content}".format(
363  method=method, url=url,
364  code=req.status_code,
365  content=req.content
366  )
367 
368  if message is not None:
369  raise ConditionsDB.RequestError(f"{message} failed: {error}")
370  else:
371  raise ConditionsDB.RequestError(error)
372 
373  if method != "HEAD" and req.status_code != requests.codes.no_content:
374  try:
375  req.json()
376  except json.JSONDecodeError as e:
377  B2INFO(f"Invalid response: {req.content}")
378  raise ConditionsDB.RequestError("{method} {url} returned invalid JSON response {}"
379  .format(e, method=method, url=url))
380  return req
381 
382  def get_globalTags(self):
383  """Get a list of all globaltags. Returns a dictionary with the globaltag
384  names and the corresponding ids in the database"""
385 
386  try:
387  req = self.requestrequest("GET", "/globalTags")
388  except ConditionsDB.RequestError as e:
389  B2ERROR(f"Could not get the list of globaltags: {e}")
390  return None
391 
392  result = {}
393  for tag in req.json():
394  result[tag["name"]] = tag
395 
396  return result
397 
398  def has_globalTag(self, name):
399  """Check whether the globaltag with the given name exists."""
400 
401  try:
402  self.requestrequest("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
404  return False
405 
406  return True
407 
408  def get_globalTagInfo(self, name):
409  """Get the id of the globaltag with the given name. Returns either the
410  id or None if the tag was not found"""
411 
412  try:
413  req = self.requestrequest("GET", "/globalTag/{globalTagName}".format(globalTagName=encode_name(name)))
414  except ConditionsDB.RequestError as e:
415  B2ERROR(f"Cannot find globaltag '{name}': {e}")
416  return None
417 
418  return req.json()
419 
420  def get_globalTagType(self, name):
421  """
422  Get the dictionary describing the given globaltag type (currently
423  one of DEV or RELEASE). Returns None if tag type was not found.
424  """
425  try:
426  req = self.requestrequest("GET", "/globalTagType")
427  except ConditionsDB.RequestError as e:
428  B2ERROR(f"Could not get list of valid globaltag types: {e}")
429  return None
430 
431  types = {e["name"]: e for e in req.json()}
432 
433  if name in types:
434  return types[name]
435 
436  B2ERROR("Unknown globaltag type: '{}', please use one of {}".format(name, ", ".join(types)))
437  return None
438 
439  def create_globalTag(self, name, description, user):
440  """
441  Create a new globaltag
442  """
443  info = {"name": name, "description": description, "modifiedBy": user, "isDefault": False}
444  try:
445  req = self.requestrequest("POST", "/globalTag/DEV", f"Creating globaltag {name}", json=info)
446  except ConditionsDB.RequestError as e:
447  B2ERROR(f"Could not create globaltag {name}: {e}")
448  return None
449 
450  return req.json()
451 
452  def get_all_iovs(self, globalTag, exp=None, run=None, message=None):
453  """
454  Return list of all payloads in the given globaltag where each element is
455  a `PayloadInformation` instance
456 
457  Parameters:
458  gobalTag (str): name of the globaltag
459  exp (int): if given limit the list of payloads to the ones valid for
460  the given exp,run combination
461  run (int): if given limit the list of payloads to the ones valid for
462  the given exp,run combination
463  message (str): additional message to show when downloading the
464  payload information. Will be directly appended to
465  "Obtaining lists of iovs for globaltag {globalTag}"
466 
467  Warning:
468  Both, exp and run, need to be given at the same time. Just supplying
469  an experiment or a run number will not work
470  """
471  globalTag = encode_name(globalTag)
472  if message is None:
473  message = ""
474  if exp is not None:
475  msg = f"Obtaining list of iovs for globaltag {globalTag}, exp={exp}, run={run}{message}"
476  req = self.requestrequest("GET", "/iovPayloads", msg, params={'gtName': globalTag, 'expNumber': exp, 'runNumber': run})
477  else:
478  msg = f"Obtaining list of iovs for globaltag {globalTag}{message}"
479  req = self.requestrequest("GET", f"/globalTag/{globalTag}/globalTagPayloads", msg)
480  all_iovs = []
481  for item in req.json():
482  payload = item["payload" if 'payload' in item else "payloadId"]
483  if "payloadIov" in item:
484  iovs = [item['payloadIov']]
485  else:
486  iovs = item['payloadIovs']
487 
488  for iov in iovs:
489  all_iovs.append(PayloadInformation.from_json(payload, iov))
490 
491  all_iovs.sort()
492  return all_iovs
493 
494  def get_payloads(self, global_tag=None):
495  """
496  Get a list of all defined payloads (for the given global_tag or by default for all).
497  Returns a dictionary which maps (module, checksum) to the payload id.
498  """
499 
500  try:
501  if global_tag:
502  req = self.requestrequest("GET", "/globalTag/{global_tag}/payloads"
503  .format(global_tag=encode_name(global_tag)))
504  else:
505  req = self.requestrequest("GET", "/payloads")
506  except ConditionsDB.RequestError as e:
507  B2ERROR(f"Cannot get list of payloads: {e}")
508  return {}
509 
510  result = {}
511  for payload in req.json():
512  module = payload["basf2Module"]["name"]
513  checksum = payload["checksum"]
514  result[(module, checksum)] = payload["payloadId"]
515 
516  return result
517 
518  def check_payloads(self, payloads, information="payloadId"):
519  """
520  Check for the existence of payloads in the database.
521 
522  Arguments:
523  payloads (list((str,str))): A list of payloads to check for. Each
524  payload needs to be a tuple of the name of the payload and the
525  md5 checksum of the payload file.
526  information (str): The information to be extracted from the
527  payload dictionary
528 
529  Returns:
530  A dictionary with the payload identifiers (name, checksum) as keys
531  and the requested information as values for all payloads which are already
532  present in the database.
533  """
534 
535  search_query = [{"name": e[0], "checksum": e[1]} for e in payloads]
536  try:
537  req = self.requestrequest("POST", "/checkPayloads", json=search_query)
538  except ConditionsDB.RequestError as e:
539  B2ERROR(f"Cannot check for existing payloads: {e}")
540  return {}
541 
542  result = {}
543  for payload in req.json():
544  module = payload["basf2Module"]["name"]
545  checksum = payload["checksum"]
546  result[(module, checksum)] = payload[information]
547 
548  return result
549 
550  def get_revisions(self, entries):
551  """
552  Get the revision numbers of payloads in the database.
553 
554  Arguments:
555  entries (list): A list of payload entries.
556  Each entry must have the attributes module and checksum.
557 
558  Returns:
559  True if successful.
560  """
561 
562  result = self.check_payloadscheck_payloads([(entry.module, entry.checksum) for entry in entries], "revision")
563  if not result:
564  return False
565 
566  for entry in entries:
567  entry.revision = result.get((entry.module, entry.checksum), 0)
568 
569  return True
570 
571  def create_payload(self, module, filename, checksum=None):
572  """
573  Create a new payload
574 
575  Args:
576  module (str): name of the module
577  filename (str): name of the file
578  checksum (str): md5 hexdigest of the file. Will be calculated automatically if not given
579  """
580  if checksum is None:
581  checksum = file_checksum(filename)
582 
583  # this is the only complicated request as we have to provide a
584  # multipart/mixed request which is not directly provided by the request
585  # library.
586  files = [
587  (filename, open(filename, "rb").read(), "application/x-root"),
588  ("json", json.dumps({"checksum": checksum, "isDefault": False}), "application/json"),
589  ]
590  # ok we have the two "files" we want to send, create multipart/mixed
591  # body
592  fields = []
593  for name, contents, mimetype in files:
594  rf = RequestField(name=name, data=contents)
595  rf.make_multipart(content_type=mimetype)
596  fields.append(rf)
597 
598  post_body, content_type = encode_multipart_formdata(fields)
599  content_type = ''.join(('multipart/mixed',) + content_type.partition(';')[1:])
600  headers = {'Content-Type': content_type}
601 
602  # now make the request. Note to self: if multipart/form-data would be
603  # accepted this would be so much nicer here. but it works.
604  try:
605  req = self.requestrequest("POST", "/package/dbstore/module/{moduleName}/payload"
606  .format(moduleName=encode_name(module)),
607  data=post_body, headers=headers)
608  except ConditionsDB.RequestError as e:
609  B2ERROR(f"Could not create Payload: {e}")
610  return None
611 
612  return req.json()["payloadId"]
613 
614  def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun):
615  """
616  Create an iov.
617 
618  Args:
619  globalTagId (int): id of the globaltag, obtain with get_globalTagId()
620  payloadId (int): id of the payload, obtain from create_payload() or get_payloads()
621  firstExp (int): first experiment for which this iov is valid
622  firstRun (int): first run for which this iov is valid
623  finalExp (int): final experiment for which this iov is valid
624  finalRun (int): final run for which this iov is valid
625 
626  Returns:
627  payloadIovId of the created iov, None if creation was not successful
628  """
629  try:
630  # try to convert all arguments except self to integers to make sure they are
631  # valid.
632  local_variables = locals()
633  variables = {e: int(local_variables[e]) for e in
634  ["globalTagId", "payloadId", "firstExp", "firstRun", "finalExp", "finalRun"]}
635  except ValueError:
636  B2ERROR("create_iov: All parameters need to be integers")
637  return None
638 
639  # try to create the iov
640  try:
641  req = self.requestrequest("POST", "/globalTagPayload/{globalTagId},{payloadId}"
642  "/payloadIov/{firstExp},{firstRun},{finalExp},{finalRun}".format(**variables))
643  except ConditionsDB.RequestError as e:
644  B2ERROR(f"Could not create IOV: {e}")
645  return None
646 
647  return req.json()["payloadIovId"]
648 
649  def get_iovs(self, globalTagName, payloadName=None):
650  """Return existing iovs for a given tag name. It returns a dictionary
651  which maps (payloadId, first runId, final runId) to iovId
652 
653  Parameters:
654  globalTagName(str): Global tag name.
655  payloadName(str): Payload name (if None, selection by name is
656  not performed.
657  """
658 
659  try:
660  req = self.requestrequest("GET", "/globalTag/{globalTagName}/globalTagPayloads"
661  .format(globalTagName=encode_name(globalTagName)))
663  # there could be just no iovs so no error
664  return {}
665 
666  result = {}
667  for payload in req.json():
668  payloadId = payload["payloadId"]["payloadId"]
669  if payloadName is not None:
670  if payload["payloadId"]["basf2Module"]["name"] != payloadName:
671  continue
672  for iov in payload["payloadIovs"]:
673  iovId = iov["payloadIovId"]
674  firstExp, firstRun = iov["expStart"], iov["runStart"]
675  finalExp, finalRun = iov["expEnd"], iov["runEnd"]
676  result[(payloadId, firstExp, firstRun, finalExp, finalRun)] = iovId
677 
678  return result
679 
680  def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None):
681  """
682  Upload a testing payload storage to the conditions database.
683 
684  Parameters:
685  filename (str): filename of the testing payload storage file that should be uploaded
686  global_tage (str): name of the globaltag to which the data should be uploaded
687  normalize (Union[bool, str]): if True the payload root files will be normalized to have the same checksum for the
688  same content, if normalize is a string in addition the file name in the root file metadata will be set to it
689  ignore_existing (bool): if True do not upload payloads that already exist
690  nprocess (int): maximal number of parallel uploads
691  uploaded_entries (list): the list of successfully uploaded entries
692 
693  Returns:
694  True if the upload was successful
695  """
696 
697  # first create a list of payloads
698  from conditions_db.testing_payloads import parse_testing_payloads_file
699  B2INFO(f"Reading payload list from {filename}")
700  entries = parse_testing_payloads_file(filename)
701  if entries is None:
702  B2ERROR(f"Problems with testing payload storage file {filename}, exiting")
703  return False
704 
705  if not entries:
706  B2INFO(f"No payloads found in {filename}, exiting")
707  return True
708 
709  B2INFO(f"Found {len(entries)} iovs to upload")
710 
711  # time to get the id for the globaltag
712  tagId = self.get_globalTagInfoget_globalTagInfo(global_tag)
713  if tagId is None:
714  return False
715  tagId = tagId["globalTagId"]
716 
717  # now we could have more than one payload with the same iov so let's go over
718  # it again and remove duplicates but keep the last one for each
719  entries = sorted(set(reversed(entries)))
720 
721  if normalize:
722  name = normalize if normalize is not True else None
723  for e in entries:
724  e.normalize(name=name)
725 
726  # so let's have a list of all payloads (name, checksum) as some payloads
727  # might have multiple iovs. Each payload gets a list of all of those
728  payloads = defaultdict(list)
729  for e in entries:
730  payloads[(e.module, e.checksum)].append(e)
731 
732  existing_payloads = {}
733  existing_iovs = {}
734 
735  def upload_payload(item):
736  """Upload a payload file if necessary but first check list of existing payloads"""
737  key, entries = item
738  if key in existing_payloads:
739  B2INFO(f"{key[0]} (md5:{key[1]}) already existing in database, skipping.")
740  payload_id = existing_payloads[key]
741  else:
742  entry = entries[0]
743  payload_id = self.create_payloadcreate_payload(entry.module, entry.filename, entry.checksum)
744  if payload_id is None:
745  return False
746 
747  B2INFO(f"Created new payload {payload_id} for {entry.module} (md5:{entry.checksum})")
748 
749  for entry in entries:
750  entry.payload = payload_id
751 
752  return True
753 
754  def create_iov(entry):
755  """Create an iov if necessary but first check the list of existing iovs"""
756  if entry.payload is None:
757  return None
758 
759  iov_key = (entry.payload,) + entry.iov_tuple
760  if iov_key in existing_iovs:
761  entry.iov = existing_iovs[iov_key]
762  B2INFO(f"IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum}) already existing in database, skipping.")
763  else:
764  entry.payloadIovId = self.create_iovcreate_iov(tagId, entry.payload, *entry.iov_tuple)
765  if entry.payloadIovId is None:
766  return False
767 
768  B2INFO(f"Created IoV {entry.iov_tuple} for {entry.module} (md5:{entry.checksum})")
769 
770  return entry
771 
772  # multithreading for the win ...
773  with ThreadPoolExecutor(max_workers=nprocess) as pool:
774  # if we want to check for existing payloads/iovs we schedule the download of
775  # the full payload list. And write a message as each completes
776  if not ignore_existing:
777  B2INFO("Downloading information about existing payloads and iovs...")
778  futures = []
779  existing_iovs = {}
780  existing_payloads = {}
781 
782  def create_future(iter, func, callback=None):
783  fn = pool.submit(iter, func)
784  if callback is not None:
785  fn.add_done_callback(callback)
786  futures.append(fn)
787 
788  def update_iovs(iovs):
789  existing_iovs.update(iovs.result())
790  B2INFO(f"Found {len(existing_iovs)} existing iovs in {global_tag}")
791 
792  def update_payloads(payloads):
793  existing_payloads.update(payloads.result())
794  B2INFO(f"Found {len(existing_payloads)} existing payloads")
795 
796  create_future(self.get_iovsget_iovs, global_tag, update_iovs)
797  # checking existing payloads should not be done with too many at once
798  for chunk in chunks(payloads.keys(), 1000):
799  create_future(self.check_payloadscheck_payloads, chunk, update_payloads)
800 
801  futures_wait(futures)
802 
803  # upload payloads
804  failed_payloads = sum(0 if result else 1 for result in pool.map(upload_payload, payloads.items()))
805  if failed_payloads > 0:
806  B2ERROR(f"{failed_payloads} payloads could not be uploaded")
807 
808  # create IoVs
809  failed_iovs = 0
810  for entry in pool.map(create_iov, entries):
811  if entry:
812  if uploaded_entries is not None:
813  uploaded_entries.append(entry)
814  else:
815  failed_iovs += 1
816  if failed_iovs > 0:
817  B2ERROR(f"{failed_iovs} IoVs could not be created")
818 
819  # update revision numbers
820  if uploaded_entries is not None:
821  self.get_revisionsget_revisions(uploaded_entries)
822 
823  return failed_payloads + failed_iovs == 0
824 
825  def staging_request(self, filename, normalize, data, password):
826  """
827  Upload a testing payload storage to a staging globaltag and create or update a jira issue
828 
829  Parameters:
830  filename (str): filename of the testing payload storage file that should be uploaded
831  normalize (Union[bool, str]): if True the payload root files will be
832  normalized to have the same checksum for the same content, if
833  normalize is a string in addition the file name in the root file
834  metadata will be set to it
835  data (dict): a dictionary with the information provided by the user:
836 
837  * task: category of globaltag, either main, online, prompt, data, mc, or analysis
838  * tag: the globaltag name
839  * request: type of request, either Update, New, or Modification. The latter two imply task == main because
840  if new payload classes are introduced or payload classes are modified then they will first be included in
841  the main globaltag. Here a synchronization of code and payload changes has to be managed.
842  If new or modified payload classes should be included in other globaltags they must already be in a release.
843  * pull-request: number of the pull request containing new or modified payload classes,
844  only for request == New or Modified
845  * backward-compatibility: description of what happens if the old payload is encountered by the updated code,
846  only for request == Modified
847  * forward-compatibility: description of what happens if a new payload is encountered by the existing code,
848  only for request == Modified
849  * release: the required release version
850  * reason: the reason for the request
851  * description: a detailed description for the globaltag manager
852  * issue: identifier of an existing jira issue (optional)
853  * user: name of the user
854  * time: time stamp of the request
855 
856  password: the password for access to jira or the access token and secret for oauth access
857 
858  Returns:
859  True if the upload and jira issue creation/upload was successful
860  """
861 
862  # determine the staging globaltag name
863  data['tag'] = upload_global_tag(data['task'])
864  if data['tag'] is None:
865  data['tag'] = f"temp_{data['task']}_{data['user']}_{data['time']}"
866 
867  # create the staging globaltag if it does not exists yet
868  if not self.has_globalTaghas_globalTag(data['tag']):
869  if not self.create_globalTagcreate_globalTag(data['tag'], data['reason'], data['user']):
870  return False
871 
872  # upload the payloads
873  B2INFO(f"Uploading testing database {filename} to globaltag {data['tag']}")
874  entries = []
875  if not self.uploadupload(filename, data['tag'], normalize, uploaded_entries=entries):
876  return False
877 
878  # get the dictionary for the jira issue creation/update
879  if data['issue']:
880  issue = data['issue']
881  else:
882  issue = jira_global_tag_v2(data['task'])
883  if issue is None:
884  issue = {"components": [{"name": "globaltag"}]}
885 
886  # create jira issue text from provided information
887  if type(issue) is tuple:
888  description = issue[1].format(**data)
889  issue = issue[0]
890  else:
891  description = f"""
892 |*Upload globaltag* | {data['tag']} |
893 |*Request reason* | {data['reason']} |
894 |*Required release* | {data['release']} |
895 |*Type of request* | {data['request']} |
896 """
897  if 'pull-request' in data.keys():
898  description += f"|*Pull request* | \\#{data['pull-request']} |\n"
899  if 'backward-compatibility' in data.keys():
900  description += f"|*Backward compatibility* | \\#{data['backward-compatibility']} |\n"
901  if 'forward-compatibility' in data.keys():
902  description += f"|*Forward compatibility* | \\#{data['forward-compatibility']} |\n"
903  description += '|*Details* |' + ''.join(data['details']) + ' |\n'
904  if data['task'] == 'online':
905  description += '|*Impact on data taking*|' + ''.join(data['data_taking']) + ' |\n'
906 
907  # add information about uploaded payloads/IoVs
908  description += '\nPayloads\n||Name||Revision||IoV||\n'
909  for entry in entries:
910  description += f"|{entry.module} | {entry.revision} | ({entry.iov_str()}) |\n"
911 
912  # create a new issue
913  if type(issue) is dict:
914  issue["description"] = description
915  if "summary" in issue.keys():
916  issue["summary"] = issue["summary"].format(**data)
917  else:
918  issue["summary"] = f"Globaltag request for {data['task']} by {data['user']} at {data['time']}"
919  if "project" not in issue.keys():
920  issue["project"] = {"key": "BII"}
921  if "issuetype" not in issue.keys():
922  issue["issuetype"] = {"name": "Task"}
923  if data["task"] == "main":
924  issue["labels"] = ["TUPPR"]
925 
926  B2INFO(f"Creating jira issue for {data['task']} globaltag request")
927  if isinstance(password, str):
928  response = requests.post('https://agira.desy.de/rest/api/latest/issue', auth=(data['user'], password),
929  json={'fields': issue})
930  else:
931  fields = {'issue': json.dumps(issue)}
932  if 'user' in data.keys():
933  fields['user'] = data['user']
934  if password:
935  fields['token'] = password[0]
936  fields['secret'] = password[1]
937  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
938  if response.status_code in range(200, 210):
939  B2INFO(f"Issue successfully created: https://agira.desy.de/browse/{response.json()['key']}")
940  else:
941  B2ERROR('The creation of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
942  return False
943 
944  # comment on an existing issue
945  else:
946  # Let's make sure all assignees of new issues are added as watchers
947  # in that case, otherwise they might never find out
948  new_issue_config = jira_global_tag_v2(data['task'])
949  if isinstance(new_issue_config, dict) and "assignee" in new_issue_config:
950  user = new_issue_config['assignee'].get('name', None)
951  if user is not None and isinstance(password, str):
952  response = requests.post(f'https://agira.desy.de/rest/api/latest/issue/{issue}/watchers',
953  auth=(data['user'], password), json=user)
954  if response.status_code in range(200, 210):
955  B2INFO(f"Added {user} as watcher to {issue}")
956  else:
957  B2WARNING(f"Could not add {user} as watcher to {issue}: {response.status_code}")
958 
959  B2INFO(f"Commenting on jira issue {issue} for {data['task']} globaltag request")
960  if isinstance(password, str):
961  response = requests.post('https://agira.desy.de/rest/api/latest/issue/%s/comment' % issue,
962  auth=(data['user'], password), json={'body': description})
963  else:
964  fields = {'id': issue, 'user': user, 'comment': description}
965  if password:
966  fields['token'] = password[0]
967  fields['secret'] = password[1]
968  response = requests.post('https://b2-master.belle2.org/cgi-bin/jira_issue.py', data=fields)
969  if response.status_code in range(200, 210):
970  B2INFO(f"Issue successfully updated: https://agira.desy.de/browse/{issue}")
971  else:
972  B2ERROR('The commenting of the issue failed: ' + requests.status_codes._codes[response.status_code][0])
973  return False
974 
975  return True
976 
977 
def require_database_for_test(timeout=60, base_url=None):
    """Make sure that the database is available and skip the test if not.

    This function should be called in test scripts if they are expected to fail
    if the database is down. It either returns when the database is ok or it
    will signal test_basf2 that the test should be skipped and exit

    Parameters:
      timeout: maximum number of seconds to wait for each server to answer the
        availability check before moving on to the next server (forwarded to
        ``requests``; ``None`` means wait indefinitely)
      base_url: optional server url forwarded to ``ConditionsDB.get_base_urls``
        to determine which servers to check (presumably the configured default
        servers are used when None -- see that function)

    Raises:
      Exception: if database access was explicitly disabled by setting the
        environment variable ``BELLE2_CONDB_GLOBALTAG`` to an empty string
    """
    import sys
    # an explicitly empty globaltag setting means database access is switched off
    if os.environ.get("BELLE2_CONDB_GLOBALTAG", None) == "":
        raise Exception("Access to the Database is disabled")
    base_url_list = ConditionsDB.get_base_urls(base_url)
    for url in base_url_list:
        try:
            # A HEAD request is enough to see whether the server responds. The
            # timeout keeps an unresponsive server from blocking the test forever;
            # requests.Timeout is a RequestException and is handled below.
            req = requests.request("HEAD", url.rstrip('/') + "/v2/globalTags", timeout=timeout)
            req.raise_for_status()
        except requests.RequestException as e:
            B2WARNING(f"Problem connecting to {url}:\n {e}\n Trying next server ...")
        else:
            # found a working server, the database is usable
            break
    else:
        # loop ran to completion without break: no server answered successfully
        print("TEST SKIPPED: No working database servers configured, giving up", file=sys.stderr)
        sys.exit(1)
1000 
1001 
def enable_debugging():
    """Enable verbose output of python-requests to be able to debug http connections

    Two things are switched on:

    * ``http.client`` debugging (used underneath requests -> urllib3), which
      prints each REQUEST including headers and data, and each RESPONSE with
      headers. The response body itself is not printed.
    * DEBUG level on the root logger and on the
      ``requests.packages.urllib3`` logger, after making sure logging is
      configured at all, so that the debug records actually get emitted.
    """
    import logging
    from http import client as http_client

    # print raw request/response information at the http.client level
    http_client.HTTPConnection.debuglevel = 1

    # without a configured logging system no debug output would be visible
    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_logger.setLevel(logging.DEBUG)
    urllib3_logger.propagate = True
def __call__(self, r)
Definition: __init__.py:127
_authtoken
Authorization header to send with each request.
Definition: __init__.py:125
def __init__(self, token)
Definition: __init__.py:122
def get_payloads(self, global_tag=None)
Definition: __init__.py:494
def get_iovs(self, globalTagName, payloadName=None)
Definition: __init__.py:649
def upload(self, filename, global_tag, normalize=False, ignore_existing=False, nprocess=1, uploaded_entries=None)
Definition: __init__.py:680
def set_authentication_token(self, token)
Definition: __init__.py:317
def get_globalTagType(self, name)
Definition: __init__.py:420
def check_payloads(self, payloads, information="payloadId")
Definition: __init__.py:518
def has_globalTag(self, name)
Definition: __init__.py:398
def create_iov(self, globalTagId, payloadId, firstExp, firstRun, finalExp, finalRun)
Definition: __init__.py:614
def request(self, method, url, message=None, *args, **argk)
Definition: __init__.py:327
def __init__(self, base_url=None, max_connections=10, retries=3)
Definition: __init__.py:270
def get_revisions(self, entries)
Definition: __init__.py:550
def create_globalTag(self, name, description, user)
Definition: __init__.py:439
def get_globalTagInfo(self, name)
Definition: __init__.py:408
_session
session object to get keep-alive support and connection pooling
Definition: __init__.py:281
def get_all_iovs(self, globalTag, exp=None, run=None, message=None)
Definition: __init__.py:452
def get_base_urls(given_url)
Definition: __init__.py:239
def staging_request(self, filename, normalize, data, password)
Definition: __init__.py:825
def create_payload(self, module, filename, checksum=None)
Definition: __init__.py:571
_base_url
base url to be prepended to all requests
Definition: __init__.py:300
payload_id
payload id in CDB, not used for comparisons
Definition: __init__.py:172
iov_id
iov id in CDB, not used for comparisons
Definition: __init__.py:174
name
name of the payload
Definition: __init__.py:164
checksum
checksum of the payload
Definition: __init__.py:166
def __init__(self, payload_id, name, revision, checksum, payload_url, base_url, iov_id=None, iov=None)
Definition: __init__.py:159
def from_json(cls, payload, iov=None)
Definition: __init__.py:138
revision
revision, not used for comparisons
Definition: __init__.py:170
iov
interval of validity
Definition: __init__.py:168